This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dab5f5d7905 YAML example suite update (#35960)
dab5f5d7905 is described below

commit dab5f5d7905b04dc488486933939b8d535fc3235
Author: Charles Nguyen <[email protected]>
AuthorDate: Fri Aug 29 18:58:26 2025 -0400

    YAML example suite update (#35960)
    
    * YAML example suite update
    
    * Update README and notebook
    
    * Update README
---
 examples/yaml/README.md                            | 54 ++++++++++++++++++++++
 .../transforms/ml/sentiment_analysis/README.md     | 21 ++++++---
 .../streaming_sentiment_analysis.yaml              | 29 ++++++------
 .../examples/transforms/ml/taxi_fare/README.md     | 21 ++++++---
 .../custom_nyc_taxifare_model_deployment.ipynb     |  2 +-
 .../taxi_fare/streaming_taxifare_prediction.yaml   |  2 +-
 6 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/examples/yaml/README.md b/examples/yaml/README.md
new file mode 100644
index 00000000000..121b0b03bcb
--- /dev/null
+++ b/examples/yaml/README.md
@@ -0,0 +1,54 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Example YAML Pipelines
+
+A suite of YAML pipeline examples is currently located under the directory
+[sdks/python/apache_beam/yaml/examples](../../sdks/python/apache_beam/yaml/examples).
+
+### [Aggregation](../../sdks/python/apache_beam/yaml/examples/transforms/aggregation)
+
+These examples leverage the built-in `Combine` transform for performing simple
+aggregations including sum, mean, count, etc.
+
+### [Blueprints](../../sdks/python/apache_beam/yaml/examples/transforms/blueprint)
+
+These examples leverage Dataflow or other existing templates and convert them to YAML
+blueprints.
+
+### [Element-wise](../../sdks/python/apache_beam/yaml/examples/transforms/elementwise)
+
+These examples leverage the built-in mapping transforms including `MapToFields`,
+`Filter` and `Explode`.
+
+### [IO](../../sdks/python/apache_beam/yaml/examples/transforms/io)
+
+These examples leverage the built-in IO transforms to read from and write to
+various sources and sinks, including Iceberg, Kafka and Spanner.
+
+### [Jinja](../../sdks/python/apache_beam/yaml/examples/transforms/jinja)
+
+These examples use Jinja [templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization)
+to build off of different contexts and/or with different
+configurations.
+
+### [ML](../../sdks/python/apache_beam/yaml/examples/transforms/ml)
+
+These examples include built-in ML-specific transforms such as `RunInference`,
+`MLTransform` and `Enrichment`.
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
index ad44d433017..e86075fb133 100644
--- 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
@@ -30,7 +30,8 @@ 
https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv).
 
 Download the dataset and copy over to a GCS bucket:
 ```sh
-gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
+export GCS_PATH="gs://YOUR-BUCKET/UScomments.csv"
+gcloud storage cp /path/to/UScomments.csv $GCS_PATH
 ```
 
 For setting up Kafka, an option is to use [Click to Deploy](
@@ -54,6 +55,7 @@ a BigQuery dataset:
 ```sh
 bq --location=us-central1 mk \
   --dataset DATASET_ID
+export BQ_TABLE_ID="PROJECT_ID:DATASET_ID.TABLE_ID"
 ```
 See also [here](
 https://cloud.google.com/bigquery/docs/datasets) for more details on
@@ -76,18 +78,23 @@ export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
 export REGION="us-central1"
 export JOB_NAME="streaming-sentiment-analysis-`date +%Y%m%d-%H%M%S`"
 export NUM_WORKERS="3"
+export KAFKA_BOOTSTRAP_SERVERS="BOOTSTRAP_IP_ADD:9092"
+export KAFKA_TOPIC="YOUR_TOPIC"
+export KAFKA_USERNAME="KAFKA_USERNAME"
+export KAFKA_PASSWORD="KAFKA_PASSWORD"
+export VERTEXAI_ENDPOINT="ENDPOINT_ID"
 
 python -m apache_beam.yaml.main \
-  --yaml_pipeline_file 
transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml \
+  --yaml_pipeline_file streaming_sentiment_analysis.yaml \
   --runner DataflowRunner \
   --temp_location $TEMP_LOCATION \
   --project $PROJECT \
   --region $REGION \
   --num_workers $NUM_WORKERS \
   --job_name $JOB_NAME \
-  --jinja_variables '{ "GCS_PATH": "gs://YOUR-BUCKET/USComments.csv",
-  "BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
-  "TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD": 
"KAFKA_PASSWORD",
-  "ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
-  "DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
+  --jinja_variables '{ "GCS_PATH": "'$GCS_PATH'",
+  "BOOTSTRAP_SERVERS": "'$KAFKA_BOOTSTRAP_SERVERS'",
+  "TOPIC": "'$KAFKA_TOPIC'", "USERNAME": "'$KAFKA_USERNAME'", "PASSWORD": 
"'$KAFKA_PASSWORD'",
+  "ENDPOINT": "'$VERTEXAI_ENDPOINT'", "PROJECT": "'$PROJECT'", "LOCATION": 
"'$REGION'",
+  "BQ_TABLE": "'$BQ_TABLE_ID'" }'
 ```
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
index 63521208daa..10790998a9e 100644
--- 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
@@ -44,19 +44,19 @@ pipeline:
                   return int(x)
                 except (ValueError):
                   return None
-            
+
               return (
-                  pcoll 
+                  pcoll
                   | beam.io.ReadFromCsv(
                         file_pattern,
                         names=['video_id', 'comment_text', 'likes', 'replies'],
                         on_bad_lines='skip',
                         converters={'likes': _to_int, 'replies': _to_int})
-                  | beam.Filter(lambda row: 
+                  | beam.Filter(lambda row:
                         None not in list(row._asdict().values()))
                   | beam.Map(lambda row: beam.Row(
-                        video_id=row.video_id,
-                        comment_text=row.comment_text,
+                        video_id=str(row.video_id),
+                        comment_text=str(row.comment_text),
                         likes=int(row.likes),
                         replies=int(row.replies)))
               )
@@ -154,17 +154,16 @@ pipeline:
           comment_text:
             callable: |
               from transformers import AutoTokenizer
-              
+
               tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", 
use_fast=True)
-              
-              def truncate_sentence(row):
-                tokens = tokenizer.tokenize(row.comment_text)
-                if len(tokens) >= 250:
-                  tokens = tokens[:250]
-                  truncated_sentence = 
tokenizer.convert_tokens_to_string(tokens)
-                else:
-                  truncated_sentence = row.comment_text
 
+              def truncate_sentence(row):
+                tokens = tokenizer(
+                    row.comment_text,
+                    max_length=512,
+                    padding='max_length',
+                    truncation=True)
+                truncated_sentence = tokenizer.decode(tokens["input_ids"], 
skip_special_tokens=True)
                 return truncated_sentence
           likes: likes
           replies: replies
@@ -242,7 +241,7 @@ pipeline:
       name: WriteInferenceResultsToBQ
       input: Windowing
       config:
-        table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
+        table: "{{ BQ_TABLE }}"
         create_disposition: CREATE_IF_NEEDED
         write_disposition: WRITE_APPEND
 
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
index be2b48ffda3..75aa9545601 100644
--- a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
@@ -42,6 +42,7 @@ a BigQuery dataset:
 ```sh
 bq --location=us-central1 mk \
   --dataset DATASET_ID
+export BQ_TABLE_ID="PROJECT_ID:DATASET_ID.TABLE_ID"
 ```
 See also [here](
 https://cloud.google.com/bigquery/docs/datasets) for more details on
@@ -49,8 +50,9 @@ how to create BigQuery datasets.
 
 A trained model hosted on Vertex AI is needed before being able to use
 the Vertex AI model handler. To train and deploy a custom model for the
-taxi fare prediction problem, open and run this [notebook](
-custom_nyc_taxifare_model_deployment.ipynb) in Colab Enterprise.
+taxi fare prediction problem, open and run the
+[custom_nyc_taxifare_model_deployment](
+custom_nyc_taxifare_model_deployment.ipynb) notebook in Colab Enterprise.
 
 The pipeline first reads the data stream of taxi rides events from the
 public PubSub topic and performs some transformations before writing it
@@ -68,17 +70,22 @@ export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
 export REGION="us-central1"
 export JOB_NAME="streaming-taxifare-prediction-`date +%Y%m%d-%H%M%S`"
 export NUM_WORKERS="3"
+export KAFKA_BOOTSTRAP_SERVERS="BOOTSTRAP_IP_ADD:9092"
+export KAFKA_TOPIC="YOUR_TOPIC"
+export KAFKA_USERNAME="KAFKA_USERNAME"
+export KAFKA_PASSWORD="KAFKA_PASSWORD"
+export VERTEXAI_ENDPOINT="ENDPOINT_ID"
 
 python -m apache_beam.yaml.main \
-  --yaml_pipeline_file 
transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml \
+  --yaml_pipeline_file streaming_taxifare_prediction.yaml \
   --runner DataflowRunner \
   --temp_location $TEMP_LOCATION \
   --project $PROJECT \
   --region $REGION \
   --num_workers $NUM_WORKERS \
   --job_name $JOB_NAME \
-  --jinja_variables '{ "BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
-  "TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD": 
"KAFKA_PASSWORD",
-  "ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
-  "DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
+  --jinja_variables '{ "BOOTSTRAP_SERVERS": "'$KAFKA_BOOTSTRAP_SERVERS'",
+  "TOPIC": "'$KAFKA_TOPIC'", "USERNAME": "'$KAFKA_USERNAME'", "PASSWORD": 
"'$KAFKA_PASSWORD'",
+  "ENDPOINT": "'$VERTEXAI_ENDPOINT'", "PROJECT": "'$PROJECT'", "LOCATION": 
"'$REGION'",
+  "BQ_TABLE": "'$BQ_TABLE_ID'" }'
 ```
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
index eaabc73d5b0..1201c1011d4 100644
--- 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
@@ -36,7 +36,7 @@
     "\n",
     "<table><tbody><tr>\n",
     "  <td style=\"text-align: center\">\n",
-    "    <a 
href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ftaxi_fare%2Fcustom_nyc_taxifare_model_deployment.ipynb\";>\n",
+    "    <a 
href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Frefs%2Fheads%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ftaxi_fare%2Fcustom_nyc_taxifare_model_deployment.ipynb\";>\n",
     "      <img alt=\"Google Cloud Colab Enterprise logo\" 
src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\";
 width=\"32px\"><br> Run in Colab Enterprise\n",
     "    </a>\n",
     "  </td>\n",
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
index 2f3cfa56027..c213cb0ab94 100644
--- 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
@@ -308,7 +308,7 @@ pipeline:
       name: WritePredictionsBQ
       input: FormatInferenceOutput
       config:
-        table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
+        table: "{{ BQ_TABLE }}"
         create_disposition: "CREATE_IF_NEEDED"
         write_disposition: "WRITE_APPEND"
 

Reply via email to