chamikaramj commented on code in PR #35375: URL: https://github.com/apache/beam/pull/35375#discussion_r2199386982
########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. 
+ - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ Review Comment: Ditto regarding secret managers ########## sdks/python/apache_beam/yaml/examples/testing/examples_test.py: ########## @@ -114,14 +114,31 @@ def _fn(row): @beam.ptransform.ptransform_fn def test_kafka_read( - pcoll, + pbegin, Review Comment: This should either be a valid PCollection or a pipeline object (not pbegin). ########## sdks/python/apache_beam/yaml/examples/testing/examples_test.py: ########## @@ -782,9 +853,67 @@ def _db_io_read_test_processor( return test_spec +@YamlExamplesTestSuite.register_test_preprocessor( Review Comment: Is it possible to add these as tests embedded in the YAML file under a "tests:" section instead of being implemented in Python ? @derrickaw might know more. ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. 
+ - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. + - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" + pattern = re.compile(fr"[^{allowed}]") + return pattern.sub("", row.comment_text).strip() + likes: likes + replies: replies + + # Remove rows that have empty comment text + # after previously removing unexpected characters. + - type: Filter + name: FilterForProperComments + input: RemoveWeirdCharacters + config: + language: python + keep: + callable: | + def filter(row): + return len(row.comment_text) > 0 + + # HuggingFace's distilbert-base-uncased is used for inference, + # which accepts string with a maximum limit of 250 tokens. + # Some of the comment strings can be large and are well over + # this limit after tokenization. + # This transform truncates the comment string and ensure + # every comment satisfy the maximum token limit. 
+ - type: MapToFields + name: Truncating + input: FilterForProperComments + config: + language: python + dependencies: + - 'transformers>=4.48.0,<4.49.0' + fields: + video_id: video_id + comment_text: + callable: | + from transformers import AutoTokenizer + + tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) + + def truncate_sentence(row): + tokens = tokenizer.tokenize(row.comment_text) + if len(tokens) >= 250: + tokens = tokens[:250] Review Comment: Is there a better approach than just dropping extra tokens here ? @damccorm may have input. ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. 
+ - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. + - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" + pattern = re.compile(fr"[^{allowed}]") + return pattern.sub("", row.comment_text).strip() + likes: likes + replies: replies + + # Remove rows that have empty comment text + # after previously removing unexpected characters. + - type: Filter + name: FilterForProperComments + input: RemoveWeirdCharacters + config: + language: python + keep: + callable: | + def filter(row): + return len(row.comment_text) > 0 + + # HuggingFace's distilbert-base-uncased is used for inference, + # which accepts string with a maximum limit of 250 tokens. + # Some of the comment strings can be large and are well over + # this limit after tokenization. + # This transform truncates the comment string and ensure + # every comment satisfy the maximum token limit. + - type: MapToFields + name: Truncating + input: FilterForProperComments + config: + language: python + dependencies: + - 'transformers>=4.48.0,<4.49.0' + fields: + video_id: video_id + comment_text: + callable: | + from transformers import AutoTokenizer + + tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) + + def truncate_sentence(row): + tokens = tokenizer.tokenize(row.comment_text) + if len(tokens) >= 250: + tokens = tokens[:250] + truncated_sentence = tokenizer.convert_tokens_to_string(tokens) + else: + truncated_sentence = row.comment_text + + return truncated_sentence + likes: likes + replies: replies + + # HuggingFace's distilbert-base-uncased does not distinguish + # between 'english' and 'English'. + # This pipeline makes the same point by converting all words + # into lowercase. + - type: MapToFields + name: LowerCase + input: Truncating + config: + language: python + fields: + video_id: video_id + comment_text: "comment_text.lower()" + likes: likes + replies: replies + + # With VertexAIModelHandlerJSON model handler, + # RunInference transform performs remote inferences by + # sending POST requests to the Vertex AI endpoint that + # our distilbert-base-uncased model is being deployed to. 
+ - type: RunInference + name: DistilBERTRemoteInference + input: LowerCase + config: + inference_tag: "inference" + model_handler: + type: "VertexAIModelHandlerJSON" + config: + endpoint_id: "{{ ENDPOINT }}" + project: "{{ PROJECT }}" + location: "{{ LOCATION }}" + preprocess: + callable: 'lambda x: x.comment_text' + + # Parse inference results output + - type: MapToFields + name: FormatInferenceOutput + input: DistilBERTRemoteInference + config: + language: python + fields: + video_id: + expression: video_id + output_type: string + comment_text: + callable: "lambda x: x.comment_text" + output_type: string + label: + callable: "lambda x: x.inference.inference[0]['label']" Review Comment: Does the inference list always return one index ? ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. 
+ - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. + - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" + pattern = re.compile(fr"[^{allowed}]") + return pattern.sub("", row.comment_text).strip() + likes: likes + replies: replies + + # Remove rows that have empty comment text + # after previously removing unexpected characters. + - type: Filter + name: FilterForProperComments + input: RemoveWeirdCharacters + config: + language: python + keep: + callable: | + def filter(row): + return len(row.comment_text) > 0 + + # HuggingFace's distilbert-base-uncased is used for inference, + # which accepts string with a maximum limit of 250 tokens. + # Some of the comment strings can be large and are well over + # this limit after tokenization. + # This transform truncates the comment string and ensure + # every comment satisfy the maximum token limit. + - type: MapToFields + name: Truncating + input: FilterForProperComments + config: + language: python + dependencies: + - 'transformers>=4.48.0,<4.49.0' + fields: + video_id: video_id + comment_text: + callable: | + from transformers import AutoTokenizer + + tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) + + def truncate_sentence(row): + tokens = tokenizer.tokenize(row.comment_text) + if len(tokens) >= 250: + tokens = tokens[:250] + truncated_sentence = tokenizer.convert_tokens_to_string(tokens) + else: + truncated_sentence = row.comment_text + + return truncated_sentence + likes: likes + replies: replies + + # HuggingFace's distilbert-base-uncased does not distinguish + # between 'english' and 'English'. Review Comment: "does not distinguish between upper and lower case tokens" ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. 
+ - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" + pattern = re.compile(fr"[^{allowed}]") + return pattern.sub("", row.comment_text).strip() + likes: likes + replies: replies + + # Remove rows that have empty comment text Review Comment: Ditto. This might be too restrictive and may drop valid data. ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: Review Comment: So we are filtering out records that has None or any field ? Can we include such records with default values for missing fields or some representation of None ? ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. + - type: MapToFields + name: RemoveWeirdCharacters Review Comment: May users of the pipeline expect to see such characters ? Can we preserve using a different character encoding ? 
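A hedged sketch of the alternative the reviewer raises here: instead of whitelisting a small ASCII set, normalize the text and strip only control characters, so emoji and non-English letters survive for downstream consumers. The `clean_comment` helper name and the NFKC choice are illustrative, not part of the PR:

```python
import re
import unicodedata

def clean_comment(text: str) -> str:
    # Fold compatibility forms (full-width punctuation, ligatures, etc.)
    # rather than discarding everything outside an ASCII whitelist.
    normalized = unicodedata.normalize("NFKC", text)
    # Remove only control characters; accents, CJK text and emoji are kept
    # so the original comment content is preserved.
    return re.sub(r"[\x00-\x1f\x7f]", "", normalized).strip()

print(clean_comment("Café \u0007rocks\uff01"))  # -> 'Café rocks!'
```

Whether this is appropriate depends on what the deployed model tolerates; it is only meant to show that preserving such characters is cheap if pipeline users expect to see them.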
########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket Review Comment: Can we make this part pluggable so that someone who has a true Kafka topic with valid values we read here can just use that to execute a true streaming pipeline ? ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. 
+ - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. + - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" Review Comment: This might be too restrictive for someone using such a pipeline in practice. ########## sdks/python/apache_beam/yaml/examples/README.md: ########## @@ -231,8 +231,9 @@ gcloud dataflow yaml run $JOB_NAME \ ### ML -These examples leverage the built-in `Enrichment` transform for performing -ML enrichments. +These examples include the built-in `Enrichment` transform for performing Review Comment: Probably better to list the examples here since we'll add more. ########## sdks/python/apache_beam/yaml/examples/testing/examples_test.py: ########## @@ -114,14 +114,31 @@ def _fn(row): @beam.ptransform.ptransform_fn def test_kafka_read( - pcoll, + pbegin, format, topic, bootstrap_servers, auto_offset_reset_config, consumer_config): + """ + This PTransform simulates the behavior of the ReadFromKafka transform + with the RAW format by simply using some fixed sample text data and + encode it to raw bytes. + + Args: + pbegin: The input PCollection. + format: Review Comment: Pls add descriptions for other fields. 
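To make the requested docstring concrete, here is a hedged sketch of the test helper with every parameter described. The wording is inferred from the ReadFromKafka config used in the example and the existing RAW-format note, so it may need adjusting to match the actual test:

```python
import apache_beam as beam


@beam.ptransform.ptransform_fn
def test_kafka_read(
    pcoll,
    format,
    topic,
    bootstrap_servers,
    auto_offset_reset_config,
    consumer_config):
  """Simulates ReadFromKafka by emitting a few fixed, raw-encoded records.

  Args:
    pcoll: The pipeline root this mock source is applied to.
    format: Record format the real transform would decode (e.g. "RAW",
      "JSON"); ignored by the mock.
    topic: Kafka topic the example pipeline reads from; ignored by the mock.
    bootstrap_servers: Comma-separated broker host:port list; ignored by
      the mock.
    auto_offset_reset_config: Offset reset policy such as "earliest"; ignored.
    consumer_config: Extra consumer properties (e.g. SASL/JAAS settings);
      ignored.
  """
  return pcoll | beam.Create([b'sample comment one', b'sample comment two'])
```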
########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ Review Comment: We should update this to use secret managers [1] when it's available. @damccorm is this at a state so that we can try (or is there an ETA) ? [1] https://docs.google.com/document/d/1Ng00Kw-vnG9kKd3RteC6Q5_YN8yDtXRDHkn570GPQrY/edit?tab=t.0#heading=h.c0uts5ftkk58 ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. + - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. 
+ - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" + pattern = re.compile(fr"[^{allowed}]") + return pattern.sub("", row.comment_text).strip() + likes: likes + replies: replies + + # Remove rows that have empty comment text + # after previously removing unexpected characters. + - type: Filter + name: FilterForProperComments + input: RemoveWeirdCharacters + config: + language: python + keep: + callable: | + def filter(row): + return len(row.comment_text) > 0 + + # HuggingFace's distilbert-base-uncased is used for inference, + # which accepts string with a maximum limit of 250 tokens. + # Some of the comment strings can be large and are well over + # this limit after tokenization. + # This transform truncates the comment string and ensure + # every comment satisfy the maximum token limit. + - type: MapToFields + name: Truncating + input: FilterForProperComments + config: + language: python + dependencies: + - 'transformers>=4.48.0,<4.49.0' + fields: + video_id: video_id + comment_text: + callable: | + from transformers import AutoTokenizer + + tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) + + def truncate_sentence(row): + tokens = tokenizer.tokenize(row.comment_text) + if len(tokens) >= 250: + tokens = tokens[:250] + truncated_sentence = tokenizer.convert_tokens_to_string(tokens) + else: + truncated_sentence = row.comment_text + + return truncated_sentence + likes: likes + replies: replies + + # HuggingFace's distilbert-base-uncased does not distinguish + # between 'english' and 'English'. + # This pipeline makes the same point by converting all words + # into lowercase. + - type: MapToFields + name: LowerCase + input: Truncating + config: + language: python + fields: + video_id: video_id + comment_text: "comment_text.lower()" + likes: likes + replies: replies + + # With VertexAIModelHandlerJSON model handler, + # RunInference transform performs remote inferences by + # sending POST requests to the Vertex AI endpoint that + # our distilbert-base-uncased model is being deployed to. + - type: RunInference + name: DistilBERTRemoteInference + input: LowerCase + config: + inference_tag: "inference" + model_handler: + type: "VertexAIModelHandlerJSON" + config: + endpoint_id: "{{ ENDPOINT }}" + project: "{{ PROJECT }}" + location: "{{ LOCATION }}" + preprocess: + callable: 'lambda x: x.comment_text' + + # Parse inference results output + - type: MapToFields + name: FormatInferenceOutput + input: DistilBERTRemoteInference + config: + language: python + fields: + video_id: + expression: video_id + output_type: string + comment_text: + callable: "lambda x: x.comment_text" + output_type: string + label: + callable: "lambda x: x.inference.inference[0]['label']" + output_type: string + score: + callable: "lambda x: x.inference.inference[0]['score']" + output_type: number + likes: + expression: likes + output_type: integer + replies: + expression: replies + output_type: integer + + # Assign windows to each element of the unbounded PCollection. + - type: WindowInto + name: Windowing + input: FormatInferenceOutput + config: + windowing: + type: fixed + size: 30s + + # Write all inference results to a BigQuery table. 
+ - type: WriteToBigQuery + name: WriteInferenceResultsToBQ + input: Windowing + config: + table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}" + create_disposition: CREATE_IF_NEEDED + write_disposition: WRITE_APPEND + +options: + yaml_experimental_features: ML + +# Expected: +# Row(video_id='XpVt6Z1Gjjo', comment_text='I AM HAPPY', likes=1, replies=1) +# Row(video_id='XpVt6Z1Gjjo', comment_text='I AM SAD', likes=1, replies=1) Review Comment: Pls delete commented values and probably move to actual expected values under a "tests:" section. ########## sdks/python/apache_beam/yaml/examples/README.md: ########## @@ -231,8 +231,9 @@ gcloud dataflow yaml run $JOB_NAME \ ### ML -These examples leverage the built-in `Enrichment` transform for performing -ML enrichments. +These examples include the built-in `Enrichment` transform for performing Review Comment: Probably better to list the examples here since we'll add more. ########## sdks/python/apache_beam/yaml/examples/testing/examples_test.py: ########## @@ -145,10 +162,57 @@ def test_pubsub_read( | beam.Map(lambda element: beam.Row(**element))) +@beam.ptransform.ptransform_fn Review Comment: Is it possible to test with a mock model hander instead of completely mocking the transform here ? cc: @damccorm ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ########## @@ -0,0 +1,257 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The pipeline first reads the YouTube comments .csv dataset from GCS bucket +# and performs necessary clean-up before writing it to a Kafka topic. +# The pipeline then reads from that Kafka topic and applies various transformation +# logic before RunInference transform performs remote inference with the Vertex AI +# model handler. +# The inference result is then written to a BigQuery table. + +pipeline: + transforms: + # The YouTube comments dataset contains rows that + # have unexpected schema (e.g. rows with more fields, + # rows with fields that contain string instead of + # integer, etc...). PyTransform helps construct + # the logic to properly read in the csv dataset as + # a schema'd PCollection. 
+ - type: PyTransform + name: ReadFromGCS + input: {} + config: + constructor: __callable__ + kwargs: + source: | + def ReadYoutubeCommentsCsv(pcoll, file_pattern): + def _to_int(x): + try: + return int(x) + except (ValueError): + return None + + return ( + pcoll + | beam.io.ReadFromCsv( + file_pattern, + names=['video_id', 'comment_text', 'likes', 'replies'], + on_bad_lines='skip', + converters={'likes': _to_int, 'replies': _to_int}) + | beam.Filter(lambda row: + None not in list(row._asdict().values())) + | beam.Map(lambda row: beam.Row( + video_id=row.video_id, + comment_text=row.comment_text, + likes=int(row.likes), + replies=int(row.replies))) + ) + file_pattern: "{{ GCS_PATH }}" + + # Send the rows as Kafka records to an existing + # Kafka topic. + - type: WriteToKafka + name: SendRecordsToKafka + input: ReadFromGCS + config: + format: "JSON" + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + producer_config_updates: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Read Kafka records from an existing Kafka topic. + - type: ReadFromKafka + name: ReadFromMyTopic + config: + format: "JSON" + schema: | + { + "type": "object", + "properties": { + "video_id": { "type": "string" }, + "comment_text": { "type": "string" }, + "likes": { "type": "integer" }, + "replies": { "type": "integer" } + } + } + topic: "{{ TOPIC }}" + bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}" + auto_offset_reset_config: earliest + consumer_config: + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \ + username={{ USERNAME }} \ + password={{ PASSWORD }};" + security.protocol: "SASL_PLAINTEXT" + sasl.mechanism: "PLAIN" + + # Remove unexpected characters from the YouTube + # comment string, e.g. emojis, ascii characters outside + # the common day-to-day English. + - type: MapToFields + name: RemoveWeirdCharacters + input: ReadFromMyTopic + config: + language: python + fields: + video_id: video_id + comment_text: + callable: | + import re + def filter(row): + # Match letters, digits, whitespace and common punctuation + allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_" + pattern = re.compile(fr"[^{allowed}]") + return pattern.sub("", row.comment_text).strip() + likes: likes + replies: replies + + # Remove rows that have empty comment text + # after previously removing unexpected characters. + - type: Filter + name: FilterForProperComments + input: RemoveWeirdCharacters + config: + language: python + keep: + callable: | + def filter(row): + return len(row.comment_text) > 0 + + # HuggingFace's distilbert-base-uncased is used for inference, + # which accepts string with a maximum limit of 250 tokens. + # Some of the comment strings can be large and are well over + # this limit after tokenization. + # This transform truncates the comment string and ensure + # every comment satisfy the maximum token limit. 
+ - type: MapToFields + name: Truncating + input: FilterForProperComments + config: + language: python + dependencies: + - 'transformers>=4.48.0,<4.49.0' + fields: + video_id: video_id + comment_text: + callable: | + from transformers import AutoTokenizer + + tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True) + + def truncate_sentence(row): + tokens = tokenizer.tokenize(row.comment_text) + if len(tokens) >= 250: + tokens = tokens[:250] + truncated_sentence = tokenizer.convert_tokens_to_string(tokens) + else: + truncated_sentence = row.comment_text + + return truncated_sentence + likes: likes + replies: replies + + # HuggingFace's distilbert-base-uncased does not distinguish + # between 'english' and 'English'. + # This pipeline makes the same point by converting all words + # into lowercase. + - type: MapToFields + name: LowerCase + input: Truncating + config: + language: python + fields: + video_id: video_id + comment_text: "comment_text.lower()" + likes: likes + replies: replies + + # With VertexAIModelHandlerJSON model handler, + # RunInference transform performs remote inferences by + # sending POST requests to the Vertex AI endpoint that + # our distilbert-base-uncased model is being deployed to. + - type: RunInference + name: DistilBERTRemoteInference + input: LowerCase + config: + inference_tag: "inference" + model_handler: + type: "VertexAIModelHandlerJSON" + config: + endpoint_id: "{{ ENDPOINT }}" + project: "{{ PROJECT }}" + location: "{{ LOCATION }}" + preprocess: + callable: 'lambda x: x.comment_text' + + # Parse inference results output + - type: MapToFields + name: FormatInferenceOutput + input: DistilBERTRemoteInference + config: + language: python + fields: + video_id: + expression: video_id + output_type: string + comment_text: + callable: "lambda x: x.comment_text" + output_type: string + label: + callable: "lambda x: x.inference.inference[0]['label']" + output_type: string + score: + callable: "lambda x: x.inference.inference[0]['score']" + output_type: number + likes: + expression: likes + output_type: integer + replies: + expression: replies + output_type: integer + + # Assign windows to each element of the unbounded PCollection. + - type: WindowInto + name: Windowing + input: FormatInferenceOutput + config: + windowing: + type: fixed + size: 30s + + # Write all inference results to a BigQuery table. + - type: WriteToBigQuery + name: WriteInferenceResultsToBQ + input: Windowing + config: + table: "{{ PROJECT }}.{{ DATASET }}.{{ TABLE }}" + create_disposition: CREATE_IF_NEEDED + write_disposition: WRITE_APPEND + +options: + yaml_experimental_features: ML + +# Expected: Review Comment: Can we add some YAML tests by mocking the Kafka, GCS transforms ? RunInference also can be mocked or changed to use some local model handler. ########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/README.md: ########## @@ -0,0 +1,95 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/README.md: ##########
@@ -0,0 +1,95 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Inference Examples Catalog
+
+<!-- TOC -->
+
+* [Inference Examples Catalog](#examples-catalog)
+  * [Streaming Sentiment Analysis](#streaming-sentiment-analysis)
+
+<!-- TOC -->
+
+## Streaming Sentiment Analysis
+
+The example leverages the `RunInference` transform with Vertex AI
+model handler [VertexAIModelHandlerJSON](
+https://beam.apache.org/releases/pydoc/current/apache_beam.yaml.yaml_ml#apache_beam.yaml.yaml_ml.VertexAIModelHandlerJSONProvider),
+in addition to Kafka IO to demonstrate an end-to-end example of a
+streaming sentiment analysis pipeline. The dataset to perform
+sentiment analysis on is the YouTube video comments and can be found
+on Kaggle [here](
+https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv).
+
+Download the dataset and copy over to a GCS bucket:
+```sh
+gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
+```
+
+For setting up Kafka, an option is to use [Click to Deploy](

Review Comment: Might be good to introduce a script that sets up the resources needed to run the pipeline, if possible. Beam already has tests that start up a Docker-based Kafka in GCP and create BQ datasets.
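As a rough sketch of what such a setup helper might cover (all names below are placeholders; it only provisions the GCS bucket and the BigQuery dataset via the Google Cloud client libraries, and assumes Kafka is brought up separately, e.g. via Click to Deploy or a Docker-based broker):

```python
"""Hypothetical one-shot setup helper for the streaming sentiment example."""
from google.cloud import bigquery
from google.cloud import storage


def setup_resources(project, bucket_name, dataset_name, local_csv_path):
  # Bucket that holds the UScomments.csv dataset read by the pipeline.
  gcs = storage.Client(project=project)
  bucket = gcs.bucket(bucket_name)
  if not bucket.exists():
    gcs.create_bucket(bucket_name)
  bucket.blob('UScomments.csv').upload_from_filename(local_csv_path)

  # Dataset for the inference results; the table itself is created by
  # WriteToBigQuery because create_disposition is CREATE_IF_NEEDED.
  bq = bigquery.Client(project=project)
  bq.create_dataset(f'{project}.{dataset_name}', exists_ok=True)


if __name__ == '__main__':
  # Placeholder values; replace with your own project and resource names.
  setup_resources('my-project', 'my-bucket', 'sentiment_results',
                  '/path/to/UScomments.csv')
```
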
########## sdks/python/apache_beam/yaml/examples/transforms/ml/inference/streaming_sentiment_analysis.yaml: ##########
@@ -0,0 +1,257 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The pipeline first reads the YouTube comments .csv dataset from GCS bucket
+# and performs necessary clean-up before writing it to a Kafka topic.
+# The pipeline then reads from that Kafka topic and applies various transformation
+# logic before RunInference transform performs remote inference with the Vertex AI
+# model handler.
+# The inference result is then written to a BigQuery table.
+
+pipeline:
+  transforms:
+    # The YouTube comments dataset contains rows that
+    # have unexpected schema (e.g. rows with more fields,
+    # rows with fields that contain string instead of
+    # integer, etc...). PyTransform helps construct
+    # the logic to properly read in the csv dataset as
+    # a schema'd PCollection.
+    - type: PyTransform
+      name: ReadFromGCS
+      input: {}
+      config:
+        constructor: __callable__
+        kwargs:
+          source: |
+            def ReadYoutubeCommentsCsv(pcoll, file_pattern):
+              def _to_int(x):
+                try:
+                  return int(x)
+                except (ValueError):
+                  return None
+
+              return (
+                  pcoll
+                  | beam.io.ReadFromCsv(
+                      file_pattern,
+                      names=['video_id', 'comment_text', 'likes', 'replies'],
+                      on_bad_lines='skip',
+                      converters={'likes': _to_int, 'replies': _to_int})
+                  | beam.Filter(lambda row:
+                      None not in list(row._asdict().values()))
+                  | beam.Map(lambda row: beam.Row(
+                      video_id=row.video_id,
+                      comment_text=row.comment_text,
+                      likes=int(row.likes),
+                      replies=int(row.replies)))
+              )
+          file_pattern: "{{ GCS_PATH }}"
+
+    # Send the rows as Kafka records to an existing
+    # Kafka topic.
+    - type: WriteToKafka
+      name: SendRecordsToKafka
+      input: ReadFromGCS
+      config:
+        format: "JSON"
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        producer_config_updates:
+          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    # Read Kafka records from an existing Kafka topic.
+    - type: ReadFromKafka
+      name: ReadFromMyTopic
+      config:
+        format: "JSON"
+        schema: |
+          {
+            "type": "object",
+            "properties": {
+              "video_id": { "type": "string" },
+              "comment_text": { "type": "string" },
+              "likes": { "type": "integer" },
+              "replies": { "type": "integer" }
+            }
+          }
+        topic: "{{ TOPIC }}"
+        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
+        auto_offset_reset_config: earliest
+        consumer_config:
+          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
+            username={{ USERNAME }} \
+            password={{ PASSWORD }};"
+          security.protocol: "SASL_PLAINTEXT"
+          sasl.mechanism: "PLAIN"
+
+    # Remove unexpected characters from the YouTube
+    # comment string, e.g. emojis, ascii characters outside
+    # the common day-to-day English.
+    - type: MapToFields
+      name: RemoveWeirdCharacters
+      input: ReadFromMyTopic
+      config:
+        language: python
+        fields:
+          video_id: video_id
+          comment_text:
+            callable: |
+              import re
+              def filter(row):
+                # Match letters, digits, whitespace and common punctuation
+                allowed = r"A-Za-z0-9\s.,;:!?\'\"()\[\]{}\-_"
+                pattern = re.compile(fr"[^{allowed}]")
+                return pattern.sub("", row.comment_text).strip()
+          likes: likes
+          replies: replies
+
+    # Remove rows that have empty comment text
+    # after previously removing unexpected characters.
+    - type: Filter
+      name: FilterForProperComments
+      input: RemoveWeirdCharacters
+      config:
+        language: python
+        keep:
+          callable: |
+            def filter(row):
+              return len(row.comment_text) > 0
+
+    # HuggingFace's distilbert-base-uncased is used for inference,
+    # which accepts string with a maximum limit of 250 tokens.
+    # Some of the comment strings can be large and are well over
+    # this limit after tokenization.
+    # This transform truncates the comment string and ensure
+    # every comment satisfy the maximum token limit.
+    - type: MapToFields
+      name: Truncating
+      input: FilterForProperComments
+      config:
+        language: python
+        dependencies:
+          - 'transformers>=4.48.0,<4.49.0'
+        fields:
+          video_id: video_id
+          comment_text:
+            callable: |
+              from transformers import AutoTokenizer
+
+              tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)
+
+              def truncate_sentence(row):
+                tokens = tokenizer.tokenize(row.comment_text)
+                if len(tokens) >= 250:
+                  tokens = tokens[:250]
+                  truncated_sentence = tokenizer.convert_tokens_to_string(tokens)
+                else:
+                  truncated_sentence = row.comment_text
+
+                return truncated_sentence
+          likes: likes
+          replies: replies
+
+    # HuggingFace's distilbert-base-uncased does not distinguish
+    # between 'english' and 'English'.
+    # This pipeline makes the same point by converting all words
+    # into lowercase.
+    - type: MapToFields
+      name: LowerCase
+      input: Truncating
+      config:
+        language: python
+        fields:
+          video_id: video_id
+          comment_text: "comment_text.lower()"
+          likes: likes
+          replies: replies
+
+    # With VertexAIModelHandlerJSON model handler,
+    # RunInference transform performs remote inferences by
+    # sending POST requests to the Vertex AI endpoint that
+    # our distilbert-base-uncased model is being deployed to.
+    - type: RunInference
+      name: DistilBERTRemoteInference
+      input: LowerCase
+      config:
+        inference_tag: "inference"
+        model_handler:
+          type: "VertexAIModelHandlerJSON"
+          config:
+            endpoint_id: "{{ ENDPOINT }}"
+            project: "{{ PROJECT }}"
+            location: "{{ LOCATION }}"
+            preprocess:
+              callable: 'lambda x: x.comment_text'
+
+    # Parse inference results output
+    - type: MapToFields
+      name: FormatInferenceOutput
+      input: DistilBERTRemoteInference
+      config:
+        language: python
+        fields:
+          video_id:
+            expression: video_id
+            output_type: string
+          comment_text:
+            callable: "lambda x: x.comment_text"
+            output_type: string
+          label:
+            callable: "lambda x: x.inference.inference[0]['label']"
+            output_type: string
+          score:
+            callable: "lambda x: x.inference.inference[0]['score']"

Review Comment: Ditto.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org