This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3415378fc13 [YAML] A Streaming Inference Pipeline - YouTube Comments 
Sentiment Analysis (#35375)
3415378fc13 is described below

commit 3415378fc134eb23bf714083815c041fce8db79f
Author: Charles Nguyen <[email protected]>
AuthorDate: Tue Jul 22 18:31:48 2025 -0400

    [YAML] A Streaming Inference Pipeline - YouTube Comments Sentiment Analysis 
(#35375)
    
    * Add YAML streaming sentiment analysis pipeline
    
    * WIP YAML example for streaming sentiment analysis pipeline
    
    * Clean up
    
    * Clean up
    
    * Clean up
    
    * Add comments and update README.md
    
    * Fix rebase
    
    * Fix test and lint
    
    * Fix test
    
    * Fix CI/CD
    
    * Address comments
    
    * Address comments and fix CI/CD
    
    * Fix CI/CD
---
 sdks/python/apache_beam/yaml/examples/README.md    |   9 +-
 .../yaml/examples/testing/examples_test.py         | 142 +++++++++++-
 .../yaml/examples/testing/input_data.py            |   9 +
 .../transforms/ml/sentiment_analysis/README.md     |  93 ++++++++
 .../streaming_sentiment_analysis.yaml              | 257 +++++++++++++++++++++
 5 files changed, 501 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/examples/README.md 
b/sdks/python/apache_beam/yaml/examples/README.md
index 70533655a6a..8f6decb6bb4 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -231,8 +231,13 @@ gcloud dataflow yaml run $JOB_NAME \
 
 ### ML
 
-These examples leverage the built-in `Enrichment` transform for performing
-ML enrichments.
+Examples that include the built-in `Enrichment` transform for performing
+ML enrichments:
+- [bigquery_enrichment.yaml](transforms/ml/enrichment/bigquery_enrichment.yaml)
+- [spanner_enrichment.yaml](transforms/ml/enrichment/spanner_enrichment.yaml)
+
+Examples that include the `RunInference` transform for ML inference:
+- 
[streaming_sentiment_analysis.yaml](transforms/ml/inference/streaming_sentiment_analysis.yaml)
 
 More information can be found about aggregation transforms
 [here](https://beam.apache.org/documentation/sdks/yaml-combine/).
diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py 
b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
index 3b46a9dda5d..205480b418a 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -37,8 +37,10 @@ import yaml
 import apache_beam as beam
 from apache_beam import PCollection
 from apache_beam.examples.snippets.util import assert_matches_stdout
+from apache_beam.ml.inference.base import PredictionResult
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.typehints.row_type import RowTypeConstraint
 from apache_beam.utils import subprocess_server
 from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_transform
@@ -120,6 +122,23 @@ def test_kafka_read(
     bootstrap_servers,
     auto_offset_reset_config,
     consumer_config):
+  """
+  This PTransform simulates the behavior of the ReadFromKafka transform
+  with the RAW format by simply using some fixed sample text data and
+  encoding it to raw bytes.
+
+  Args:
+    pcoll: The input PCollection.
+    format: The format of the Kafka messages (e.g., 'RAW').
+    topic: The name of the Kafka topic to read from.
+    bootstrap_servers: A list of Kafka bootstrap servers to connect to.
+    auto_offset_reset_config: A configuration for the auto offset reset.
+    consumer_config: A dictionary containing additional consumer configurations.
+
+  Returns:
+    A PCollection containing the sample text data in bytes.
+  """
+
   return (
       pcoll | beam.Create(input_data.text_data().split('\n'))
       | beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))
@@ -127,7 +146,7 @@ def test_kafka_read(
 
 @beam.ptransform.ptransform_fn
 def test_pubsub_read(
-    pbegin,
+    pcoll,
     topic: Optional[str] = None,
     subscription: Optional[str] = None,
     format: Optional[str] = None,
@@ -140,15 +159,58 @@ def test_pubsub_read(
   pubsub_messages = input_data.pubsub_messages_data()
 
   return (
-      pbegin
+      pcoll
       | beam.Create([json.loads(msg.data) for msg in pubsub_messages])
       | beam.Map(lambda element: beam.Row(**element)))
 
 
[email protected]_fn
+def test_run_inference(pcoll, inference_tag, model_handler):
+  """
+  This PTransform simulates the behavior of the RunInference transform.
+
+  Args:
+    pcoll: The input PCollection.
+    inference_tag: The tag to use for the returned inference.
+    model_handler: A configuration for the respective ML model handler.
+
+  Returns:
+    A PCollection containing rows augmented with simulated inference results.
+  """
+  def _fn(row):
+    input = row._asdict()
+
+    row = {
+        inference_tag: PredictionResult(
+            input['comment_text'],
+            [{
+                'label': 'POSITIVE'
+                if 'happy' in input['comment_text'] else 'NEGATIVE',
+                'score': 0.95
+            }]),
+        **input
+    }
+
+    return beam.Row(**row)
+
+  user_type = RowTypeConstraint.from_user_type(pcoll.element_type.user_type)
+  user_schema_fields = [(name, type(typ) if not isinstance(typ, type) else typ)
+                        for (name,
+                             typ) in user_type._fields] if user_type else []
+  inference_output_type = RowTypeConstraint.from_fields([
+      ('example', Any), ('inference', Any), ('model_id', Optional[str])
+  ])
+  schema = RowTypeConstraint.from_fields(
+      user_schema_fields + [(str(inference_tag), inference_output_type)])
+
+  return pcoll | beam.Map(_fn).with_output_types(schema)
+
+
 TEST_PROVIDERS = {
     'TestEnrichment': test_enrichment,
     'TestReadFromKafka': test_kafka_read,
-    'TestReadFromPubSub': test_pubsub_read
+    'TestReadFromPubSub': test_pubsub_read,
+    'TestRunInference': test_run_inference
 }
 """
 Transforms not requiring inputs.
@@ -238,7 +300,12 @@ def create_test_method(
               actual += list(transform.outputs.values())
         check_output(expected)(actual)
 
-  if 'deps' in pipeline_spec_file:
+  def _python_deps_involved(spec_filename):
+    return any(
+        substr in spec_filename
+        for substr in ['deps', 'streaming_sentiment_analysis'])
+
+  if _python_deps_involved(pipeline_spec_file):
     test_yaml_example = pytest.mark.no_xdist(test_yaml_example)
     test_yaml_example = unittest.skipIf(
         sys.platform == 'win32', "Github virtualenv permissions issues.")(
@@ -457,7 +524,9 @@ def _kafka_test_preprocessor(
     'test_pubsub_to_iceberg_yaml',
     'test_oracle_to_bigquery_yaml',
     'test_mysql_to_bigquery_yaml',
-    'test_spanner_to_bigquery_yaml'
+    'test_spanner_to_bigquery_yaml',
+    'test_streaming_sentiment_analysis_yaml',
+    'test_enrich_spanner_with_bigquery_yaml'
 ])
 def _io_write_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -782,9 +851,68 @@ def _db_io_read_test_processor(
   return test_spec
 
 
[email protected]_test_preprocessor(
+    'test_streaming_sentiment_analysis_yaml')
+def _streaming_sentiment_analysis_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve the streaming sentiment analysis example.
+
+  This preprocessor replaces several IO transforms and the RunInference
+  transform.
+  This allows the test to verify the pipeline's correctness without relying on
+external data sources or the model hosted on Vertex AI.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline 
specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with IO and RunInference transforms replaced.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'PyTransform' and transform.get(
+          'name', '') == 'ReadFromGCS':
+        transform['windowing'] = {'type': 'fixed', 'size': '30s'}
+
+        file_name = 'youtube-comments.csv'
+        local_path = env.input_file(file_name, INPUT_FILES[file_name])
+        transform['config']['kwargs']['file_pattern'] = local_path
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromKafka':
+        config = transform['config']
+        transform['type'] = 'ReadFromCsv'
+        transform['config'] = {
+            k: v
+            for k, v in config.items() if k.startswith('__')
+        }
+        transform['config']['path'] = ""
+
+        file_name = 'youtube-comments.csv'
+        test_spec = replace_recursive(
+            test_spec,
+            transform['type'],
+            'path',
+            env.input_file(file_name, INPUT_FILES[file_name]))
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'RunInference':
+        transform['type'] = 'TestRunInference'
+
+  return test_spec
+
+
 INPUT_FILES = {
     'products.csv': input_data.products_csv(),
-    'kinglear.txt': input_data.text_data()
+    'kinglear.txt': input_data.text_data(),
+    'youtube-comments.csv': input_data.youtube_comments_csv()
 }
 
 INPUT_TABLES = {
@@ -819,7 +947,7 @@ IOTest = YamlExamplesTestSuite(
                                    '../transforms/io/*.yaml')).run()
 MLTest = YamlExamplesTestSuite(
     'MLExamplesTest', os.path.join(YAML_DOCS_DIR,
-                                   '../transforms/ml/*.yaml')).run()
+                                   '../transforms/ml/**/*.yaml')).run()
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py 
b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
index 0c601f67816..34a58c47923 100644
--- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py
+++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -54,6 +54,15 @@ def products_csv():
   ])
 
 
+def youtube_comments_csv():
+  return '\n'.join([
+      'video_id,comment_text,likes,replies',
+      'XpVt6Z1Gjjo,I AM HAPPY,1,1',
+      'XpVt6Z1Gjjo,I AM SAD,1,1',
+      'XpVt6Z1Gjjo,§ÁĐ,1,1'
+  ])
+
+
 def spanner_orders_data():
   return [{
       'order_id': 1,
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
new file mode 100644
index 00000000000..ad44d433017
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/README.md
@@ -0,0 +1,93 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Streaming Sentiment Analysis
+
+The example leverages the `RunInference` transform with Vertex AI
+model handler [VertexAIModelHandlerJSON](
+https://beam.apache.org/releases/pydoc/current/apache_beam.yaml.yaml_ml#apache_beam.yaml.yaml_ml.VertexAIModelHandlerJSONProvider),
+in addition to Kafka IO to demonstrate an end-to-end example of a
+streaming sentiment analysis pipeline. The dataset to perform
+sentiment analysis on is the YouTube video comments and can be found
+on Kaggle [here](
+https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv).
+
+Download the dataset and copy over to a GCS bucket:
+```sh
+gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
+```
+
+For setting up Kafka, an option is to use [Click to Deploy](
+https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
+to quickly launch a Kafka cluster on GCE. See [here](
+../../../README.md#kafka) for more context around using Kafka
+with Dataflow.
+
+A hosted model on Vertex AI is needed before being able to use
+the Vertex AI model handler. One of the current state-of-the-art
+NLP models is HuggingFace's DistilBERT, a distilled version of
+BERT model and is faster at inference. To deploy DistilBERT on
+Vertex AI, run this [notebook](
+https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/model_garden/model_garden_huggingface_pytorch_inference_deployment.ipynb)
 in Colab Enterprise.
+
+BigQuery is the pipeline's sink for the inference result output.
+A BigQuery dataset needs to exist first before the pipeline can
+create/write to a table. Run the following command to create
+a BigQuery dataset:
+
+```sh
+bq --location=us-central1 mk \
+  --dataset DATASET_ID
+```
+See also [here](
+https://cloud.google.com/bigquery/docs/datasets) for more details on
+how to create BigQuery datasets.
+
+The pipeline first reads the YouTube comments .csv dataset from
+a GCS bucket and performs some clean-up before writing it to a Kafka
+topic. The pipeline then reads from that Kafka topic and applies
+various transformation logic before `RunInference` transform performs
+remote inference with the Vertex AI model handler and DistilBERT
+deployed to a Vertex AI endpoint. The inference result is then
+parsed and written to a BigQuery table.
+
+Run the pipeline (replace with appropriate variables in the command
+below):
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
+export REGION="us-central1"
+export JOB_NAME="streaming-sentiment-analysis-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="3"
+
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file 
transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml \
+  --runner DataflowRunner \
+  --temp_location $TEMP_LOCATION \
+  --project $PROJECT \
+  --region $REGION \
+  --num_workers $NUM_WORKERS \
+  --job_name $JOB_NAME \
+  --jinja_variables '{ "GCS_PATH": "gs://YOUR-BUCKET/USComments.csv",
+  "BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
+  "TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD": 
"KAFKA_PASSWORD",
+  "ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
+  "DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
+```
diff --git 
a/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
new file mode 100644
index 00000000000..63521208daa
--- /dev/null
+++ 
b/sdks/python/apache_beam/yaml/examples/transforms/ml/sentiment_analysis/streaming_sentiment_analysis.yaml
@@ -0,0 +1,257 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline first reads the YouTube comments .csv dataset from GCS bucket
+# and performs necessary clean-up before writing it to a Kafka topic.
+# The pipeline then reads from that Kafka topic and applies various 
transformation
+# logic before RunInference transform performs remote inference with the 
Vertex AI
+# model handler.
+# The inference result is then written to a BigQuery table.
+
+pipeline:
+  transforms:
+    # The YouTube comments dataset contains rows that
+    # have an unexpected schema (e.g. rows with more fields,
+    # rows with fields that contain strings instead of
+    # integers, etc...). PyTransform helps construct
+    # the logic to properly read in the csv dataset as
+    # a schema'd PCollection.
+    - type: PyTransform
+      name: ReadFromGCS
+      input: {}
+      config:
+        constructor: __callable__
+        kwargs:
+          source: |
+            def ReadYoutubeCommentsCsv(pcoll, file_pattern):
+              def _to_int(x):
+                try:
+                  return int(x)
+                except (ValueError):
+                  return None
+            
+              return (
+                  pcoll 
+                  | beam.io.ReadFromCsv(
+                        file_pattern,
+                        names=['video_id', 'comment_text', 'likes', 'replies'],
+                        on_bad_lines='skip',
+                        converters={'likes': _to_int, 'replies': _to_int})
+                  | beam.Filter(lambda row: 
+                        None not in list(row._asdict().values()))
+                  | beam.Map(lambda row: beam.Row(
+                        video_id=row.video_id,
+                        comment_text=row.comment_text,
+                        likes=int(row.likes),
+                        replies=int(row.replies)))
+              )
+          file_pattern: "{{ GCS_PATH }}"
+
+    # Send the rows as Kafka records to an existing
+    # Kafka topic.
+    - type: WriteToKafka
+      name: SendRecordsToKafka
+      input: ReadFromGCS
+      config:
+        format: "JSON"
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        producer_config_updates:
+          sasl.jaas.config: 
"org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    # Read Kafka records from an existing Kafka topic.
+    - type: ReadFromKafka
+      name: ReadFromMyTopic
+      config:
+        format: "JSON"
+        schema: |
+          {
+            "type": "object",
+            "properties": {
+              "video_id": { "type": "string" },
+              "comment_text": { "type": "string" },
+              "likes": { "type": "integer" },
+              "replies": { "type": "integer" }
+            }
+          }
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        auto_offset_reset_config: earliest
+        consumer_config:
+          sasl.jaas.config: 
"org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    # Remove unexpected characters from the YouTube
+    # comment string, e.g. emojis, ascii characters
+    # outside the common day-to-day English.
+    - type: MapToFields
+      name: RemoveWeirdCharacters
+      input: ReadFromMyTopic
+      config:
+        language: python
+        fields:
+          video_id: video_id
+          comment_text:
+            callable: |
+              import re
+              def filter(row):
+                # regex match and keep letters, digits, whitespace and common 
punctuations,
+                # i.e. remove non printable ASCII characters (character codes 
not in
+                # the range 32 - 126, or \x20 - \x7E).
+                return re.sub(r'[^\x20-\x7E]', '', row.comment_text).strip()
+          likes: likes
+          replies: replies
+
+    # Remove rows that have empty comment text
+    # after previously removing unexpected characters.
+    - type: Filter
+      name: FilterForProperComments
+      input: RemoveWeirdCharacters
+      config:
+        language: python
+        keep:
+          callable: |
+            def filter(row):
+              return len(row.comment_text) > 0
+
+    # HuggingFace's distilbert-base-uncased is used for inference,
+    # which accepts strings with a maximum limit of 250 tokens.
+    # Some of the comment strings can be large and are well over
+    # this limit after tokenization.
+    # This transform truncates the comment string and ensures
+    # every comment satisfies the maximum token limit.
+    - type: MapToFields
+      name: Truncating
+      input: FilterForProperComments
+      config:
+        language: python
+        dependencies:
+          - 'transformers>=4.48.0,<4.49.0'
+        fields:
+          video_id: video_id
+          comment_text:
+            callable: |
+              from transformers import AutoTokenizer
+              
+              tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", 
use_fast=True)
+              
+              def truncate_sentence(row):
+                tokens = tokenizer.tokenize(row.comment_text)
+                if len(tokens) >= 250:
+                  tokens = tokens[:250]
+                  truncated_sentence = 
tokenizer.convert_tokens_to_string(tokens)
+                else:
+                  truncated_sentence = row.comment_text
+
+                return truncated_sentence
+          likes: likes
+          replies: replies
+
+    # HuggingFace's distilbert-base-uncased does not distinguish
+    # between upper and lower case tokens.
+    # This pipeline follows suit by converting all words
+    # into lowercase.
+    - type: MapToFields
+      name: LowerCase
+      input: Truncating
+      config:
+        language: python
+        fields:
+          video_id: video_id
+          comment_text: "comment_text.lower()"
+          likes: likes
+          replies: replies
+
+    # With VertexAIModelHandlerJSON model handler,
+    # RunInference transform performs remote inferences by
+    # sending POST requests to the Vertex AI endpoint that
+    # our distilbert-base-uncased model is being deployed to.
+    - type: RunInference
+      name: DistilBERTRemoteInference
+      input: LowerCase
+      config:
+        inference_tag: "inference"
+        model_handler:
+          type: "VertexAIModelHandlerJSON"
+          config:
+            endpoint_id: "{{ ENDPOINT }}"
+            project: "{{ PROJECT }}"
+            location: "{{ LOCATION }}"
+            preprocess:
+              callable: 'lambda x: x.comment_text'
+
+    # Parse inference results output
+    - type: MapToFields
+      name: FormatInferenceOutput
+      input: DistilBERTRemoteInference
+      config:
+        language: python
+        fields:
+          video_id:
+            expression: video_id
+            output_type: string
+          comment_text:
+            callable: "lambda x: x.comment_text"
+            output_type: string
+          label:
+            callable: "lambda x: x.inference.inference[0]['label']"
+            output_type: string
+          score:
+            callable: "lambda x: x.inference.inference[0]['score']"
+            output_type: number
+          likes:
+            expression: likes
+            output_type: integer
+          replies:
+            expression: replies
+            output_type: integer
+
+    # Assign windows to each element of the unbounded PCollection.
+    - type: WindowInto
+      name: Windowing
+      input: FormatInferenceOutput
+      config:
+        windowing:
+          type: fixed
+          size: 30s
+
+    # Write all inference results to a BigQuery table.
+    - type: WriteToBigQuery
+      name: WriteInferenceResultsToBQ
+      input: Windowing
+      config:
+        table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}"
+        create_disposition: CREATE_IF_NEEDED
+        write_disposition: WRITE_APPEND
+
+options:
+  yaml_experimental_features: ML
+
+# Expected:
+#  Row(video_id='XpVt6Z1Gjjo', comment_text='I AM HAPPY', likes=1, replies=1)
+#  Row(video_id='XpVt6Z1Gjjo', comment_text='I AM SAD', likes=1, replies=1)
+#  Row(video_id='XpVt6Z1Gjjo', comment_text='§ÁĐ', likes=1, replies=1)
+#  Row(video_id='XpVt6Z1Gjjo', comment_text='i am happy', label='POSITIVE', 
score=0.95, likes=1, replies=1)
+#  Row(video_id='XpVt6Z1Gjjo', comment_text='i am sad', label='NEGATIVE', 
score=0.95, likes=1, replies=1)

Reply via email to