Bloblang Syntax Highlighting Test

Test page for Bloblang syntax highlighting and interactive features. Hover over functions and methods to see documentation. Click "Try It" to execute mappings!

Pure Bloblang Code Block

    root = this
    root.id = uuid_v4()
    root.timestamp = now()

    # Variables
    let foo = "bar"
    root.message = $foo

    # Methods and functions
    root.uppercased = this.name.uppercase()
    root.replaced = this.text.replace_all("old", "new")

    # Boolean and arithmetic
    root.is_valid = this.count > 10 && this.status == "active"
    root.total = this.price * this.quantity

    # Arrays and objects
    root.items = this.data.map_each(item -> {
      "name": item.name.uppercase(),
      "value": item.value * 2
    })

    # Deletions
    root.sensitive = deleted()

    # In: {"name": "alice", "text": "hello old world", "count": 15, "status": "active", "price": 10.5, "quantity": 3, "data": [{"name": "item1", "value": 5}, {"name": "item2", "value": 10}], "sensitive": "secret123"}

Bloblang with Alias (blobl)

    root.transformed = this.input.uppercase().trim()
    root.uuid = uuid_v4()

    # In: {"input": " hello world "}

Bloblang in YAML Pipeline

    input:
      kafka:
        addresses: ["localhost:9092"]
        topics: ["my-topic"]

    pipeline:
      processors:
        - mapping: |
            # Transform the message
            root.id = uuid_v4()
            root.timestamp = now()
            root.data = this.payload.parse_json()

            # Uppercase all text fields
            root.title = this.payload.title.uppercase()
            root.body = this.payload.body

            # Add metadata
            meta processed_at = now()
            meta source = @kafka_topic

    output:
      http_client:
        url: "https://api.example.com/data"
        verb: POST

Complex Bloblang Example

    root = this
    root.id = uuid_v4()

    # Conditional transformations
    root.status = if this.score > 80 {
      "excellent"
    } else if this.score > 60 {
      "good"
    } else {
      "needs_improvement"
    }

    # Array operations
    root.filtered_items = this.items.filter(item -> item.active == true)
    root.unique_tags = this.tags.unique()

    # String operations
    root.email = this.email.lowercase().trim()
    root.name = this.first_name + " " + this.last_name

    # Error handling
    root.safe_value = this.risky_field.catch("default_value")

    # Delete sensitive fields
    root.password = deleted()

    # In: {"score": 85, "items": [{"name": "a", "active": true}, {"name": "b", "active": false}, {"name": "c", "active": true}], "tags": ["red", "blue", "red", "green"], "email": " JOHN@EXAMPLE.COM ", "first_name": "John", "last_name": "Doe", "password": "secret"}

Testing All Token Types

    # Simple example showing basic transformations
    root.message = this.text.uppercase()
    root.length = this.text.length()
    root.sorted = this.items.sort()
    root.unique_nums = this.numbers.unique()
    root.id = uuid_v4()

    # In: {"text": "hello world", "items": ["c", "a", "b"], "numbers": [1, 2, 2, 3, 3, 3]}

Multi-line Input Format

    # This example shows the multi-line # In: format
    root.fullName = this.first + " " + this.last
    root.age = this.age + 1

    # In:
    # {
    #   "first": "Jane",
    #   "last": "Smith",
    #   "age": 29
    # }

YAML Pipeline with Switch and Check

    input:
      kafka:
        addresses: ["localhost:9092"]
        topics: ["events"]

    pipeline:
      processors:
        - switch:
            - check: this.type == "order"
              processors:
                - mapping: |
                    root = this
                    root.processed_at = now()
                    root.order_id = uuid_v4()
            - check: this.type == "user"
              processors:
                - mapping: |
                    root.user = this
                    root.user.email = this.email.lowercase()

    output:
      stdout: {}

YAML with Branch Processor

    pipeline:
      processors:
        - branch:
            request_map: |
              root.url = "https://api.example.com/enrich"
              root.body = this.id
            processors:
              - http:
                  url: "${! this.url }"
                  verb: POST
            result_map: |
              root.enriched = this
              root.enriched.fetched_at = now()

YAML with Interpolation Functions

    input:
      kafka:
        addresses: ["localhost:9092"]
        topics: ["events"]
        consumer_group: "${! env(\"CONSUMER_GROUP\") }"

    pipeline:
      processors:
        - cache:
            resource: my_cache
            key: "${! meta(\"kafka_key\") }"
            value: "${! content().hash(\"xxhash64\") }"
        - dedupe:
            key: "${! this.id }-${! meta(\"kafka_partition\") }"

    output:
      http_client:
        url: "${! env(\"TARGET_URL\") }/ingest"

MCP Tool: Weather Service

Real-world example from Redpanda Connect MCP tools.

    label: weather-service
    processors:
      - label: validate_inputs
        mutation: |
          # Validate and sanitize city input
          meta city = this.city.string().
            re_replace_all("[^a-zA-Z\\s\\-]", "").
            trim()

          # Check for empty input
          root = if @city == "" {
            throw("City name cannot be empty")
          } else { "" }

      - label: fetch_weather_data
        try:
          - http:
              url: "https://wttr.in/${! @city }?format=j1"
              verb: GET
              headers:
                User-Agent: "redpanda-connect-mcp/1.0"
              timeout: "10s"

          - mutation: |
              root = {
                "city": @city,
                "temperature_c": this.current_condition.0.temp_C.number(),
                "temperature_f": this.current_condition.0.temp_F.number(),
                "feels_like_c": this.current_condition.0.FeelsLikeC.number(),
                "humidity": this.current_condition.0.humidity.number(),
                "description": this.current_condition.0.weatherDesc.0.value,
                "wind_speed_kmh": this.current_condition.0.windspeedKmph.number(),
                "timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
              }

          - log:
              message: "Weather data fetched for city: ${! @city }"
              level: "INFO"

      - label: handle_weather_errors
        catch:
          - mutation: |
              root = {
                "error": "Failed to fetch weather data",
                "city": @city,
                "details": error(),
                "timestamp": now().format_timestamp("2006-01-02T15:04:05Z07:00")
              }

          - log:
              message: "Weather API error for city ${! @city }: ${! error() }"
              level: "ERROR"

    meta:
      tags: [ weather, example ]
      mcp:
        enabled: true
        description: "Get current weather conditions for any city worldwide"

MCP Tool: Conditional Processing

Switch-based conditional processing with different data formats.

    label: parse-data-by-type
    processors:
      - label: conditional_processing
        switch:
          - check: this.data_type == "json"
            processors:
              - mapping: |
                  root.parsed_data = this.content.parse_json()
                  root.format = "json"
          - check: this.data_type == "csv"
            processors:
              - mapping: |
                  root.parsed_data = this.content.parse_csv()
                  root.format = "csv"
          - processors:
              - mapping: |
                  root.error = "Unsupported data type"
                  root.supported_types = ["json", "csv"]

    meta:
      mcp:
        enabled: true
        description: "Parse data based on specified type"

Pipeline: Error Handling with Try/Catch

    input:
      generate:
        count: 1
        interval: ""
        mapping: |
          root = {"request": "data"}

    pipeline:
      processors:
        - try:
            - http:
                url: "https://api.example.com/data"
                verb: GET
        - catch:
            - mutation: |
                root.error = true
                root.message = "Request failed: " + error()

    output:
      stdout: {}

Regular YAML (Kubernetes - Should NOT Be Processed)

This Kubernetes manifest should remain as normal YAML without Bloblang highlighting.

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: my-app
      labels:
        app: my-app
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: my-app
      template:
        metadata:
          labels:
            app: my-app
        spec:
          containers:
            - name: my-app
              image: my-registry/my-app:latest
              ports:
                - containerPort: 8080
              env:
                - name: DATABASE_URL
                  value: "postgres://localhost:5432/mydb"
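
For reference, the `# In:` sample-input convention used by the Bloblang blocks earlier on this page (single-line and multi-line) can be separated from the mapping with a small helper. The following is a minimal Python sketch, not the page's actual "Try It" implementation: the function name `split_mapping_and_input` is hypothetical, and it assumes the sample input is the last thing in the block and starts on a line beginning with `# In:`.

```python
import json


def split_mapping_and_input(block: str):
    """Split a code block into (mapping_text, input_document).

    Hypothetical helper: assumes the sample input is the last element of the
    block and begins on a line that starts with '# In:'. Returns None for the
    input document when no such line exists.
    """
    lines = block.strip().splitlines()
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("# In:"):
            # Single-line form: the JSON follows '# In:' on the same line.
            first = stripped[len("# In:"):]
            # Multi-line form: each continuation line is a comment, so drop
            # the leading '#' before joining the pieces back together.
            rest = [l.strip().lstrip("#") for l in lines[i + 1:]]
            payload = "\n".join([first] + rest).strip()
            mapping = "\n".join(lines[:i]).rstrip()
            return mapping, json.loads(payload)
    return "\n".join(lines), None


if __name__ == "__main__":
    example = "\n".join([
        'root.fullName = this.first + " " + this.last',
        "root.age = this.age + 1",
        "",
        "# In:",
        "# {",
        '#   "first": "Jane",',
        '#   "last": "Smith",',
        '#   "age": 29',
        "# }",
    ])
    mapping, doc = split_mapping_and_input(example)
    print(mapping)
    print(doc)
```

A comment that merely mentions `# In:` partway through the line, as in the multi-line example's first comment, is not treated as the start of the input because the check only matches lines that begin with `# In:`.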