This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e591ef3dc1f YAML A Batch ML Pipeline Example (#35810)
e591ef3dc1f is described below

commit e591ef3dc1ff3dbb4f8808bd50cab669b3d81054
Author: Charles Nguyen <[email protected]>
AuthorDate: Mon Aug 18 12:15:21 2025 -0400

    YAML A Batch ML Pipeline Example (#35810)
    
    * WIP
    
    * YAML Batch ML Pipeline Example
    
    A YAML Batch ML Pipeline Example with
    Iceberg, BigQuery IOs and MLTransform.
    
    * Finish example and add unit test
    
    * Fix CI/CD
    
    * fixup! Fix CI/CD
    
    * Fix CI/CD
    
    * Fix pipeline dependencies problem
    
    * Final touches and verify e2e
    
    * Rebase and update
    
    * fixup! Rebase and update
    
    * Refine with comments and instructions
---
 sdks/python/apache_beam/yaml/examples/README.md    |   5 +-
 .../yaml/examples/testing/examples_test.py         | 125 +++++++++++++++++++--
 .../yaml/examples/testing/input_data.py            |  43 +++++++
 .../examples/transforms/ml/log_analysis/README.md  |  98 ++++++++++++++++
 .../ml/log_analysis/anomaly_scoring.yaml           |  93 +++++++++++++++
 .../ml/log_analysis/batch_log_analysis.sh          | 108 ++++++++++++++++++
 .../ml/log_analysis/iceberg_migration.yaml         |  45 ++++++++
 .../ml/log_analysis/ml_preprocessing.yaml          | 124 ++++++++++++++++++++
 .../transforms/ml/log_analysis/requirements.txt    |  24 ++++
 .../examples/transforms/ml/log_analysis/train.py   |  89 +++++++++++++++
 10 files changed, 746 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/examples/README.md 
b/sdks/python/apache_beam/yaml/examples/README.md
index 284de17a27a..c788b6d3e60 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -236,8 +236,11 @@ ML enrichments:
 - 
[enrich_spanner_with_bigquery.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/enrich_spanner_with_bigquery.yaml)
 - 
[bigtable_enrichment.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/bigtable_enrichment.yaml)
 
-Examples that include the `RunInference` transform for ML inference:
+Examples that include ML-specific transforms such as `RunInference` and
+`MLTransform`:
 - 
[streaming_sentiment_analysis.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml)
+- 
[streaming_taxifare_prediction.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml)
+- 
[batch_log_analysis.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.yaml)
 
 More information can be found about aggregation transforms
 [here](https://beam.apache.org/documentation/sdks/yaml-combine/).
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py 
b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 72b7208fab8..d75cdd99431 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -367,8 +367,8 @@ def create_test_method(
 
   def _python_deps_involved(spec_filename):
     return any(
-        substr in spec_filename
-        for substr in ['deps', 'streaming_sentiment_analysis'])
+        substr in spec_filename for substr in
+        ['deps', 'streaming_sentiment_analysis', 'ml_preprocessing'])
 
   def _java_deps_involved(spec_filename):
     return any(
@@ -596,7 +596,10 @@ def _kafka_test_preprocessor(
     'test_oracle_to_bigquery_yaml',
     'test_mysql_to_bigquery_yaml',
     'test_spanner_to_bigquery_yaml',
-    'test_streaming_sentiment_analysis_yaml'
+    'test_streaming_sentiment_analysis_yaml',
+    'test_iceberg_migration_yaml',
+    'test_ml_preprocessing_yaml',
+    'test_anomaly_scoring_yaml'
 ])
 def _io_write_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1063,15 +1066,123 @@ def _streaming_taxifare_prediction_test_preprocessor(
   return test_spec
 
 
[email protected]_test_preprocessor([
+    'test_iceberg_migration_yaml',
+    'test_ml_preprocessing_yaml',
+    'test_anomaly_scoring_yaml'
+])
+def _batch_log_analysis_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve the batch log analysis example.
+
+  This preprocessor replaces several IO transforms and the MLTransform.
+  This allows the test to verify the pipeline's correctness
+  without relying on external data sources or MLTransform's many dependencies.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline 
specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with ReadFromText transforms replaced.
+  """
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      # Mock ReadFromCsv in iceberg_migration.yaml pipeline
+      if transform.get('type', '') == 'ReadFromCsv':
+        file_name = 'system-logs.csv'
+        local_path = env.input_file(file_name, INPUT_FILES[file_name])
+        transform['config']['path'] = local_path
+
+      # Mock ReadFromIceberg in ml_preprocessing.yaml pipeline
+      elif transform.get('type', '') == 'ReadFromIceberg':
+        transform['type'] = 'Create'
+        transform['config'] = {
+            k: v
+            for (k, v) in transform.get('config', {}).items()
+            if (k.startswith('__'))
+        }
+
+        transform['config']['elements'] = input_data.system_logs_data()
+
+      # Mock MLTransform in ml_preprocessing.yaml pipeline
+      elif transform.get('type', '') == 'MLTransform':
+        transform['type'] = 'MapToFields'
+        transform['config'] = {
+            k: v
+            for (k, v) in transform.get('config', {}).items()
+            if k.startswith('__')
+        }
+
+        transform['config']['language'] = 'python'
+        transform['config']['fields'] = {
+            'LineId': 'LineId',
+            'Date': 'Date',
+            'Time': 'Time',
+            'Level': 'Level',
+            'Process': 'Process',
+            'Component': 'Component',
+            'Content': 'Content',
+            'embedding': {
+                'callable': f"lambda row: {input_data.embedding_data()}",
+            }
+        }
+
+      # Mock MapToFields in ml_preprocessing.yaml pipeline
+      elif transform.get('type', '') == 'MapToFields' and \
+          transform.get('name', '') == 'Normalize':
+        transform['config']['dependencies'] = ['numpy']
+
+      # Mock ReadFromBigQuery in anomaly_scoring.yaml pipeline
+      elif transform.get('type', '') == 'ReadFromBigQuery':
+        transform['type'] = 'Create'
+        transform['config'] = {
+            k: v
+            for (k, v) in transform.get('config', {}).items()
+            if (k.startswith('__'))
+        }
+
+        transform['config']['elements'] = (
+            input_data.system_logs_embedding_data())
+
+      # Mock PyTransform in anomaly_scoring.yaml pipeline
+      elif transform.get('type', '') == 'PyTransform' and \
+          transform.get('name', '') == 'AnomalyScoring':
+        transform['type'] = 'MapToFields'
+        transform['config'] = {
+            k: v
+            for (k, v) in transform.get('config', {}).items()
+            if k.startswith('__')
+        }
+
+        transform['config']['language'] = 'python'
+        transform['config']['fields'] = {
+            'example': 'embedding',
+            'predictions': {
+                'callable': """lambda row: [{
+                  'score': 0.65,
+                  'label': 0,
+                  'threshold': 0.8
+              }]""",
+            }
+        }
+
+  return test_spec
+
+
 INPUT_FILES = {
     'products.csv': input_data.products_csv(),
     'kinglear.txt': input_data.text_data(),
-    'youtube-comments.csv': input_data.youtube_comments_csv()
+    'youtube-comments.csv': input_data.youtube_comments_csv(),
+    'system-logs.csv': input_data.system_logs_csv()
 }
 
-KAFKA_TOPICS = {
-    'test-topic': input_data.kafka_messages_data(),
-}
+KAFKA_TOPICS = {'test-topic': input_data.kafka_messages_data()}
 
 PUBSUB_TOPICS = {
     'test-topic': input_data.pubsub_messages_data(),
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py 
b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 99f8e9e3c67..27f210b4856 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -253,3 +253,46 @@ class TaxiRideEventSchema(typing.NamedTuple):
   meter_reading: float
   timestamp: str
   ride_status: str
+
+
+def system_logs_csv():
+  return '\n'.join([
+      'LineId,Date,Time,Level,Process,Component,Content',
+      '1,2024-10-01,12:00:00,INFO,Main,ComponentA,System started successfully',
+      '2,2024-10-01,12:00:05,WARN,Main,ComponentA,Memory usage is high',
+      '3,2024-10-01,12:00:10,ERROR,Main,ComponentA,Task failed due to timeout',
+  ])
+
+
+def system_logs_data():
+  csv_data = system_logs_csv()
+  lines = csv_data.strip().split('\n')
+  headers = lines[0].split(',')
+  logs = []
+  for row in lines[1:]:
+    values = row.split(',')
+    log = dict(zip(headers, values))
+    log['LineId'] = int(log['LineId'])
+    logs.append(log)
+
+  return logs
+
+
+def embedding_data():
+  return [0.1, 0.2, 0.3, 0.4, 0.5]
+
+
+def system_logs_embedding_data():
+  csv_data = system_logs_csv()
+  lines = csv_data.strip().split('\n')
+  headers = lines[0].split(',')
+  headers.append('embedding')
+  logs = []
+  for row in lines[1:]:
+    values = row.split(',')
+    values.append(embedding_data())
+    log = dict(zip(headers, values))
+    log['LineId'] = int(log['LineId'])
+    logs.append(log)
+
+  return logs
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/README.md 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/README.md
new file mode 100644
index 00000000000..b8763bcb511
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/README.md
@@ -0,0 +1,98 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Batch Log Analysis ML Workflow
+
+This example contains several pipelines that leverage Iceberg and BigQuery
+IOs as well as MLTransform to demonstrate an end-to-end ML anomaly detection
+workflow on system logs.
+
+Download [Google Cloud CLI](https://cloud.google.com/sdk/docs/install-sdk).
+
+Install required Python dependencies in a virtual environment:
+```sh
+python -m venv env
+. env/bin/activate
+pip install 'apache-beam[gcp,yaml]' db-dtypes -r requirements.txt
+```
+
+The system logs dataset is from 
[logpai/loghub](https://github.com/logpai/loghub)
+GitHub repository, and specifically the [sample HDFS `.csv` dataset](
+https://github.com/logpai/loghub/blob/master/Hadoop/Hadoop_2k.log_structured.csv)
+is used in this example.
+
+Download the dataset and copy it over to a GCS bucket:
+```sh
+gcloud storage cp /path/to/Hadoop_2k.log_structured.csv \
+  gs://YOUR-BUCKET/Hadoop_2k.log_structured.csv
+```
+**NOTE**: This example requires the GCS bucket created to be a single-region
+bucket.
+
+For Iceberg tables, GCS is also used as the storage layer in this workflow.
+In a data lakehouse with Iceberg and GCS object storage, a natural choice
+for Iceberg catalog is [BigLake 
metastore](https://cloud.google.com/bigquery/docs/about-blms).
+It is a managed, serverless metastore that doesn't require any setup.
+
+A BigQuery dataset needs to exist first before the pipeline can
+create/write to a table. Run the following command to create
+a BigQuery dataset:
+
+```sh
+bq --location=YOUR_REGION mk \
+  --dataset YOUR_DATASET
+```
+
+The workflow starts with pipeline 
[iceberg_migration.yaml](./iceberg_migration.yaml)
+that ingests the `.csv` log data and writes to an Iceberg table on GCS with
+BigLake metastore for catalog.
+The next pipeline [ml_preprocessing.yaml](./ml_preprocessing.yaml) reads
+from this Iceberg table and perform ML-specific transformations such as
+computing text embedding and normalization, before writing the embeddings to
+a BigQuery table.
+An anomaly detection model is then trained (in [train.py](./train.py) script)
+on these vector embeddings, and is subsequently saved as artifact on GCS.
+The last pipeline [anomaly_scoring.yaml](./anomaly_scoring.yaml) reads the
+same embeddings from the BigQuery table, uses Beam's anomaly detection
+module to load the model artifact from GCS and perform anomaly scoring,
+before writing it to another Iceberg table.
+
+This entire workflow execution is encapsulated in the `batch_log_analysis.sh`
+script that runs these workloads sequentially.
+
+Run the pipelines locally:
+```sh
+./batch_log_analysis.sh --runner DirectRunner \
+  --project YOUR_PROJECT \
+  --region YOUR_REGION \
+  --warehouse gs://YOUR-BUCKET \
+  --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+```
+
+Run the pipelines on Dataflow:
+```sh
+ ./batch_log_analysis.sh --runner DataflowRunner \
+   --project YOUR_PROJECT \
+   --region YOUR_REGION \
+   --temp_location gs://YOUR-BUCKET/tmp \
+   --num_workers 1 \
+   --worker_machine_type n1-standard-2 \
+   --warehouse gs://YOUR-BUCKET \
+   --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+```
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/anomaly_scoring.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/anomaly_scoring.yaml
new file mode 100644
index 00000000000..dd4f4840127
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/anomaly_scoring.yaml
@@ -0,0 +1,93 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads text embeddings from a BigQuery table, applies anomaly
+# scoring using a custom pre-trained k-Nearest Neighbours (KNN) model, and
+# writes the results to an Iceberg table on GCS with BigLake metastore for
+# catalog.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromBigQuery
+      name: ReadFromBigQuery
+      config:
+        table: "{{ BQ_TABLE }}"
+        fields: [embedding]
+
+    - type: PyTransform
+      name: AnomalyScoring
+      config:
+        constructor: __constructor__
+        kwargs:
+          source: |
+            import apache_beam as beam
+            from apache_beam.ml.anomaly.detectors.pyod_adapter import 
PyODFactory
+            from apache_beam.ml.anomaly.transforms import AnomalyDetection
+            
+            class KNN(beam.PTransform):
+              def __init__(self, model_artifact_path):
+                self.model_artifact_path = model_artifact_path
+                self.model = PyODFactory.create_detector(
+                   self.model_artifact_path,
+                   model_id="knn",
+                )
+            
+              def expand(self, pcoll):
+                return (
+                  pcoll
+                  | beam.Map(lambda x: x.embedding)
+                  | AnomalyDetection(detector=self.model)
+                  | beam.Map(lambda x: beam.Row(
+                      example=x.example,
+                      predictions=[pred.__dict__ for pred in x.predictions]))
+              )
+
+          model_artifact_path: "{{ WAREHOUSE }}/knn_model.pkl"
+
+    - type: MapToFields
+      name: ResultSchemaMapping
+      config:
+        language: python
+        fields:
+          anomaly_score:
+            callable: "lambda row: row.predictions[0]['score']"
+            output_type: number
+          anomaly_label:
+            callable: "lambda row: row.predictions[0]['label']"
+            output_type: integer
+          threshold:
+            callable: "lambda row: row.predictions[0]['threshold']"
+            output_type: number
+
+    - type: WriteToIceberg
+      name: WriteToIceberg
+      config:
+        table: "logs_analytics.hdfs_anomaly"
+        catalog_name: "rest_catalog"
+        catalog_properties:
+          warehouse: "{{ WAREHOUSE }}"
+          catalog-impl: 
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
+          io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
+          gcp_project: "{{ PROJECT }}"
+          gcp_location: "{{ REGION }}"
+
+# Expected:
+#  Row(anomaly_score=0.65, anomaly_label=0, threshold=0.8)
+#  Row(anomaly_score=0.65, anomaly_label=0, threshold=0.8)
+#  Row(anomaly_score=0.65, anomaly_label=0, threshold=0.8)
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.sh
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.sh
new file mode 100755
index 00000000000..6709419076a
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/batch_log_analysis.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Usage:
+# With DirectRunner:
+# ./batch_log_analysis.sh --runner DirectRunner \
+#   --project YOUR_PROJECT \
+#   --region YOUR_REGION \
+#   --warehouse gs://YOUR-BUCKET \
+#   --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+
+# With DataflowRunner:
+# ./batch_log_analysis.sh --runner DataflowRunner \
+#   --project YOUR_PROJECT \
+#   --region YOUR_REGION \
+#   --temp_location gs://YOUR-BUCKET/temp \
+#   --num_workers 1 \
+#   --worker_machine_type n1-standard-2 \
+#   --warehouse gs://YOUR-BUCKET \
+#   --bq_table YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE
+
+set -e
+
+while [[ $# -gt 0 ]]; do
+  case "$1" in
+    --runner) RUNNER="$2"; shift 2 ;;
+    --project) PROJECT="$2"; shift 2 ;;
+    --region) REGION="$2"; shift 2 ;;
+    --temp_location) TEMP_LOCATION="$2"; shift 2 ;;
+    --num_workers) NUM_WORKERS="$2"; shift 2 ;;
+    --worker_machine_type) WORKER_MACHINE_TYPE="$2"; shift 2 ;;
+    --warehouse) WAREHOUSE="$2"; shift 2 ;;
+    --bq_table) BQ_TABLE="$2"; shift 2 ;;
+    *) echo "Unknown argument: $1"; exit 1 ;;
+  esac
+done
+
+if [[ "$RUNNER" == "DataflowRunner" ]]; then
+  DATAFLOW_COMMON_ARGS="--job_name batch-log-analysis-$(date +%Y%m%d-%H%M%S)
+    --project $PROJECT --region $REGION
+    --temp_location $TEMP_LOCATION
+    --num_workers $NUM_WORKERS --worker_machine_type $WORKER_MACHINE_TYPE
+    --disk_size_gb 50"
+else
+  DATAFLOW_COMMON_ARGS=""
+fi
+
+if ! command -v python &> /dev/null; then
+  echo "Error: Python is not installed. Please install Python to continue."
+  exit 1
+fi
+
+if ! command -v gcloud &> /dev/null; then
+  echo "Error: gcloud CLI is not installed. Please install gcloud to continue."
+  exit 1
+fi
+
+if [ -d "./beam-ml-artifacts" ]; then
+  echo "Removing existing MLTransform's artifact directory..."
+  rm -rf ./beam-ml-artifacts
+fi
+
+echo "Running iceberg_migration.yaml pipeline..."
+python -m apache_beam.yaml.main --yaml_pipeline_file iceberg_migration.yaml \
+  --runner $RUNNER \
+  --jinja_variables '{ "WAREHOUSE":"'$WAREHOUSE'", "PROJECT":"'$PROJECT'", 
"REGION":"'$REGION'" }' \
+  $DATAFLOW_COMMON_ARGS
+
+echo "Running ml_preprocessing.yaml pipeline..."
+python -m apache_beam.yaml.main --yaml_pipeline_file ml_preprocessing.yaml \
+  --runner $RUNNER \
+  --jinja_variables '{ "WAREHOUSE":"'$WAREHOUSE'", "PROJECT":"'$PROJECT'", 
"REGION":"'$REGION'", "BQ_TABLE":"'$BQ_TABLE'" }' \
+  $DATAFLOW_COMMON_ARGS \
+  --requirements_file requirements.txt
+
+echo "Running train.py..."
+python train.py --bq_table $BQ_TABLE
+
+if [ ! -f "./knn_model.pkl" ]; then
+  echo "Error: Model artifact 'knn_model.pkl' not found. Ensure model training 
is successful."
+  exit 1
+fi
+
+echo "Uploading trained model to GCS..."
+gcloud storage cp "./knn_model.pkl" "$WAREHOUSE/knn_model.pkl"
+
+echo "Running anomaly_scoring.yaml pipeline..."
+python -m apache_beam.yaml.main --yaml_pipeline_file anomaly_scoring.yaml \
+  --runner $RUNNER \
+  --jinja_variables '{ "WAREHOUSE":"'$WAREHOUSE'", "PROJECT":"'$PROJECT'", 
"REGION":"'$REGION'", "BQ_TABLE":"'$BQ_TABLE'" }' \
+  $DATAFLOW_COMMON_ARGS \
+  --requirements_file requirements.txt
+
+echo "All steps completed."
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/iceberg_migration.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/iceberg_migration.yaml
new file mode 100644
index 00000000000..8d0518c3635
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/iceberg_migration.yaml
@@ -0,0 +1,45 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads structured logs from a CSV file and writes them to an
+# Iceberg table on GCS with BigLake metastore for catalog.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLogs
+      config:
+        path: "{{ WAREHOUSE }}/Hadoop_2k.log_structured.csv"
+
+    - type: WriteToIceberg
+      name: WriteToIceberg
+      config:
+        table: "logs_dataset.logs_hdfs"
+        catalog_name: "rest_catalog"
+        catalog_properties:
+          warehouse: "{{ WAREHOUSE }}"
+          catalog-impl: 
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
+          io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
+          gcp_project: "{{ PROJECT }}"
+          gcp_location: "{{ REGION }}"
+
+# Expected:
+#  Row(LineId=1, Date='2024-10-01', Time='12:00:00', Level='INFO', 
Process='Main', Component='ComponentA', Content='System started successfully')
+#  Row(LineId=2, Date='2024-10-01', Time='12:00:05', Level='WARN', 
Process='Main', Component='ComponentA', Content='Memory usage is high')
+#  Row(LineId=3, Date='2024-10-01', Time='12:00:10', Level='ERROR', 
Process='Main', Component='ComponentA', Content='Task failed due to timeout')
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/ml_preprocessing.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/ml_preprocessing.yaml
new file mode 100644
index 00000000000..c83eb19e648
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/ml_preprocessing.yaml
@@ -0,0 +1,124 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline reads structured logs from an Iceberg table, applied
+# ML-specific transformations before writing them to a BigQuery table.
+
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromIceberg
+      name: ReadFromIceberg
+      config:
+        table: "logs_dataset.logs_hdfs"
+        catalog_name: "rest_catalog"
+        catalog_properties:
+          warehouse: "{{ WAREHOUSE }}"
+          catalog-impl: 
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
+          io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
+          gcp_project: "{{ PROJECT }}"
+          gcp_location: "{{ REGION }}"
+
+    - type: MapToFields
+      name: MapToFields
+      config:
+        language: python
+        append: true
+        fields:
+          embedding:
+            callable: |
+              def fn(row):
+                line = [row.Date, row.Time, row.Level, row.Process,
+                    row.Component, row.Content]
+                return " ".join(line)
+
+    - type: MLTransform
+      name: Embedding
+      config:
+        write_artifact_location: "./beam-ml-artifacts"
+        transforms:
+          - type: SentenceTransformerEmbeddings
+            config: { model_name: all-MiniLM-L6-v2, columns: [ embedding ] }
+
+    - type: MapToFields
+      name: SchemaMapping
+      config:
+        language: python
+        fields:
+          id:
+            callable: "lambda row: row.LineId"
+            output_type: integer
+          date:
+            callable: "lambda row: row.Date"
+            output_type: string
+          time:
+            callable: "lambda row: row.Time"
+            output_type: string
+          level:
+            callable: "lambda row: row.Level"
+            output_type: string
+          process:
+            callable: "lambda row: row.Process"
+            output_type: string
+          component:
+            callable: "lambda row: row.Component"
+            output_type: string
+          content:
+            callable: "lambda row: row.Content"
+            output_type: string
+          embedding:
+            callable: "lambda row: row.embedding"
+            output_type:
+              type: array
+              items:
+                type: number
+
+    - type: MapToFields
+      name: Normalize
+      config:
+        language: python
+        append: true
+        drop: [embedding]
+        fields:
+          embedding:
+            callable: |
+              import numpy as np
+              
+              def normalize(row):
+                  embedding = row.embedding
+                  norm = np.linalg.norm(embedding)
+                  return embedding / norm
+            output_type:
+              type: array
+              items:
+                type: number
+
+    - type: WriteToBigQuery
+      name: WriteToBigQuery
+      config:
+        table: "{{ BQ_TABLE }}"
+        write_disposition: "WRITE_TRUNCATE"
+        create_disposition: "CREATE_IF_NEEDED"
+
+options:
+  yaml_experimental_features: [ 'ML' ]
+
+# Expected:
+#  Row(id=1, date='2024-10-01', time='12:00:00', level='INFO', process='Main', 
component='ComponentA', content='System started successfully', 
embedding=[0.13483997249264842, 0.26967994498529685, 0.40451991747794525, 
0.5393598899705937, 0.674199862463242])
+#  Row(id=2, date='2024-10-01', time='12:00:05', level='WARN', process='Main', 
component='ComponentA', content='Memory usage is high', 
embedding=[0.13483997249264842, 0.26967994498529685, 0.40451991747794525, 
0.5393598899705937, 0.674199862463242])
+#  Row(id=3, date='2024-10-01', time='12:00:10', level='ERROR', 
process='Main', component='ComponentA', content='Task failed due to timeout', 
embedding=[0.13483997249264842, 0.26967994498529685, 0.40451991747794525, 
0.5393598899705937, 0.674199862463242])
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/requirements.txt
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/requirements.txt
new file mode 100644
index 00000000000..2a06309044e
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/requirements.txt
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional dependencies needed for the pipelines in the log analysis
+# ML workflow.
+# These can be installed on local machine to run the pipelines with local
+# runner, or submitted as part of Dataflow job for the workers' execution
+# environment.
+sentence-transformers~=5.0.0
+pyod~=2.0.5
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/train.py 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/train.py
new file mode 100644
index 00000000000..f0f957aa7ba
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/log_analysis/train.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A script that trains a k-Nearest Neighbors (KNN) model on vector embeddings
+data queried from BigQuery.
+"""
+
+import argparse
+import logging
+import pickle
+
+import numpy as np
+from google.cloud import bigquery
+
+from pyod.models.knn import KNN
+
+
+def parse_arguments():
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bq_table',
+      required=True,
+      help='BigQuery fully qualified table name that contains vector '
+      'embeddings for training, '
+      'specified as `YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE`.')
+
+  return parser.parse_known_args()
+
+
+class ModelHelper():
+  def __init__(self):
+    args, _ = parse_arguments()
+    self.n_neighbors = 8
+    self.method = 'largest'
+    self.metric = 'euclidean'
+    self.contamination = 0.1
+    self.bq_table = args.bq_table
+    self.dataset = None
+
+  def load_data(self):
+    logging.info("Querying vector embeddings from BigQuery...")
+
+    client = bigquery.Client()
+    sql = f"""
+      SELECT *
+      FROM `{self.bq_table}`
+      """
+    df = client.query_and_wait(sql).to_dataframe()
+    self.dataset = np.stack(df['embedding'].to_numpy())
+
+  def train_model(self):
+    logging.info("Training KNN model...")
+
+    model = KNN(
+        n_neighbors=self.n_neighbors,
+        method=self.method,
+        metric=self.metric,
+        contamination=self.contamination,
+    )
+    model.fit(self.dataset)
+
+    logging.info("KNN model trained successfully! Saving model...")
+
+    model_pickled_filename = 'knn_model.pkl'
+    with open(model_pickled_filename, 'wb') as f:
+      pickle.dump(model, f)
+
+
+if __name__ == "__main__":
+  logging.getLogger().setLevel(logging.INFO)
+
+  helper = ModelHelper()
+  helper.load_data()
+  helper.train_model()

Reply via email to