This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dab5f5d7905 YAML example suite update (#35960)
dab5f5d7905 is described below
commit dab5f5d7905b04dc488486933939b8d535fc3235
Author: Charles Nguyen <[email protected]>
AuthorDate: Fri Aug 29 18:58:26 2025 -0400
YAML example suite update (#35960)
* YAML example suite update
* Update README and notebook
* Update README
---
examples/yaml/README.md | 54 ++++++++++++++++++++++
.../transforms/ml/sentiment_analysis/README.md | 21 ++++++---
.../streaming_sentiment_analysis.yaml | 29 ++++++------
.../examples/transforms/ml/taxi_fare/README.md | 21 ++++++---
.../custom_nyc_taxifare_model_deployment.ipynb | 2 +-
.../taxi_fare/streaming_taxifare_prediction.yaml | 2 +-
6 files changed, 98 insertions(+), 31 deletions(-)
diff --git a/examples/yaml/README.md b/examples/yaml/README.md
new file mode 100644
index 00000000000..121b0b03bcb
--- /dev/null
+++ b/examples/yaml/README.md
@@ -0,0 +1,54 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Example YAML Pipelines
+
+A suite of YAML pipeline examples is currently located under the directory
+[sdks/python/apache_beam/yaml/examples](../../sdks/python/apache_beam/yaml/examples).
+
+###
[Aggregation](../../sdks/python/apache_beam/yaml/examples/transforms/aggregation)
+
+These examples leverage the built-in `Combine` transform for performing simple
+aggregations including sum, mean, count, etc.
+
+###
[Blueprints](../../sdks/python/apache_beam/yaml/examples/transforms/blueprint)
+
+These examples leverage Dataflow or other existing templates and convert them to
+YAML blueprints.
+
+###
[Element-wise](../../sdks/python/apache_beam/yaml/examples/transforms/elementwise)
+
+These examples leverage the built-in mapping transforms including
`MapToFields`,
+`Filter` and `Explode`.
+
+### [IO](../../sdks/python/apache_beam/yaml/examples/transforms/io)
+
+These examples leverage the built-in IO transforms to read from and write to
+various sources and sinks, including Iceberg, Kafka and Spanner.
+
+### [Jinja](../../sdks/python/apache_beam/yaml/examples/transforms/jinja)
+
+These examples use Jinja
[templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization)
+to build off of different contexts and/or with different
+configurations.
+
+### [ML](../../sdks/python/apache_beam/yaml/examples/transforms/ml)
+
+These examples include built-in ML-specific transforms such as `RunInference`,
+`MLTransform` and `Enrichment`.
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
index ad44d433017..e86075fb133 100644
---
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
@@ -30,7 +30,8 @@
https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv).
Download the dataset and copy over to a GCS bucket:
```sh
-gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
+export GCS_PATH="gs://YOUR-BUCKET/UScomments.csv"
+gcloud storage cp /path/to/UScomments.csv $GCS_PATH
```
For setting up Kafka, an option is to use [Click to Deploy](
@@ -54,6 +55,7 @@ a BigQuery dataset:
```sh
bq --location=us-central1 mk \
--dataset DATASET_ID
+export BQ_TABLE_ID="PROJECT_ID:DATASET_ID.TABLE_ID"
```
See also [here](
https://cloud.google.com/bigquery/docs/datasets) for more details on
@@ -76,18 +78,23 @@ export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
export REGION="us-central1"
export JOB_NAME="streaming-sentiment-analysis-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="3"
+export KAFKA_BOOTSTRAP_SERVERS="BOOTSTRAP_IP_ADD:9092"
+export KAFKA_TOPIC="YOUR_TOPIC"
+export KAFKA_USERNAME="KAFKA_USERNAME"
+export KAFKA_PASSWORD="KAFKA_PASSWORD"
+export VERTEXAI_ENDPOINT="ENDPOINT_ID"
python -m apache_beam.yaml.main \
- --yaml_pipeline_file
transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml \
+ --yaml_pipeline_file streaming_sentiment_analysis.yaml \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $PROJECT \
--region $REGION \
--num_workers $NUM_WORKERS \
--job_name $JOB_NAME \
- --jinja_variables '{ "GCS_PATH": "gs://YOUR-BUCKET/USComments.csv",
- "BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
- "TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD":
"KAFKA_PASSWORD",
- "ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
- "DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
+ --jinja_variables '{ "GCS_PATH": "'$GCS_PATH'",
+ "BOOTSTRAP_SERVERS": "'$KAFKA_BOOTSTRAP_SERVERS'",
+ "TOPIC": "'$KAFKA_TOPIC'", "USERNAME": "'$KAFKA_USERNAME'", "PASSWORD":
"'$KAFKA_PASSWORD'",
+ "ENDPOINT": "'$VERTEXAI_ENDPOINT'", "PROJECT": "'$PROJECT'", "LOCATION":
"'$REGION'",
+ "BQ_TABLE": "'$BQ_TABLE_ID'" }'
```
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
index 63521208daa..10790998a9e 100644
---
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
@@ -44,19 +44,19 @@ pipeline:
return int(x)
except (ValueError):
return None
-
+
return (
- pcoll
+ pcoll
| beam.io.ReadFromCsv(
file_pattern,
names=['video_id', 'comment_text', 'likes', 'replies'],
on_bad_lines='skip',
converters={'likes': _to_int, 'replies': _to_int})
- | beam.Filter(lambda row:
+ | beam.Filter(lambda row:
None not in list(row._asdict().values()))
| beam.Map(lambda row: beam.Row(
- video_id=row.video_id,
- comment_text=row.comment_text,
+ video_id=str(row.video_id),
+ comment_text=str(row.comment_text),
likes=int(row.likes),
replies=int(row.replies)))
)
@@ -154,17 +154,16 @@ pipeline:
comment_text:
callable: |
from transformers import AutoTokenizer
-
+
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased",
use_fast=True)
-
- def truncate_sentence(row):
- tokens = tokenizer.tokenize(row.comment_text)
- if len(tokens) >= 250:
- tokens = tokens[:250]
- truncated_sentence =
tokenizer.convert_tokens_to_string(tokens)
- else:
- truncated_sentence = row.comment_text
+ def truncate_sentence(row):
+ tokens = tokenizer(
+ row.comment_text,
+ max_length=512,
+ padding='max_length',
+ truncation=True)
+ truncated_sentence = tokenizer.decode(tokens["input_ids"],
skip_special_tokens=True)
return truncated_sentence
likes: likes
replies: replies
@@ -242,7 +241,7 @@ pipeline:
name: WriteInferenceResultsToBQ
input: Windowing
config:
- table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
+ table: "{{ BQ_TABLE }}"
create_disposition: CREATE_IF_NEEDED
write_disposition: WRITE_APPEND
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
index be2b48ffda3..75aa9545601 100644
--- a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/README.md
@@ -42,6 +42,7 @@ a BigQuery dataset:
```sh
bq --location=us-central1 mk \
--dataset DATASET_ID
+export BQ_TABLE_ID="PROJECT_ID:DATASET_ID.TABLE_ID"
```
See also [here](
https://cloud.google.com/bigquery/docs/datasets) for more details on
@@ -49,8 +50,9 @@ how to create BigQuery datasets.
A trained model hosted on Vertex AI is needed before being able to use
the Vertex AI model handler. To train and deploy a custom model for the
-taxi fare prediction problem, open and run this [notebook](
-custom_nyc_taxifare_model_deployment.ipynb) in Colab Enterprise.
+taxi fare prediction problem, open and run the
+[custom_nyc_taxifare_model_deployment](
+custom_nyc_taxifare_model_deployment.ipynb) notebook in Colab Enterprise.
The pipeline first reads the data stream of taxi rides events from the
public PubSub topic and performs some transformations before writing it
@@ -68,17 +70,22 @@ export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
export REGION="us-central1"
export JOB_NAME="streaming-taxifare-prediction-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="3"
+export KAFKA_BOOTSTRAP_SERVERS="BOOTSTRAP_IP_ADD:9092"
+export KAFKA_TOPIC="YOUR_TOPIC"
+export KAFKA_USERNAME="KAFKA_USERNAME"
+export KAFKA_PASSWORD="KAFKA_PASSWORD"
+export VERTEXAI_ENDPOINT="ENDPOINT_ID"
python -m apache_beam.yaml.main \
- --yaml_pipeline_file
transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml \
+ --yaml_pipeline_file streaming_taxifare_prediction.yaml \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $PROJECT \
--region $REGION \
--num_workers $NUM_WORKERS \
--job_name $JOB_NAME \
- --jinja_variables '{ "BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
- "TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD":
"KAFKA_PASSWORD",
- "ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
- "DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
+ --jinja_variables '{ "BOOTSTRAP_SERVERS": "'$KAFKA_BOOTSTRAP_SERVERS'",
+ "TOPIC": "'$KAFKA_TOPIC'", "USERNAME": "'$KAFKA_USERNAME'", "PASSWORD":
"'$KAFKA_PASSWORD'",
+ "ENDPOINT": "'$VERTEXAI_ENDPOINT'", "PROJECT": "'$PROJECT'", "LOCATION":
"'$REGION'",
+ "BQ_TABLE": "'$BQ_TABLE_ID'" }'
```
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
index eaabc73d5b0..1201c1011d4 100644
---
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/custom_nyc_taxifare_model_deployment.ipynb
@@ -36,7 +36,7 @@
"\n",
"<table><tbody><tr>\n",
" <td style=\"text-align: center\">\n",
- " <a
href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ftaxi_fare%2Fcustom_nyc_taxifare_model_deployment.ipynb\">\n",
+ " <a
href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2Fapache%2Fbeam%2Frefs%2Fheads%2Fmaster%2Fsdks%2Fpython%2Fapache_beam%2Fyaml%2Fexamples%2Ftransforms%2Fml%2Ftaxi_fare%2Fcustom_nyc_taxifare_model_deployment.ipynb\">\n",
" <img alt=\"Google Cloud Colab Enterprise logo\"
src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\"
width=\"32px\"><br> Run in Colab Enterprise\n",
" </a>\n",
" </td>\n",
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
index 2f3cfa56027..c213cb0ab94 100644
---
a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
@@ -308,7 +308,7 @@ pipeline:
name: WritePredictionsBQ
input: FormatInferenceOutput
config:
- table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
+ table: "{{ BQ_TABLE }}"
create_disposition: "CREATE_IF_NEEDED"
write_disposition: "WRITE_APPEND"