This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b235f8b2a6 Add more YAML examples involving Kafka and Iceberg (#35151)
7b235f8b2a6 is described below

commit 7b235f8b2a6998b9b317f4f00e50d3a01424959b
Author: Charles Nguyen <[email protected]>
AuthorDate: Mon Jun 9 18:13:35 2025 -0400

    Add more YAML examples involving Kafka and Iceberg (#35151)
    
    * Add more YAML examples involving Kafka and Iceberg
    
    * Fix some missed details from rebasing
    
    * Adding unit tests for YAML examples
    
    * Clean up and address PR comments
    
    * Formatting
    
    * Formatting
---
 sdks/python/apache_beam/yaml/examples/README.md    | 165 +++++++++++++++++++--
 .../yaml/examples/testing/examples_test.py         |  95 +++++++++++-
 .../yaml/examples/testing/input_data.py            |  21 +++
 .../yaml/examples/transforms/io/iceberg_read.yaml  |  54 +++++++
 .../yaml/examples/transforms/io/iceberg_write.yaml |  62 ++++++++
 .../yaml/examples/transforms/io/kafka.yaml         |  96 ++++++++++++
 6 files changed, 476 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md
index 009245f6f14..70533655a6a 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -20,34 +20,40 @@
 # Examples Catalog
 
 <!-- TOC -->
+
 * [Examples Catalog](#examples-catalog)
-  * [Wordcount](#wordcount)
-  * [Transforms](#transforms)
-    * [Aggregation](#aggregation)
-    * [Blueprints](#blueprints)
-    * [Element-wise](#element-wise)
-    * [IO](#io)
-    * [ML](#ml)
+    * [Wordcount](#wordcount)
+    * [Transforms](#transforms)
+        * [Aggregation](#aggregation)
+        * [Blueprints](#blueprints)
+        * [Element-wise](#element-wise)
+        * [IO](#io)
+        * [ML](#ml)
 
 <!-- TOC -->
 
 ## Prerequisites
+
 Build this jar for running with the run command in the next stage:
+
 ```
 cd <path_to_beam_repo>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
 ```
 
 ## Example Run
+
 This module contains a series of Beam YAML code samples that can be run using
 the command:
+
 ```
-python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/example.yaml
 ```
 
 Depending on the yaml pipeline, the output may be emitted to standard output or
 a file located in the execution folder used.
 
 ## Wordcount
+
 A good starting place is the [Wordcount](wordcount_minimal.yaml) example under
 the root example directory.
 This example reads in a text file, splits the text on each word, groups by each
@@ -55,8 +61,10 @@ word, and counts the occurrence of each word. This is a classic example used in
 the other SDKs and shows off many of the functionalities of Beam YAML.
 
 ## Testing
+
 A test file is located in the testing folder that will execute all the example
 yamls and confirm the expected results.
+
 ```
 pytest -v testing/
 
@@ -71,25 +79,160 @@ Examples in this directory show off the various built-in transforms of the Beam
 YAML framework.
 
 ### Aggregation
+
 These examples leverage the built-in `Combine` transform for performing simple
 aggregations including sum, mean, count, etc.
 
 ### Blueprints
+
 These examples leverage Dataflow or other existing templates and convert them
 to yaml blueprints.
 
 ### Element-wise
+
 These examples leverage the built-in mapping transforms including `MapToFields`,
 `Filter` and `Explode`. More information can be found about mapping transforms
 [here](https://beam.apache.org/documentation/sdks/yaml-udf/).
 
 ### IO
-These examples leverage the built-in `Spanner_Read` and `Spanner_Write`
-transform for performing simple reads and writes from a spanner DB.
+
+#### Spanner
+
+Examples [Spanner Read](transforms/io/spanner_read.yaml) and [Spanner Write](
+transforms/io/spanner_write.yaml) leverage the built-in `Spanner_Read` and
+`Spanner_Write` transforms for performing simple reads and writes from a
+Google Spanner database.
+
+#### Kafka
+
+Examples involving Kafka such as [Kafka Read Write](transforms/io/kafka.yaml)
+require users to set up a Kafka cluster that the Dataflow runner executing the
+Beam pipeline has access to.
+Please note that the `ReadFromKafka` transform has
+a [known issue](https://github.com/apache/beam/issues/22809) when
+using non-Dataflow portable runners, where reading may get stuck in streaming
+pipelines. Hence, using the Dataflow runner is recommended for examples that
+involve reading from Kafka in a streaming pipeline.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. An option is to use [Click to Deploy](
+https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
+to quickly launch a Kafka cluster on [GCE](
+https://cloud.google.com/products/compute?hl=en). The [SASL/PLAIN](
+https://kafka.apache.org/documentation/#security_sasl_plain) authentication
+mechanism is configured for the brokers as part of the deployment. See
+also [here](
+https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/dataflow/flex-templates/kafka_to_bigquery)
+for an alternative step-by-step guide on setting up Kafka on GCE without the
+authentication mechanism.
+
+Let's assume one of the bootstrap servers is on VM instance `kafka-vm-0`
+with internal IP address `123.45.67.89`, listening on port `9092`. The
+SASL/PLAIN `USERNAME` and `PASSWORD` can be viewed from the VM instance's
+metadata on the GCE console, or with the gcloud CLI:
+
+```sh
+gcloud compute instances describe kafka-vm-0 \
+  --format='value[](metadata.items.kafka-user)'
+gcloud compute instances describe kafka-vm-0 \
+  --format='value[](metadata.items.kafka-password)'
+```
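+
+The example pipeline also needs a Kafka topic to write to and read from. Below
+is a minimal sketch for creating one (`MY-TOPIC` is a placeholder; this assumes
+you can SSH into the broker VM and that the Kafka CLI tools are available
+there, e.g. under the Kafka installation's `bin/` directory; with SASL enabled
+you may also need to pass client credentials via `--command-config`):
+
+```sh
+bin/kafka-topics.sh --create \
+  --topic MY-TOPIC \
+  --bootstrap-server localhost:9092
+```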
+
+Beam pipeline [Kafka Read Write](transforms/io/kafka.yaml) first writes data to
+the Kafka topic using the `WriteToKafka` transform and then reads that data back
+using the `ReadFromKafka` transform. Run the pipeline:
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/tmp"
+export REGION="us-central1"
+export JOB_NAME="demo-kafka-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="1"
+
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/kafka.yaml \
+  --runner DataflowRunner \
+  --temp_location $TEMP_LOCATION \
+  --project $PROJECT \
+  --region $REGION \
+  --num_workers $NUM_WORKERS \
+  --job_name $JOB_NAME \
+  --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
+    "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
+```
+
+**_Optional_**: If the Kafka cluster is set up with no SASL/PLAIN
+authentication configured for the brokers, there is no SASL/PLAIN `USERNAME`
+and `PASSWORD` needed. In the pipelines, omit the configurations
+`producer_config_updates` and `consumer_config` from the `WriteToKafka` and
+`ReadFromKafka` transforms. Run the commands above without specifying the
+username and password in the `--jinja_variables` flag.
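+
+For instance, a sketch of the run command in that case, reusing the same
+environment variables as above and assuming the SASL settings have been removed
+from the pipeline file, might look like:
+
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/kafka.yaml \
+  --runner DataflowRunner \
+  --temp_location $TEMP_LOCATION \
+  --project $PROJECT \
+  --region $REGION \
+  --num_workers $NUM_WORKERS \
+  --job_name $JOB_NAME \
+  --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
+    "TOPIC": "MY-TOPIC" }'
+```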
+
+#### Iceberg
+
+Beam pipelines [Iceberg Write](transforms/io/iceberg_write.yaml) and
+[Iceberg Read](transforms/io/iceberg_read.yaml) are examples of how to interact
+with Iceberg tables stored on GCS with a Hadoop catalog configured.
+
+To create a GCS bucket as our warehouse storage,
+see [here](https://cloud.google.com/storage/docs/creating-buckets#command-line).
+To run the pipelines locally, an option is to create a service account key in
+order to access GCS (see
+[here](https://cloud.google.com/iam/docs/keys-create-delete#creating)).
+Within the pipelines, specify the GCS bucket name and the path to the saved
+service account key .json file.
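+
+For example, a sketch of both steps with the gcloud CLI (the bucket name
+`MY-WAREHOUSE`, the service account `MY-SA@MY-PROJECT.iam.gserviceaccount.com`,
+and the key path are placeholders; the service account must have access to the
+bucket):
+
+```sh
+# Create the warehouse bucket.
+gcloud storage buckets create gs://MY-WAREHOUSE --location=us-central1
+
+# Create a service account key for accessing GCS when running locally.
+gcloud iam service-accounts keys create /path/to/service/account/key.json \
+  --iam-account=MY-SA@MY-PROJECT.iam.gserviceaccount.com
+```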
+
+**_Note_**: With the Hadoop catalog, Iceberg will use the Hadoop connector for
+GCS. See [here](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md)
+for the full list of configuration options for the Hadoop catalog when used
+with GCS.
+
+To create and write to Iceberg tables on GCS, run:
+
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/iceberg_write.yaml
+```
+
+The pipeline uses [Dynamic destinations](
+https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
+to dynamically create and select a table destination based on field
+values in the incoming records.
+
+To read from a created Iceberg table on GCS, run:
+
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/iceberg_read.yaml
+```
+
+**_Optional_**: To run the pipeline on Dataflow, a service account key is
+[not needed](
+https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md).
+Omit the authentication settings in the Hadoop catalog configuration
+`config_properties`, and run:
+
+```sh
+export REGION="us-central1"
+export JOB_NAME="demo-iceberg_write-`date +%Y%m%d-%H%M%S`"
+
+gcloud dataflow yaml run $JOB_NAME \
+  --yaml-pipeline-file transforms/io/iceberg_write.yaml \
+  --region $REGION
+```
+
+```sh
+export REGION="us-central1"
+export JOB_NAME="demo-iceberg_read-`date +%Y%m%d-%H%M%S`"
+
+gcloud dataflow yaml run $JOB_NAME \
+  --yaml-pipeline-file transforms/io/iceberg_read.yaml \
+  --region $REGION
+```
 
 ### ML
+
 These examples leverage the built-in `Enrichment` transform for performing
 ML enrichments.
 
 More information can be found about aggregation transforms
-[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
\ No newline at end of file
+[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 6fef278039e..ad753961512 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -58,7 +58,7 @@ def test_enrichment(
 
   This PTransform simulates the behavior of the Enrichment transform by
   looking up data from predefined in-memory tables based on the provided
-  `enrichment_handler` and `handler_config`.  
+  `enrichment_handler` and `handler_config`.
 
   Note: The Github action that invokes these tests does not have gcp
   dependencies installed which is a prerequisite to
@@ -111,10 +111,25 @@ def test_enrichment(
   return pcoll | beam.Map(_fn)
 
 
+@beam.ptransform_fn
+def test_kafka_read(
+    pcoll,
+    format,
+    topic,
+    bootstrap_servers,
+    auto_offset_reset_config,
+    consumer_config):
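+  # The Kafka-specific arguments are accepted but ignored; this fake source
+  # simply emits the in-memory text as rows with a bytes 'payload' field,
+  # mirroring the output of ReadFromKafka with the RAW format.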
+  return (
+      pcoll | beam.Create(input_data.text_data().split('\n'))
+      | beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))
+
+
 TEST_PROVIDERS = {
-    'TestEnrichment': test_enrichment,
+    'TestEnrichment': test_enrichment, 'TestReadFromKafka': test_kafka_read
 }
 
+INPUT_TRANSFORM_TEST_PROVIDERS = ['TestReadFromKafka']
+
 
 def check_output(expected: List[str]):
   """
@@ -184,7 +199,11 @@ def create_test_method(
         actual = [
             yaml_transform.expand_pipeline(
                 p,
-                pipeline_spec, [yaml_provider.InlineProvider(TEST_PROVIDERS)])
+                pipeline_spec,
+                [
+                    yaml_provider.InlineProvider(
+                        TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS)
+                ])
         ]
         if not actual[0]:
           actual = list(p.transforms_stack[0].parts[-1].outputs.values())
@@ -373,9 +392,30 @@ def _wordcount_test_preprocessor(
       env.input_file('kinglear.txt', '\n'.join(lines)))
 
 
+@YamlExamplesTestSuite.register_test_preprocessor('test_kafka_yaml')
+def _kafka_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
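+  """
+  Preprocessor for the Kafka example test.
+
+  Points the example's ReadFromText source at an in-memory input file and
+  swaps the ReadFromKafka transform for the TestReadFromKafka test provider,
+  so the test can run without a real Kafka cluster.
+  """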
+
+  test_spec = replace_recursive(
+      test_spec,
+      'ReadFromText',
+      'path',
+      env.input_file('kinglear.txt', input_data.text_data()))
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromKafka':
+        transform['type'] = 'TestReadFromKafka'
+
+  return test_spec
+
+
 @YamlExamplesTestSuite.register_test_preprocessor([
     'test_simple_filter_yaml',
     'test_simple_filter_and_combine_yaml',
+    'test_iceberg_read_yaml',
+    'test_iceberg_write_yaml',
+    'test_kafka_yaml',
     'test_spanner_read_yaml',
     'test_spanner_write_yaml',
     'test_enrich_spanner_with_bigquery_yaml'
@@ -417,9 +457,10 @@ def _io_write_test_preprocessor(
 def _file_io_read_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
   """
-  This preprocessor replaces any ReadFrom transform with a Create transform
-  that reads from a predefined in-memory dictionary. This allows the test
-  to verify the pipeline's correctness without relying on external files.
+  This preprocessor replaces any file IO ReadFrom transform with a Create
+  transform that reads from a predefined in-memory dictionary. This allows
+  the test to verify the pipeline's correctness without relying on external
+  files.
 
   Args:
     test_spec: The dictionary representation of the YAML pipeline specification.
@@ -445,6 +486,47 @@ def _file_io_read_test_preprocessor(
   return test_spec
 
 
+@YamlExamplesTestSuite.register_test_preprocessor(['test_iceberg_read_yaml'])
+def _iceberg_io_read_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve reading from Iceberg.
+
+  This preprocessor replaces any ReadFromIceberg transform with a Create
+  transform that reads from a predefined in-memory dictionary. This allows
+  the test to verify the pipeline's correctness without relying on Iceberg
+  tables stored externally.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with ReadFromIceberg transforms replaced.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromIceberg':
+        config = transform['config']
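+        # e.g. 'db.users.NY' -> ('db', 'users', 'NY'); the last component is
+        # the dynamic-destination field value, which forms part of the
+        # INPUT_TABLES lookup key below.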
+        (db_name, table_name,
+         field_value_dynamic_destinations) = config['table'].split('.')
+
+        transform['type'] = 'Create'
+        transform['config'] = {
+            k: v
+            for k, v in config.items() if k.startswith('__')
+        }
+        transform['config']['elements'] = INPUT_TABLES[(
+            str(db_name),
+            str(table_name),
+            str(field_value_dynamic_destinations))]
+
+  return test_spec
+
+
 @YamlExamplesTestSuite.register_test_preprocessor(
     ['test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml'])
 def _spanner_io_read_test_preprocessor(
@@ -531,6 +613,7 @@ INPUT_TABLES = {
     spanner_shipments_data(),
     ('orders-test', 'order-database', 'orders'): input_data.
     spanner_orders_data(),
+    ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
     ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
     bigtable_data(),
     ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data()
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 9c915785dd1..6661a29873b 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -22,6 +22,27 @@ needed.
 """
 
 
+def text_data():
+  return '\n'.join([
+      "Fool\tThou shouldst not have been old till thou hadst",
+      "\tbeen wise.",
+      "KING LEAR\tNothing will come of nothing: speak again.",
+      "\tNever, never, never, never, never!"
+  ])
+
+
+def iceberg_dynamic_destinations_users_data():
+  return [{
+      'id': 3, 'name': 'Smith', 'email': '[email protected]', 'zip': 'NY'
+  },
+          {
+              'id': 4,
+              'name': 'Beamberg',
+              'email': '[email protected]',
+              'zip': 'NY'
+          }]
+
+
 def products_csv():
   return '\n'.join([
       'transaction_id,product_name,category,price',
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_read.yaml b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_read.yaml
new file mode 100644
index 00000000000..1ad291ddd10
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_read.yaml
@@ -0,0 +1,54 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads from Iceberg table 'db.users.NY' on GCS with a Hadoop
+# catalog configured. The table, if it does not already exist, can be created
+# and populated using the iceberg_write.yaml pipeline.
+#
+# Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name.
+# If this example is run locally, then replace
+# '/path/to/service/account/key.json' with the correct path to your service
+# account key .json file on your machine. Otherwise, if the Dataflow runner is
+# used, then omit the 'config_properties' field.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromIceberg
+      name: ReadFromAnIcebergTable
+      config:
+        table: "db.users.NY"
+        catalog_name: "hadoop_catalog"
+        catalog_properties:
+          type: "hadoop"
+          warehouse: "gs://MY-WAREHOUSE"
+        # Hadoop catalog config required to run pipeline locally
+        # Omit if running on Dataflow
+        config_properties:
+          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
+          "fs.gs.auth.service.account.json.keyfile": 
"/path/to/service/account/key.json"
+
+    - type: LogForTesting
+
+    - type: WriteToCsv
+      name: OutputToCSVFile
+      config:
+        path: "gs://MY-WAREHOUSE/my-csv.csv"
+
+# Expected:
+#  Row(id=3, name='Smith', email='[email protected]', zip='NY')
+#  Row(id=4, name='Beamberg', email='[email protected]', zip='NY')
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_write.yaml b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_write.yaml
new file mode 100644
index 00000000000..ac1bdbbeeca
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/io/iceberg_write.yaml
@@ -0,0 +1,62 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline uses Dynamic destinations (see
+# https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
+# to dynamically create and select a table destination based on field values in
+# the incoming records.
+#
+# Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name.
+# If this example is run locally, then replace
+# '/path/to/service/account/key.json' with the correct path to your service
+# account key .json file on your machine. Otherwise, if the Dataflow runner is
+# used, then omit the 'config_properties' field.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: Create
+      name: CreateSampleData
+      config:
+        elements:
+          - { id: 1, name: "John", email: "[email protected]", zip: "WA" }
+          - { id: 2, name: "Jane", email: "[email protected]", zip: "CA" }
+          - { id: 3, name: "Smith", email: "[email protected]", zip: "NY" }
+          - { id: 4, name: "Beamberg", email: "[email protected]", zip: "NY" }
+
+    - type: LogForTesting
+
+    - type: WriteToIceberg
+      name: WriteToAnIcebergTable
+      config:
+        # Dynamic destinations
+        table: "db.users.{zip}"
+        catalog_name: "hadoop_catalog"
+        catalog_properties:
+          type: "hadoop"
+          warehouse: "gs://MY-WAREHOUSE"
+        # Hadoop catalog config required to run pipeline locally
+        # Omit if running on Dataflow
+        config_properties:
+          "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
+          "fs.gs.auth.service.account.json.keyfile": 
"/path/to/service/account/key.json"
+
+# Expected:
+#  Row(id=1, name='John', email='[email protected]', zip='WA')
+#  Row(id=2, name='Jane', email='[email protected]', zip='CA')
+#  Row(id=3, name='Smith', email='[email protected]', zip='NY')
+#  Row(id=4, name='Beamberg', email='[email protected]', zip='NY')
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/io/kafka.yaml b/sdks/python/apache_beam/yaml/examples/transforms/io/kafka.yaml
new file mode 100644
index 00000000000..7ab7b5ab0af
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/io/kafka.yaml
@@ -0,0 +1,96 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A pipeline that both writes to and reads from the same Kafka topic.
+
+pipeline:
+  transforms:
+    - type: ReadFromText
+      name: ReadFromGCS
+      config:
+        path: gs://dataflow-samples/shakespeare/kinglear.txt
+
+    - type: MapToFields
+      name: BuildKafkaRecords
+      input: ReadFromGCS
+      config:
+        language: python
+        fields:
+          value:
+            callable: |
+              def func(row):
+                return row.line.encode('utf-8')
+            output_type: bytes
+
+    - type: WriteToKafka
+      name: SendRecordsToKafka
+      input: BuildKafkaRecords
+      config:
+        format: "RAW"
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        producer_config_updates:
+          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    - type: ReadFromKafka
+      name: ReadFromMyTopic
+      config:
+        format: "RAW"
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        auto_offset_reset_config: earliest
+        consumer_config:
+          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    - type: MapToFields
+      name: ParseKafkaRecords
+      input: ReadFromMyTopic
+      config:
+        language: python
+        fields:
+          text:
+            callable: |
+              def func(row):
+                # Kafka RAW format reads messages as bytes 
+                # in the 'payload' field of a Row
+                return row.payload.decode('utf-8')
+
+    - type: LogForTesting
+      input: ParseKafkaRecords
+
+# Since the pipeline both writes to and reads from a Kafka topic, we expect
+#   the first pipeline component to write the rows containing the `value`
+#   field as bytes to Kafka, and the second pipeline component to read the byte
+#   messages from Kafka before parsing them as string in the new `text` field.
+# Expected:
+#  Row(value=b'Fool\tThou shouldst not have been old till thou hadst')
+#  Row(value=b'\tbeen wise.')
+#  Row(value=b'KING LEAR\tNothing will come of nothing: speak again.')
+#  Row(value=b'\tNever, never, never, never, never!')
+#  Row(text='Fool\tThou shouldst not have been old till thou hadst')
+#  Row(text='\tbeen wise.')
+#  Row(text='KING LEAR\tNothing will come of nothing: speak again.')
+#  Row(text='\tNever, never, never, never, never!')
