This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e591ef3dc1f YAML A Batch ML Pipeline Example (#35810)
e591ef3dc1f is described below
commit e591ef3dc1ff3dbb4f8808bd50cab669b3d81054
Author: Charles Nguyen <[email protected]>
AuthorDate: Mon Aug 18 12:15:21 2025 -0400
YAML A Batch ML Pipeline Example (#35810)
* WIP
* YAML Batch ML Pipeline Example
A YAML Batch ML Pipeline Example with
Iceberg, BigQuery IOs and MLTransform.
* Finish example and add unit test
* Fix CI/CD
* fixup! Fix CI/CD
* Fix CI/CD
* Fix pipeline dependencies problem
* Final touches and verify e2e
* Rebase and update
* fixup! Rebase and update
* Refine with comments and instructions
---
sdks/python/apache_beam/yaml/examples/README.md | 5 +-
.../yaml/examples/testing/examples_test.py | 125 +++++++++++++++++++--
.../yaml/examples/testing/input_data.py | 43 +++++++
.../examples/transforms/ml/log_analysis/README.md | 98 ++++++++++++++++
.../ml/log_analysis/anomaly_scoring.yaml | 93 +++++++++++++++
.../ml/log_analysis/batch_log_analysis.sh | 108 ++++++++++++++++++
.../ml/log_analysis/iceberg_migration.yaml | 45 ++++++++
.../ml/log_analysis/ml_preprocessing.yaml | 124 ++++++++++++++++++++
.../transforms/ml/log_analysis/requirements.txt | 24 ++++
.../examples/transforms/ml/log_analysis/train.py | 89 +++++++++++++++
10 files changed, 746 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/examples/README.md
b/sdks/python/apache_beam/yaml/examples/README.md
index 284de17a27a..c788b6d3e60 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -236,8 +236,11 @@ ML enrichments:
-
[enrich_spanner_with_bigquery.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/enrich_spanner_with_bigquery.yaml)
-
[bigtable_enrichment.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/bigtable_enrichment.yaml)
-Examples that include the `RunInference` transform for ML inference:
+Examples that include ML-specific transforms such as `RunInference` and
+`MLTransform`:
-
[streaming_sentiment_analysis.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml)
+-
[streaming_taxifare_prediction.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml)
+-
[batch_log_analysis.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.yaml)
More information can be found about aggregation transforms
[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 72b7208fab8..d75cdd99431 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -367,8 +367,8 @@ def create_test_method(
def _python_deps_involved(spec_filename):
return any(
- substr in spec_filename
- for substr in ['deps', 'streaming_sentiment_analysis'])
+ substr in spec_filename for substr in
+ ['deps', 'streaming_sentiment_analysis', 'ml_preprocessing'])
def _java_deps_involved(spec_filename):
return any(
@@ -596,7 +596,10 @@ def _kafka_test_preprocessor(
'test_oracle_to_bigquery_yaml',
'test_mysql_to_bigquery_yaml',
'test_spanner_to_bigquery_yaml',
- 'test_streaming_sentiment_analysis_yaml'
+ 'test_streaming_sentiment_analysis_yaml',
+ 'test_iceberg_migration_yaml',
+ 'test_ml_preprocessing_yaml',
+ 'test_anomaly_scoring_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1063,15 +1066,123 @@ def _streaming_taxifare_prediction_test_preprocessor(
return test_spec
[email protected]_test_preprocessor([
+ 'test_iceberg_migration_yaml',
+ 'test_ml_preprocessing_yaml',
+ 'test_anomaly_scoring_yaml'
+])
+def _batch_log_analysis_test_preprocessor(
+ test_spec: dict, expected: List[str], env: TestEnvironment):
+ """
+ Preprocessor for tests that involve the batch log analysis example.
+
+ This preprocessor replaces several IO transforms and the MLTransform.
+ This allows the test to verify the pipeline's correctness
+ without relying on external data sources or MLTransform's many dependencies.
+
+ Args:
+ test_spec: The dictionary representation of the YAML pipeline
specification.
+ expected: A list of strings representing the expected output of the
+ pipeline.
+ env: The TestEnvironment object providing utilities for creating temporary
+ files.
+
+ Returns:
+    The modified test_spec dictionary with the IO and ML transforms mocked.
+ """
+
+ if pipeline := test_spec.get('pipeline', None):
+ for transform in pipeline.get('transforms', []):
+ # Mock ReadFromCsv in iceberg_migration.yaml pipeline
+ if transform.get('type', '') == 'ReadFromCsv':
+ file_name = 'system-logs.csv'
+ local_path = env.input_file(file_name, INPUT_FILES[file_name])
+ transform['config']['path'] = local_path
+
+ # Mock ReadFromIceberg in ml_preprocessing.yaml pipeline
+ elif transform.get('type', '') == 'ReadFromIceberg':
+ transform['type'] = 'Create'
+ transform['config'] = {
+ k: v
+ for (k, v) in transform.get('config', {}).items()
+ if (k.startswith('__'))
+ }
+
+ transform['config']['elements'] = input_data.system_logs_data()
+
+ # Mock MLTransform in ml_preprocessing.yaml pipeline
+ elif transform.get('type', '') == 'MLTransform':
+ transform['type'] = 'MapToFields'
+ transform['config'] = {
+ k: v
+ for (k, v) in transform.get('config', {}).items()
+ if k.startswith('__')
+ }
+
+ transform['config']['language'] = 'python'
+ transform['config']['fields'] = {
+ 'LineId': 'LineId',
+ 'Date': 'Date',
+ 'Time': 'Time',
+ 'Level': 'Level',
+ 'Process': 'Process',
+ 'Component': 'Component',
+ 'Content': 'Content',
+ 'embedding': {
+ 'callable': f"lambda row: {input_data.embedding_data()}",
+ }
+ }
+
+ # Mock MapToFields in ml_preprocessing.yaml pipeline
+ elif transform.get('type', '') == 'MapToFields' and \
+ transform.get('name', '') == 'Normalize':
+ transform['config']['dependencies'] = ['numpy']
+
+ # Mock ReadFromBigQuery in anomaly_scoring.yaml pipeline
+ elif transform.get('type', '') == 'ReadFromBigQuery':
+ transform['type'] = 'Create'
+ transform['config'] = {
+ k: v
+ for (k, v) in transform.get('config', {}).items()
+ if (k.startswith('__'))
+ }
+
+ transform['config']['elements'] = (
+ input_data.system_logs_embedding_data())
+
+ # Mock PyTransform in anomaly_scoring.yaml pipeline
+ elif transform.get('type', '') == 'PyTransform' and \
+ transform.get('name', '') == 'AnomalyScoring':
+ transform['type'] = 'MapToFields'
+ transform['config'] = {
+ k: v
+ for (k, v) in transform.get('config', {}).items()
+ if k.startswith('__')
+ }
+
+ transform['config']['language'] = 'python'
+ transform['config']['fields'] = {
+ 'example': 'embedding',
+ 'predictions': {
+ 'callable': """lambda row: [{
+ 'score': 0.65,
+ 'label': 0,
+ 'threshold': 0.8
+ }]""",
+ }
+ }
+
+ return test_spec
+
+
INPUT_FILES = {
'products.csv': input_data.products_csv(),
'kinglear.txt': input_data.text_data(),
- 'youtube-comments.csv': input_data.youtube_comments_csv()
+ 'youtube-comments.csv': input_data.youtube_comments_csv(),
+ 'system-logs.csv': input_data.system_logs_csv()
}
-KAFKA_TOPICS = {
- 'test-topic': input_data.kafka_messages_data(),
-}
+KAFKA_TOPICS = {'test-topic': input_data.kafka_messages_data()}
PUBSUB_TOPICS = {
'test-topic': input_data.pubsub_messages_data(),
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 99f8e9e3c67..27f210b4856 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -253,3 +253,46 @@ class TaxiRideEventSchema(typing.NamedTuple):
meter_reading: float
timestamp: str
ride_status: str
+
+
+def system_logs_csv():
+ return '\n'.join([
+ 'LineId,Date,Time,Level,Process,Component,Content',
+ '1,2024-10-01,12:00:00,INFO,Main,ComponentA,System started successfully',
+ '2,2024-10-01,12:00:05,WARN,Main,ComponentA,Memory usage is high',
+ '3,2024-10-01,12:00:10,ERROR,Main,ComponentA,Task failed due to timeout',
+ ])
+
+
+def system_logs_data():
+ csv_data = system_logs_csv()
+ lines = csv_data.strip().split('\n')
+ headers = lines[0].split(',')
+ logs = []
+ for row in lines[1:]:
+ values = row.split(',')
+ log = dict(zip(headers, values))
+ log['LineId'] = int(log['LineId'])
+ logs.append(log)
+
+ return logs
+
+
+def embedding_data():
+ return [0.1, 0.2, 0.3, 0.4, 0.5]
+
+
+def system_logs_embedding_data():
+ csv_data = system_logs_csv()
+ lines = csv_data.strip().split('\n')
+ headers = lines[0].split(',')
+ headers.append('embedding')
+ logs = []
+ for row in lines[1:]:
+ values = row.split(',')
+ values.append(embedding_data())
+ log = dict(zip(headers, values))
+ log['LineId'] = int(log['LineId'])
+ logs.append(log)
+
+ return logs
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/README.md
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/README.md
new file mode 100644
index 00000000000..b8763bcb511
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/README.md
@@ -0,0 +1,98 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Batch Log Analysis ML Workflow
+
+This example contains several pipelines that leverage Iceberg and BigQuery
+IOs as well as MLTransform to demonstrate an end-to-end ML anomaly detection
+workflow on system logs.
+
+Download [Google Cloud CLI](https://cloud.google.com/sdk/docs/install-sdk).
+
+Install required Python dependencies in a virtual environment:
+```sh
+python -m venv env
+. env/bin/activate
+pip install 'apache-beam[gcp,yaml]' db-dtypes -r requirements.txt
+```
+
+The system logs dataset is from
[logpai/loghub](https://github.com/logpai/loghub)
+GitHub repository, and specifically the [sample HDFS `.csv` dataset](
+https://github.com/logpai/loghub/blob/master/Hadoop/Hadoop_2k.log_structured.csv)
+is used in this example.
+
+Download the dataset and copy it over to a GCS bucket:
+```sh
+gcloud storage cp /path/to/Hadoop_2k.log_structured.csv \
+ gs://YOUR-BUCKET/Hadoop_2k.log_structured.csv
+```
+**NOTE**: This example requires the GCS bucket created to be a single-region
+bucket.
+
+For Iceberg tables, GCS is also used as the storage layer in this workflow.
+In a data lakehouse with Iceberg and GCS object storage, a natural choice
+for Iceberg catalog is [BigLake
metastore](https://cloud.google.com/bigquery/docs/about-blms).
+It is a managed, serverless metastore that doesn't require any setup.
+
+A BigQuery dataset must exist before the pipeline can
+create/write to a table. Run the following command to create
+a BigQuery dataset:
+
+```sh
+bq --location=YOUR_REGION mk \
+ --dataset YOUR_DATASET
+```
+
+The workflow starts with pipeline
[iceberg_migration.yaml](./iceberg_migration.yaml)
+that ingests the `.csv` log data and writes to an Iceberg table on GCS with
+BigLake metastore for catalog.
+The next pipeline [ml_preprocessing.yaml](./ml_preprocessing.yaml) reads
+from this Iceberg table and performs ML-specific transformations such as
+computing text embedding and normalization, before writing the embeddings to
+a BigQuery table.
+An anomaly detection model is then trained (in [train.py](./train.py) script)
+on these vector embeddings, and is subsequently saved as an artifact on GCS.
+The last pipeline [anomaly_scoring.yaml](./anomaly_scoring.yaml) reads the
+same embeddings from the BigQuery table, uses Beam's anomaly detection
+module to load the model artifact from GCS and perform anomaly scoring,
+before writing it to another Iceberg table.
+
+This entire workflow execution is encapsulated in the `batch_log_analysis.sh`
+script that runs these workloads sequentially.
+
+Run the pipelines locally:
+```sh
+./batch_log_analysis.sh --runner DirectRunner \
+ --project YOUR_PROJECT \
+ --region YOUR_REGION \
+ --warehouse gs://YOUR-BUCKET \
+ --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+```
+
+Run the pipelines on Dataflow:
+```sh
+ ./batch_log_analysis.sh --runner DataflowRunner \
+ --project YOUR_PROJECT \
+ --region YOUR_REGION \
+ --temp_location gs://YOUR-BUCKET/tmp \
+ --num_workers 1 \
+ --worker_machine_type n1-standard-2 \
+ --warehouse gs://YOUR-BUCKET \
+ --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+```
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/anomaly_scoring.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/anomaly_scoring.yaml
new file mode 100644
index 00000000000..dd4f4840127
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/anomaly_scoring.yaml
@@ -0,0 +1,93 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads text embeddings from a BigQuery table, applies anomaly
+# scoring using a custom pre-trained k-Nearest Neighbours (KNN) model, and
+# writes the results to an Iceberg table on GCS with BigLake metastore for
+# catalog.
+
+pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromBigQuery
+ name: ReadFromBigQuery
+ config:
+ table: "{{ BQ_TABLE }}"
+ fields: [embedding]
+
+ - type: PyTransform
+ name: AnomalyScoring
+ config:
+ constructor: __constructor__
+ kwargs:
+ source: |
+ import apache_beam as beam
+ from apache_beam.ml.anomaly.detectors.pyod_adapter import
PyODFactory
+ from apache_beam.ml.anomaly.transforms import AnomalyDetection
+
+ class KNN(beam.PTransform):
+ def __init__(self, model_artifact_path):
+ self.model_artifact_path = model_artifact_path
+ self.model = PyODFactory.create_detector(
+ self.model_artifact_path,
+ model_id="knn",
+ )
+
+ def expand(self, pcoll):
+ return (
+ pcoll
+ | beam.Map(lambda x: x.embedding)
+ | AnomalyDetection(detector=self.model)
+ | beam.Map(lambda x: beam.Row(
+ example=x.example,
+ predictions=[pred.__dict__ for pred in x.predictions]))
+ )
+
+ model_artifact_path: "{{ WAREHOUSE }}/knn_model.pkl"
+
+ - type: MapToFields
+ name: ResultSchemaMapping
+ config:
+ language: python
+ fields:
+ anomaly_score:
+ callable: "lambda row: row.predictions[0]['score']"
+ output_type: number
+ anomaly_label:
+ callable: "lambda row: row.predictions[0]['label']"
+ output_type: integer
+ threshold:
+ callable: "lambda row: row.predictions[0]['threshold']"
+ output_type: number
+
+ - type: WriteToIceberg
+ name: WriteToIceberg
+ config:
+ table: "logs_analytics.hdfs_anomaly"
+ catalog_name: "rest_catalog"
+ catalog_properties:
+ warehouse: "{{ WAREHOUSE }}"
+ catalog-impl:
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
+ io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
+ gcp_project: "{{ PROJECT }}"
+ gcp_location: "{{ REGION }}"
+
+# Expected:
+# Row(anomaly_score=0.65, anomaly_label=0, threshold=0.8)
+# Row(anomaly_score=0.65, anomaly_label=0, threshold=0.8)
+# Row(anomaly_score=0.65, anomaly_label=0, threshold=0.8)
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.sh
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.sh
new file mode 100755
index 00000000000..6709419076a
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Usage:
+# With DirectRunner:
+# ./batch_log_analysis.sh --runner DirectRunner \
+# --project YOUR_PROJECT \
+# --region YOUR_REGION \
+# --warehouse gs://YOUR-BUCKET \
+# --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+
+# With DataflowRunner:
+# ./batch_log_analysis.sh --runner DataflowRunner \
+# --project YOUR_PROJECT \
+# --region YOUR_REGION \
+# --temp_location gs://YOUR-BUCKET/temp \
+# --num_workers 1 \
+# --worker_machine_type n1-standard-2 \
+# --warehouse gs://YOUR-BUCKET \
+# --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+
+set -e
+
+while [[ $# -gt 0 ]]; do
+ case "$1" in
+ --runner) RUNNER="$2"; shift 2 ;;
+ --project) PROJECT="$2"; shift 2 ;;
+ --region) REGION="$2"; shift 2 ;;
+ --temp_location) TEMP_LOCATION="$2"; shift 2 ;;
+ --num_workers) NUM_WORKERS="$2"; shift 2 ;;
+ --worker_machine_type) WORKER_MACHINE_TYPE="$2"; shift 2 ;;
+ --warehouse) WAREHOUSE="$2"; shift 2 ;;
+ --bq_table) BQ_TABLE="$2"; shift 2 ;;
+ *) echo "Unknown argument: $1"; exit 1 ;;
+ esac
+done
+
+if [[ "$RUNNER" == "DataflowRunner" ]]; then
+ DATAFLOW_COMMON_ARGS="--job_name batch-log-analysis-$(date +%Y%m%d-%H%M%S)
+ --project $PROJECT --region $REGION
+ --temp_location $TEMP_LOCATION
+ --num_workers $NUM_WORKERS --worker_machine_type $WORKER_MACHINE_TYPE
+ --disk_size_gb 50"
+else
+ DATAFLOW_COMMON_ARGS=""
+fi
+
+if ! command -v python &> /dev/null; then
+ echo "Error: Python is not installed. Please install Python to continue."
+ exit 1
+fi
+
+if ! command -v gcloud &> /dev/null; then
+ echo "Error: gcloud CLI is not installed. Please install gcloud to continue."
+ exit 1
+fi
+
+if [ -d "./beam-ml-artifacts" ]; then
+ echo "Removing existing MLTransform's artifact directory..."
+ rm -rf ./beam-ml-artifacts
+fi
+
+echo "Running iceberg_migration.yaml pipeline..."
+python -m apache_beam.yaml.main --yaml_pipeline_file iceberg_migration.yaml \
+ --runner $RUNNER \
+ --jinja_variables '{ "WAREHOUSE":"'$WAREHOUSE'", "PROJECT":"'$PROJECT'",
"REGION":"'$REGION'" }' \
+ $DATAFLOW_COMMON_ARGS
+
+echo "Running ml_preprocessing.yaml pipeline..."
+python -m apache_beam.yaml.main --yaml_pipeline_file ml_preprocessing.yaml \
+ --runner $RUNNER \
+ --jinja_variables '{ "WAREHOUSE":"'$WAREHOUSE'", "PROJECT":"'$PROJECT'",
"REGION":"'$REGION'", "BQ_TABLE":"'$BQ_TABLE'" }' \
+ $DATAFLOW_COMMON_ARGS \
+ --requirements_file requirements.txt
+
+echo "Running train.py..."
+python train.py --bq_table $BQ_TABLE
+
+if [ ! -f "./knn_model.pkl" ]; then
+ echo "Error: Model artifact 'knn_model.pkl' not found. Ensure model training
is successful."
+ exit 1
+fi
+
+echo "Uploading trained model to GCS..."
+gcloud storage cp "./knn_model.pkl" "$WAREHOUSE/knn_model.pkl"
+
+echo "Running anomaly_scoring.yaml pipeline..."
+python -m apache_beam.yaml.main --yaml_pipeline_file anomaly_scoring.yaml \
+ --runner $RUNNER \
+ --jinja_variables '{ "WAREHOUSE":"'$WAREHOUSE'", "PROJECT":"'$PROJECT'",
"REGION":"'$REGION'", "BQ_TABLE":"'$BQ_TABLE'" }' \
+ $DATAFLOW_COMMON_ARGS \
+ --requirements_file requirements.txt
+
+echo "All steps completed."
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/iceberg_migration.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/iceberg_migration.yaml
new file mode 100644
index 00000000000..8d0518c3635
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/iceberg_migration.yaml
@@ -0,0 +1,45 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads structured logs from a CSV file and writes them to an
+# Iceberg table on GCS with BigLake metastore for catalog.
+
+pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromCsv
+ name: ReadLogs
+ config:
+ path: "{{ WAREHOUSE }}/Hadoop_2k.log_structured.csv"
+
+ - type: WriteToIceberg
+ name: WriteToIceberg
+ config:
+ table: "logs_dataset.logs_hdfs"
+ catalog_name: "rest_catalog"
+ catalog_properties:
+ warehouse: "{{ WAREHOUSE }}"
+ catalog-impl:
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
+ io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
+ gcp_project: "{{ PROJECT }}"
+ gcp_location: "{{ REGION }}"
+
+# Expected:
+# Row(LineId=1, Date='2024-10-01', Time='12:00:00', Level='INFO',
Process='Main', Component='ComponentA', Content='System started successfully')
+# Row(LineId=2, Date='2024-10-01', Time='12:00:05', Level='WARN',
Process='Main', Component='ComponentA', Content='Memory usage is high')
+# Row(LineId=3, Date='2024-10-01', Time='12:00:10', Level='ERROR',
Process='Main', Component='ComponentA', Content='Task failed due to timeout')
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/ml_preprocessing.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/ml_preprocessing.yaml
new file mode 100644
index 00000000000..c83eb19e648
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/ml_preprocessing.yaml
@@ -0,0 +1,124 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads structured logs from an Iceberg table, applies
+# ML-specific transformations before writing them to a BigQuery table.
+
+pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromIceberg
+ name: ReadFromIceberg
+ config:
+ table: "logs_dataset.logs_hdfs"
+ catalog_name: "rest_catalog"
+ catalog_properties:
+ warehouse: "{{ WAREHOUSE }}"
+ catalog-impl:
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
+ io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
+ gcp_project: "{{ PROJECT }}"
+ gcp_location: "{{ REGION }}"
+
+ - type: MapToFields
+ name: MapToFields
+ config:
+ language: python
+ append: true
+ fields:
+ embedding:
+ callable: |
+ def fn(row):
+ line = [row.Date, row.Time, row.Level, row.Process,
+ row.Component, row.Content]
+ return " ".join(line)
+
+ - type: MLTransform
+ name: Embedding
+ config:
+ write_artifact_location: "./beam-ml-artifacts"
+ transforms:
+ - type: SentenceTransformerEmbeddings
+ config: { model_name: all-MiniLM-L6-v2, columns: [ embedding ] }
+
+ - type: MapToFields
+ name: SchemaMapping
+ config:
+ language: python
+ fields:
+ id:
+ callable: "lambda row: row.LineId"
+ output_type: integer
+ date:
+ callable: "lambda row: row.Date"
+ output_type: string
+ time:
+ callable: "lambda row: row.Time"
+ output_type: string
+ level:
+ callable: "lambda row: row.Level"
+ output_type: string
+ process:
+ callable: "lambda row: row.Process"
+ output_type: string
+ component:
+ callable: "lambda row: row.Component"
+ output_type: string
+ content:
+ callable: "lambda row: row.Content"
+ output_type: string
+ embedding:
+ callable: "lambda row: row.embedding"
+ output_type:
+ type: array
+ items:
+ type: number
+
+ - type: MapToFields
+ name: Normalize
+ config:
+ language: python
+ append: true
+ drop: [embedding]
+ fields:
+ embedding:
+ callable: |
+ import numpy as np
+
+ def normalize(row):
+ embedding = row.embedding
+ norm = np.linalg.norm(embedding)
+ return embedding / norm
+ output_type:
+ type: array
+ items:
+ type: number
+
+ - type: WriteToBigQuery
+ name: WriteToBigQuery
+ config:
+ table: "{{ BQ_TABLE }}"
+ write_disposition: "WRITE_TRUNCATE"
+ create_disposition: "CREATE_IF_NEEDED"
+
+options:
+ yaml_experimental_features: [ 'ML' ]
+
+# Expected:
+# Row(id=1, date='2024-10-01', time='12:00:00', level='INFO', process='Main',
component='ComponentA', content='System started successfully',
embedding=[0.13483997249264842, 0.26967994498529685, 0.40451991747794525,
0.5393598899705937, 0.674199862463242])
+# Row(id=2, date='2024-10-01', time='12:00:05', level='WARN', process='Main',
component='ComponentA', content='Memory usage is high',
embedding=[0.13483997249264842, 0.26967994498529685, 0.40451991747794525,
0.5393598899705937, 0.674199862463242])
+# Row(id=3, date='2024-10-01', time='12:00:10', level='ERROR',
process='Main', component='ComponentA', content='Task failed due to timeout',
embedding=[0.13483997249264842, 0.26967994498529685, 0.40451991747794525,
0.5393598899705937, 0.674199862463242])
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/requirements.txt
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/requirements.txt
new file mode 100644
index 00000000000..2a06309044e
--- /dev/null
+++
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/requirements.txt
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional dependencies needed for the pipelines in the log analysis
+# ML workflow.
+# These can be installed on local machine to run the pipelines with local
+# runner, or submitted as part of Dataflow job for the workers' execution
+# environment.
+sentence-transformers~=5.0.0
+pyod~=2.0.5
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/train.py
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/train.py
new file mode 100644
index 00000000000..f0f957aa7ba
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/train.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A script that trains a k-Nearest Neighbors (KNN) model on vector embeddings
+data queried from BigQuery.
+"""
+
+import argparse
+import logging
+import pickle
+
+import numpy as np
+from google.cloud import bigquery
+
+from pyod.models.knn import KNN
+
+
+def parse_arguments():
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--bq_table',
+ required=True,
+ help='BigQuery fully qualified table name that contains vector '
+ 'embeddings for training, '
+ 'specified as `YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE`.')
+
+ return parser.parse_known_args()
+
+
+class ModelHelper():
+ def __init__(self):
+ args, _ = parse_arguments()
+ self.n_neighbors = 8
+ self.method = 'largest'
+ self.metric = 'euclidean'
+ self.contamination = 0.1
+ self.bq_table = args.bq_table
+ self.dataset = None
+
+ def load_data(self):
+ logging.info("Querying vector embeddings from BigQuery...")
+
+ client = bigquery.Client()
+ sql = f"""
+ SELECT *
+ FROM `{self.bq_table}`
+ """
+ df = client.query_and_wait(sql).to_dataframe()
+ self.dataset = np.stack(df['embedding'].to_numpy())
+
+ def train_model(self):
+ logging.info("Training KNN model...")
+
+ model = KNN(
+ n_neighbors=self.n_neighbors,
+ method=self.method,
+ metric=self.metric,
+ contamination=self.contamination,
+ )
+ model.fit(self.dataset)
+
+ logging.info("KNN model trained successfully! Saving model...")
+
+ model_pickled_filename = 'knn_model.pkl'
+ with open(model_pickled_filename, 'wb') as f:
+ pickle.dump(model, f)
+
+
+if __name__ == "__main__":
+ logging.getLogger().setLevel(logging.INFO)
+
+ helper = ModelHelper()
+ helper.load_data()
+ helper.train_model()