charlespnh commented on code in PR #35375:
URL: https://github.com/apache/beam/pull/35375#discussion_r2222725097


##########
sdks/python/apache_beam/yaml/examples/testing/examples_test.py:
##########
@@ -782,9 +853,67 @@ def _db_io_read_test_processor(
   return test_spec
 
 
+@YamlExamplesTestSuite.register_test_preprocessor(
+    'test_streaming_sentiment_analysis_yaml')
+def _streaming_sentiment_analysis_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve the streaming sentiment analysis example.
+
+  This preprocessor replaces several IO transforms and the RunInference 
transform.
+  This allows the test to verify the pipeline's correctness without relying on
+  external data sources and the model hosted on VertexAI.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline 
specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with ... transforms replaced.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'PyTransform' and transform.get(
+          'name', '') == 'ReadFromGCS':
+        transform['windowing'] = {'type': 'fixed', 'size': '30s'}
+
+        file_name = 'youtube-comments.csv'
+        local_path = env.input_file(file_name, INPUT_FILES[file_name])
+        transform['config']['kwargs']['file_pattern'] = local_path
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromKafka':
+        config = transform['config']
+        transform['type'] = 'ReadFromCsv'
+        transform['config'] = {
+            k: v
+            for k, v in config.items() if k.startswith('__')
+        }
+        transform['config']['path'] = ""
+
+        file_name = 'youtube-comments.csv'
+        test_spec = replace_recursive(
+            test_spec,
+            transform['type'],
+            'path',
+            env.input_file(file_name, INPUT_FILES[file_name]))
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'RunInference':
+        transform['type'] = 'TestRunInference'
+

Review Comment:
   Yeah because we're using `replace_recursive` it returns a new `test_spec` 
reference. You therefore would need to get the new `pipeline` reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to