This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7b235f8b2a6 Add more YAML examples involving Kafka and Iceberg (#35151)
7b235f8b2a6 is described below
commit 7b235f8b2a6998b9b317f4f00e50d3a01424959b
Author: Charles Nguyen <[email protected]>
AuthorDate: Mon Jun 9 18:13:35 2025 -0400
Add more YAML examples involving Kafka and Iceberg (#35151)
* Add more YAML examples involving Kafka and Iceberg
* Fix some missed details from rebasing
* Adding unit tests for YAML examples
* Clean up and address PR comments
* Formatting
* Formatting
---
sdks/python/apache_beam/yaml/examples/README.md | 165 +++++++++++++++++++--
.../yaml/examples/testing/examples_test.py | 95 +++++++++++-
.../yaml/examples/testing/input_data.py | 21 +++
.../yaml/examples/transforms/io/iceberg_read.yaml | 54 +++++++
.../yaml/examples/transforms/io/iceberg_write.yaml | 62 ++++++++
.../yaml/examples/transforms/io/kafka.yaml | 96 ++++++++++++
6 files changed, 476 insertions(+), 17 deletions(-)
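Note on the test changes in the diff below: the new Kafka preprocessor lets the example run without a broker by renaming `ReadFromKafka` to a stub provider type before the pipeline spec is expanded. The following is a minimal standalone sketch of that renaming step only. The helper name `swap_transform_type` and the sample spec are hypothetical, and unlike the preprocessor in the commit (which edits the spec in place) this sketch works on a copy.

```python
import copy


def swap_transform_type(spec, old_type, new_type):
  """Return a copy of spec with each transform of old_type renamed.

  Hypothetical helper for illustration; it is not part of Beam.
  """
  result = copy.deepcopy(spec)
  pipeline = result.get('pipeline') or {}
  for transform in pipeline.get('transforms', []):
    if transform.get('type', '') == old_type:
      transform['type'] = new_type
  return result


# Sample spec shaped like a parsed Beam YAML pipeline (values are made up).
spec = {
    'pipeline': {
        'transforms': [
            {'type': 'WriteToKafka', 'config': {'topic': 'MY-TOPIC'}},
            {'type': 'ReadFromKafka', 'config': {'topic': 'MY-TOPIC'}},
        ]
    }
}

swapped = swap_transform_type(spec, 'ReadFromKafka', 'TestReadFromKafka')
print([t['type'] for t in swapped['pipeline']['transforms']])
# The input spec is left untouched because the helper copies first.
print([t['type'] for t in spec['pipeline']['transforms']])
```

Only the `type` key changes, so the stub provider still receives the original `config` values as keyword arguments.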
diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md
index 009245f6f14..70533655a6a 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -20,34 +20,40 @@
# Examples Catalog
<!-- TOC -->
+
* [Examples Catalog](#examples-catalog)
- * [Wordcount](#wordcount)
- * [Transforms](#transforms)
- * [Aggregation](#aggregation)
- * [Blueprints](#blueprints)
- * [Element-wise](#element-wise)
- * [IO](#io)
- * [ML](#ml)
+ * [Wordcount](#wordcount)
+ * [Transforms](#transforms)
+ * [Aggregation](#aggregation)
+ * [Blueprints](#blueprints)
+ * [Element-wise](#element-wise)
+ * [IO](#io)
+ * [ML](#ml)
<!-- TOC -->
## Prerequistes
+
Build this jar for running with the run command in the next stage:
+
```
cd <path_to_beam_repo>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
```
## Example Run
+
This module contains a series of Beam YAML code samples that can be run using
the command:
+
```
-python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/example.yaml
```
Depending on the yaml pipeline, the output may be emitted to standard output or
a file located in the execution folder used.
## Wordcount
+
A good starting place is the [Wordcount](wordcount_minimal.yaml) example under
the root example directory.
This example reads in a text file, splits the text on each word, groups by each
@@ -55,8 +61,10 @@ word, and counts the occurrence of each word. This is a classic example used in
the other SDK's and shows off many of the functionalities of Beam YAML.
## Testing
+
A test file is located in the testing folder that will execute all the example
yamls and confirm the expected results.
+
```
pytest -v testing/
@@ -71,25 +79,160 @@ Examples in this directory show off the various built-in transforms of the Beam
YAML framework.
### Aggregation
+
These examples leverage the built-in `Combine` transform for performing simple
aggregations including sum, mean, count, etc.
### Blueprints
+
These examples leverage DF or other existing templates and convert them to yaml
blueprints.
### Element-wise
+
These examples leverage the built-in mapping transforms including `MapToFields`,
`Filter` and `Explode`. More information can be found about mapping transforms
[here](https://beam.apache.org/documentation/sdks/yaml-udf/).
### IO
-These examples leverage the built-in `Spanner_Read` and `Spanner_Write`
-transform for performing simple reads and writes from a spanner DB.
+
+#### Spanner
+
+Examples [Spanner Read](transforms/io/spanner_read.yaml) and [Spanner Write](
+transforms/io/spanner_write.yaml) leverage the built-in `Spanner_Read` and
+`Spanner_Write` transforms for performing simple reads and writes from a
+Google Spanner database.
+
+#### Kafka
+
+Examples involving Kafka such as [Kafka Read Write](transforms/io/kafka.yaml)
+require users to set up a Kafka cluster that Dataflow runner executing the
+Beam pipeline has access to.
+Please note that `ReadFromKafka` transform has
+a [known issue](https://github.com/apache/beam/issues/22809) when
+using non-Dataflow portable runners where reading may get stuck in streaming
+pipelines. Hence using the Dataflow runner is recommended for examples that
+involve reading from Kafka in a streaming pipeline.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. An option is to use [Click to Deploy](
+https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
+to quickly launch a Kafka cluster on [GCE](
+https://cloud.google.com/products/compute?hl=en). [SASL/PLAIN](
+https://kafka.apache.org/documentation/#security_sasl_plain) authentication
+mechanism is configured for the brokers as part of the deployment. See
+also [here](
+https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/dataflow/flex-templates/kafka_to_bigquery)
+for an alternative step-by-step guide on setting up Kafka on GCE without the
+authentication mechanism.
+
+Let's assume one of the bootstrap servers is on VM instance `kafka-vm-0`
+with the internal IP address `123.45.67.89` and port `9092` that the bootstrap
+server is listening on. SASL/PLAIN `USERNAME` and `PASSWORD` can be viewed from
+the VM instance's metadata on the GCE console, or with gcloud CLI:
+
+```sh
+gcloud compute instances describe kafka-vm-0 \
+ --format='value[](metadata.items.kafka-user)'
+gcloud compute instances describe kafka-vm-0 \
+ --format='value[](metadata.items.kafka-password)'
+```
+
+Beam pipeline [Kafka Read Write](transforms/io/kafka.yaml) first writes data to
+the Kafka topic using the `WriteToKafka` transform and then reads that data back
+using the `ReadFromKafka` transform. Run the pipeline:
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/tmp"
+export REGION="us-central1"
+export JOB_NAME="demo-kafka-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="1"
+
+python -m apache_beam.yaml.main \
+ --yaml_pipeline_file transforms/io/kafka.yaml \
+ --runner DataflowRunner \
+ --temp_location $TEMP_LOCATION \
+ --project $PROJECT \
+ --region $REGION \
+ --num_workers $NUM_WORKERS \
+ --job_name $JOB_NAME \
+ --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
+ "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
+```
+
+**_Optional_**: If Kafka cluster is set up with no SASL/PLAINTEXT authentication
+configured for the brokers, there's no SASL/PLAIN `USERNAME` and `PASSWORD`
+needed. In the pipelines, omit the configurations `producer_config_updates` and
+`consumer_config` from the `WriteToKafka` and `ReadFromKafka` transforms.
+Run the commands above without specifying the username and password in
+`--jinja_variables` flag.
+
+#### Iceberg
+
+Beam pipelines [Iceberg Write](transforms/io/iceberg_write.yaml) and
+[Iceberg Read](transforms/io/iceberg_read.yaml) are examples of how to interact
+with Iceberg tables on GCS storage and with Hadoop catalog configured.
+
+To create a GCS bucket as our warehouse storage,
+see [here](https://cloud.google.com/storage/docs/creating-buckets#command-line).
+To run the pipelines locally, an option is to create a service account key in
+order to access GCS (see
+[here](https://cloud.google.com/iam/docs/keys-create-delete#creating)).
+Within the pipelines, specify GCS bucket name and the path to the saved service
+account key .json file.
+
+**_Note_**: With Hadoop catalog, Iceberg will use Hadoop connector for GCS.
+See [here](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md)
+for full list of configuration options for Hadoop catalog when use with GCS.
+
+To create and write to Iceberg tables on GCS, run:
+
+```sh
+python -m apache_beam.yaml.main \
+ --yaml_pipeline_file transforms/io/iceberg_write.yaml
+```
+
+The pipeline uses [Dynamic destinations](
+https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
+write to dynamically create and select a table destination based on field
+values in the incoming records.
+
+To read from a created Iceberg table on GCS, run:
+
+```sh
+python -m apache_beam.yaml.main \
+ --yaml_pipeline_file transforms/io/iceberg_read.yaml
+```
+
+**_Optional_**: To run the pipeline on Dataflow, service account key is
+[not needed](
+https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md).
+Omit the authentication settings in the Hadoop catalog configuration `
+config_properties`, and run:
+
+```sh
+export REGION="us-central1"
+export JOB_NAME="demo-iceberg_write-`date +%Y%m%d-%H%M%S`"
+
+gcloud dataflow yaml run $JOB_NAME \
+ --yaml-pipeline-file transforms/io/iceberg_write.yaml \
+ --region $REGION
+```
+
+```sh
+export REGION="us-central1"
+export JOB_NAME="demo-iceberg_read-`date +%Y%m%d-%H%M%S`"
+
+gcloud dataflow yaml run $JOB_NAME \
+ --yaml-pipeline-file transforms/io/iceberg_read.yaml \
+ --region $REGION
+```
### ML
+
These examples leverage the built-in `Enrichment` transform for performing
ML enrichments.
More information can be found about aggregation transforms
-[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
\ No newline at end of file
+[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 6fef278039e..ad753961512 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -58,7 +58,7 @@ def test_enrichment(
This PTransform simulates the behavior of the Enrichment transform by
looking up data from predefined in-memory tables based on the provided
- `enrichment_handler` and `handler_config`.
+ `enrichment_handler` and `handler_config`.
Note: The Github action that invokes these tests does not have gcp
dependencies installed which is a prerequisite to
@@ -111,10 +111,25 @@ def test_enrichment(
return pcoll | beam.Map(_fn)
+@beam.ptransform.ptransform_fn
+def test_kafka_read(
+    pcoll,
+    format,
+    topic,
+    bootstrap_servers,
+    auto_offset_reset_config,
+    consumer_config):
+  return (
+      pcoll | beam.Create(input_data.text_data().split('\n'))
+      | beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))
+
+
TEST_PROVIDERS = {
- 'TestEnrichment': test_enrichment,
+ 'TestEnrichment': test_enrichment, 'TestReadFromKafka': test_kafka_read
}
+INPUT_TRANSFORM_TEST_PROVIDERS = ['TestReadFromKafka']
+
def check_output(expected: List[str]):
"""
@@ -184,7 +199,11 @@ def create_test_method(
actual = [
yaml_transform.expand_pipeline(
p,
- pipeline_spec, [yaml_provider.InlineProvider(TEST_PROVIDERS)])
+ pipeline_spec,
+ [
+ yaml_provider.InlineProvider(
+ TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS)
+ ])
]
if not actual[0]:
actual = list(p.transforms_stack[0].parts[-1].outputs.values())
@@ -373,9 +392,30 @@ def _wordcount_test_preprocessor(
env.input_file('kinglear.txt', '\n'.join(lines)))
+@YamlExamplesTestSuite.register_test_preprocessor('test_kafka_yaml')
+def _kafka_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+
+  test_spec = replace_recursive(
+      test_spec,
+      'ReadFromText',
+      'path',
+      env.input_file('kinglear.txt', input_data.text_data()))
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromKafka':
+        transform['type'] = 'TestReadFromKafka'
+
+  return test_spec
+
+
@YamlExamplesTestSuite.register_test_preprocessor([
'test_simple_filter_yaml',
'test_simple_filter_and_combine_yaml',
+ 'test_iceberg_read_yaml',
+ 'test_iceberg_write_yaml',
+ 'test_kafka_yaml',
'test_spanner_read_yaml',
'test_spanner_write_yaml',
'test_enrich_spanner_with_bigquery_yaml'
@@ -417,9 +457,10 @@ def _io_write_test_preprocessor(
def _file_io_read_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
"""
- This preprocessor replaces any ReadFrom transform with a Create transform
- that reads from a predefined in-memory dictionary. This allows the test
- to verify the pipeline's correctness without relying on external files.
+ This preprocessor replaces any file IO ReadFrom transform with a Create
+ transform that reads from a predefined in-memory dictionary. This allows
+ the test to verify the pipeline's correctness without relying on external
+ files.
Args:
test_spec: The dictionary representation of the YAML pipeline specification.
@@ -445,6 +486,47 @@ def _file_io_read_test_preprocessor(
return test_spec
+@YamlExamplesTestSuite.register_test_preprocessor(['test_iceberg_read_yaml'])
+def _iceberg_io_read_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve reading from Iceberg.
+
+  This preprocessor replaces any ReadFromIceberg transform with a Create
+  transform that reads from a predefined in-memory dictionary. This allows
+  the test to verify the pipeline's correctness without relying on Iceberg
+  tables stored externally.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with ReadFromIceberg transforms replaced.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromIceberg':
+        config = transform['config']
+        (db_name, table_name,
+         field_value_dynamic_destinations) = config['table'].split('.')
+
+        transform['type'] = 'Create'
+        transform['config'] = {
+            k: v
+            for k, v in config.items() if k.startswith('__')
+        }
+        transform['config']['elements'] = INPUT_TABLES[(
+            str(db_name),
+            str(table_name),
+            str(field_value_dynamic_destinations))]
+
+  return test_spec
+
+
@YamlExamplesTestSuite.register_test_preprocessor(
['test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml'])
def _spanner_io_read_test_preprocessor(
@@ -531,6 +613,7 @@ INPUT_TABLES = {
spanner_shipments_data(),
('orders-test', 'order-database', 'orders'): input_data.
spanner_orders_data(),
+ ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
bigtable_data(),
('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data()
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 9c915785dd1..6661a29873b 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -22,6 +22,27 @@ needed.
"""
+def text_data():
+  return '\n'.join([
+      "Fool\tThou shouldst not have been old till thou hadst",
+      "\tbeen wise.",
+      "KING LEAR\tNothing will come of nothing: speak again.",
+      "\tNever, never, never, never, never!"
+  ])
+
+
+def iceberg_dynamic_destinations_users_data():
+  return [{
+      'id': 3, 'name': 'Smith', 'email': '[email protected]', 'zip': 'NY'
+  },
+          {
+              'id': 4,
+              'name': 'Beamberg',
+              'email': '[email protected]',
+              'zip': 'NY'
+          }]
+
+
def products_csv():
return '\n'.join([
'transaction_id,product_name,category,price',
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_read.yaml b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_read.yaml
new file mode 100644
index 00000000000..1ad291ddd10
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_read.yaml
@@ -0,0 +1,54 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads from Iceberg table 'db.users.NY' on GCS with Hadoop catalog
+# configured. The table, if not exists already, can be created and populated
+# using the iceberg_write.yaml pipeline.
+#
+# Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name.
+# If this example is run locally then replace '/path/to/service/account/key.json'
+# with the correct path to your service account key .json file on your machine.
+# Otherwise, if Dataflow runner is used then omit the 'config_properties' field.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromIceberg
+      name: ReadFromAnIcebergTable
+      config:
+        table: "db.users.NY"
+        catalog_name: "hadoop_catalog"
+        catalog_properties:
+          type: "hadoop"
+          warehouse: "gs://MY-WAREHOUSE"
+        # Hadoop catalog config required to run pipeline locally
+        # Omit if running on Dataflow
+        config_properties:
+          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
+          "fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
+
+    - type: LogForTesting
+
+    - type: WriteToCsv
+      name: OutputToCSVFile
+      config:
+        path: "gs://MY-WAREHOUSE/my-csv.csv"
+
+# Expected:
+# Row(id=3, name='Smith', email='[email protected]', zip='NY')
+# Row(id=4, name='Beamberg', email='[email protected]', zip='NY')
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_write.yaml b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_write.yaml
new file mode 100644
index 00000000000..ac1bdbbeeca
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_write.yaml
@@ -0,0 +1,62 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline uses Dynamic destinations (see
+# https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
+# to dynamically create and select a table destination based on field values in
+# the incoming records.
+#
+# Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name.
+# If this example is run locally then replace '/path/to/service/account/key.json'
+# with the correct path to your service account key .json file on your machine.
+# Otherwise, if Dataflow runner is used then omit the 'config_properties' field.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: Create
+      name: CreateSampleData
+      config:
+        elements:
+          - { id: 1, name: "John", email: "[email protected]", zip: "WA" }
+          - { id: 2, name: "Jane", email: "[email protected]", zip: "CA" }
+          - { id: 3, name: "Smith", email: "[email protected]",zip: "NY"}
+          - { id: 4, name: "Beamberg", email: "[email protected]", zip: "NY" }
+
+    - type: LogForTesting
+
+    - type: WriteToIceberg
+      name: WriteToAnIcebergTable
+      config:
+        # Dynamic destinations
+        table: "db.users.{zip}"
+        catalog_name: "hadoop_catalog"
+        catalog_properties:
+          type: "hadoop"
+          warehouse: "gs://MY-WAREHOUSE"
+        # Hadoop catalog config required to run pipeline locally
+        # Omit if running on Dataflow
+        config_properties:
+          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
+          "fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
+
+# Expected:
+# Row(id=1, name='John', email='[email protected]', zip='WA')
+# Row(id=2, name='Jane', email='[email protected]', zip='CA')
+# Row(id=3, name='Smith', email='[email protected]', zip='NY')
+# Row(id=4, name='Beamberg', email='[email protected]', zip='NY')
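Note on the `table: "db.users.{zip}"` setting in the file above: the README text in this commit describes it as selecting a table destination from field values in each incoming record. The sketch below illustrates that routing idea in plain Python, assuming the placeholder is filled from the record field of the same name. The function `route_by_template` is hypothetical and is not how Beam or Iceberg implement the feature; the sample records keep only the `id`, `name` and `zip` fields.

```python
from collections import defaultdict


def route_by_template(records, template):
  """Group records by the table name their fields produce from template.

  Hypothetical illustration of dynamic destinations, not Beam's code.
  """
  tables = defaultdict(list)
  for record in records:
    # str.format fills '{zip}' from the record's 'zip' field.
    tables[template.format(**record)].append(record)
  return dict(tables)


records = [
    {'id': 1, 'name': 'John', 'zip': 'WA'},
    {'id': 2, 'name': 'Jane', 'zip': 'CA'},
    {'id': 3, 'name': 'Smith', 'zip': 'NY'},
    {'id': 4, 'name': 'Beamberg', 'zip': 'NY'},
]

routed = route_by_template(records, 'db.users.{zip}')
for table, rows in sorted(routed.items()):
  print(table, [row['id'] for row in rows])
```

Under this assumption, records 3 and 4 both land in `db.users.NY`, which matches the table that iceberg_read.yaml reads and the two rows listed in its expected output.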
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/io/kafka.yaml b/sdks/python/apache_beam/yaml/examples/transforms/io/kafka.yaml
new file mode 100644
index 00000000000..7ab7b5ab0af
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/io/kafka.yaml
@@ -0,0 +1,96 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A pipeline that both writes to and reads from the same Kafka topic.
+
+pipeline:
+  transforms:
+    - type: ReadFromText
+      name: ReadFromGCS
+      config:
+        path: gs://dataflow-samples/shakespeare/kinglear.txt
+
+    - type: MapToFields
+      name: BuildKafkaRecords
+      input: ReadFromGCS
+      config:
+        language: python
+        fields:
+          value:
+            callable: |
+              def func(row):
+                return row.line.encode('utf-8')
+            output_type: bytes
+
+    - type: WriteToKafka
+      name: SendRecordsToKafka
+      input: BuildKafkaRecords
+      config:
+        format: "RAW"
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        producer_config_updates:
+          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    - type: ReadFromKafka
+      name: ReadFromMyTopic
+      config:
+        format: "RAW"
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        auto_offset_reset_config: earliest
+        consumer_config:
+          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    - type: MapToFields
+      name: ParseKafkaRecords
+      input: ReadFromMyTopic
+      config:
+        language: python
+        fields:
+          text:
+            callable: |
+              def func(row):
+                # Kafka RAW format reads messages as bytes
+                # in the 'payload' field of a Row
+                return row.payload.decode('utf-8')
+
+    - type: LogForTesting
+      input: ParseKafkaRecords
+
+# Since the pipeline both writes to and reads from a Kafka topic, we expect
+# the first pipeline component to write the rows containing the `value`
+# field as bytes to Kafka, and the second pipeline component to read the byte
+# messages from Kafka before parsing them as string in the new `text` field.
+# Expected:
+# Row(value=b'Fool\tThou shouldst not have been old till thou hadst')
+# Row(value=b'\tbeen wise.')
+# Row(value=b'KING LEAR\tNothing will come of nothing: speak again.')
+# Row(value=b'\tNever, never, never, never, never!')
+# Row(text='Fool\tThou shouldst not have been old till thou hadst')
+# Row(text='\tbeen wise.')
+# Row(text='KING LEAR\tNothing will come of nothing: speak again.')
+# Row(text='\tNever, never, never, never, never!')
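Note on the expected output above: the `value` rows and the `text` rows differ only by a UTF-8 encode on the write side and a decode on the read side. A minimal standalone sketch of that round trip follows. It uses plain Python with no Kafka or Beam involved, and the helper names `to_kafka_value` and `from_kafka_payload` are hypothetical stand-ins for the two `callable` blocks in kafka.yaml.

```python
def to_kafka_value(line: str) -> bytes:
  """Mirror the write-side callable: text line to raw bytes."""
  return line.encode('utf-8')


def from_kafka_payload(payload: bytes) -> str:
  """Mirror the read-side callable: raw bytes back to text."""
  return payload.decode('utf-8')


# Two of the sample lines used by the tests in this commit.
lines = [
    'Fool\tThou shouldst not have been old till thou hadst',
    '\tbeen wise.',
]

values = [to_kafka_value(line) for line in lines]
texts = [from_kafka_payload(value) for value in values]

# The decoded text matches the original lines exactly.
assert texts == lines
print(values[1], repr(texts[1]))
```

Tabs and other whitespace survive the round trip unchanged, which is why the expected `text` rows keep the leading `\t`.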