This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7620a935280 Fix #37736: Allow composite transforms to use implicit input chaining (#37861)
7620a935280 is described below

commit 7620a935280b501eac4e82386beb0ce94926ee73
Author: liferoad <[email protected]>
AuthorDate: Sat Mar 21 09:09:28 2026 -0400

    Fix #37736: Allow composite transforms to use implicit input chaining (#37861)
    
    * Fix #37736: Allow composite transforms to use implicit input chaining
    
    When a composite transform has no explicit inputs/outputs on its
    sub-transforms, automatically chain them similar to how 'chain' type
    transforms work.
    
    Added test_composite_implicit_input_chaining to verify the fix.
    
    * Fix line-too-long lint error in yaml_transform.py
    
    * Fix yapf formatting in yaml_transform.py
    
    * Fix composite implicit input chaining logic
    
    * Fix composite implicit input chaining - delete empty input from spec
    
    * Fix composite implicit input - reference pipeline input directly
    
    * Fix composite implicit input - reference pipeline input
    
    * Revert "Fix composite implicit input - reference pipeline input"
    
    This reverts commit ceb0ac179750948bf96038e5661cb78d1f28989d.
    
    * Fix composite implicit input chaining from pipeline input
    
    This fix addresses the issue where composite transforms with no explicit
    input specification were failing to receive inputs from the pipeline.
    
    Key changes:
    1. Fixed has_explicit_io check to use is_empty() instead of just checking
       key presence - this properly treats {} as 'no explicit input'
    2. Added composite_has_input check to only do implicit chaining when
       the composite has an input to chain from
    3. Fixed inner_scope_inputs computation to use parent scope's inputs
       when the composite has no explicit input
    4. Fixed output handling to use is_empty() check (normalization sets {})
    5. Fixed final return to correctly resolve scope inputs vs transform outputs
    
    * Apply yapf formatting and fix pylint line length issues
    
    * Address review: revert unnecessary variable assignment
---
 sdks/python/apache_beam/yaml/yaml_transform.py     | 88 +++++++++++++++++++---
 .../python/apache_beam/yaml/yaml_transform_test.py | 20 +++++
 2 files changed, 99 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index ef065d8a3c4..2b745babad0 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -796,12 +796,72 @@ def _enforce_schema(pcoll, label, error_handling_spec, clean_schema):
 def expand_composite_transform(spec, scope):
   spec = normalize_inputs_outputs(normalize_source_sink(spec))
 
+  original_transforms = spec['transforms']
+  # Check if any transform has a NON-EMPTY explicit input or output.
+  # Note: {} (empty dict) means "no explicit input specified" and should
+  # NOT count as having explicit io.
+  # However, if the composite has no input, we can't do implicit chaining.
+  has_explicit_io = any(
+      io is not None and not is_empty(t.get(io, {}))
+      for t in original_transforms for io in ('input', 'output'))
+
+  # If the composite has no input, we can't do implicit chaining
+  composite_has_input = not is_empty(spec.get('input', {}))
+
+  # Only do implicit chaining if:
+  # 1. No transform has explicit io, AND
+  # 2. The composite has an input to chain from
+  if not has_explicit_io and composite_has_input:
+    new_transforms = []
+    for ix, transform in enumerate(original_transforms):
+      transform = dict(transform)
+      if ix == 0:
+        composite_input = spec.get('input', {})
+        if is_explicitly_empty(composite_input):
+          transform['input'] = composite_input
+        elif is_empty(composite_input):
+          # No explicit input - the composite input IS the pipeline input.
+          # Reference the 'input' key from the Scope's inputs.
+          transform['input'] = 'input'
+        else:
+          transform['input'] = {key: key for key in composite_input.keys()}
+      else:
+        transform['input'] = new_transforms[-1]['__uuid__']
+      new_transforms.append(transform)
+
+    if new_transforms:
+      spec = dict(spec, transforms=new_transforms)
+      # Check if output is empty, not just present (normalization sets it to {})
+      if is_empty(spec.get('output', {})):
+        spec['output'] = {
+            '__implicit_outputs__': new_transforms[-1]['__uuid__']
+        }
+
+  # Compute the inputs for the inner scope.
+  # If the composite has an empty input dict ({}), it means the composite
+  # should use the parent scope's inputs directly.
+  composite_input = spec.get('input', {})
+
+  if is_empty(composite_input):
+    # No explicit input - use the parent scope's inputs directly
+    inner_scope_inputs = dict(scope._inputs)
+  else:
+    # The composite has explicit input references
+    # They can reference either:
+    # 1. A parent scope input (e.g., 'input' key in scope._inputs)
+    # 2. A transform output (e.g., 'uuid' -> the output of a transform)
+    inner_scope_inputs = {}
+    for key, value in composite_input.items():
+      if isinstance(value, str) and value in scope._inputs:
+        # Reference to a parent scope input
+        inner_scope_inputs[key] = scope._inputs[value]
+      else:
+        # Reference to a transform output
+        inner_scope_inputs[key] = scope.get_pcollection(value)
+
   inner_scope = Scope(
       scope.root,
-      {
-          key: scope.get_pcollection(value)
-          for (key, value) in empty_if_explicitly_empty(spec['input']).items()
-      },
+      inner_scope_inputs,
       spec['transforms'],
       # TODO(robertwb): Are scoped providers ever used? Worth supporting?
       yaml_provider.merge_providers(
@@ -814,7 +874,8 @@ def expand_composite_transform(spec, scope):
     def expand(inputs):
       inner_scope.compute_all()
       if '__implicit_outputs__' in spec['output']:
-        return inner_scope.get_outputs(spec['output']['__implicit_outputs__'])
+        result = inner_scope.get_outputs(spec['output']['__implicit_outputs__'])
+        return result
       else:
         return {
             key: inner_scope.get_pcollection(value)
@@ -826,16 +887,25 @@ def expand_composite_transform(spec, scope):
     transform = transform.with_resource_hints(
         **SafeLineLoader.strip_metadata(spec['resource_hints']))
 
+  # Always set a name for the composite to ensure proper return value
   if 'name' not in spec:
     spec['name'] = 'Composite'
   if spec['name'] is None:  # top-level pipeline, don't nest
     return transform.expand(None)
   else:
     _LOGGER.info("Expanding %s ", identify_object(spec))
-    return ({
-        key: scope.get_pcollection(value)
-        for (key, value) in empty_if_explicitly_empty(spec['input']).items()
-    } or scope.root) | scope.unique_name(spec, None) >> transform
+    # When the input references a scope input (not a transform output),
+    # we need to use the scope's inputs directly
+    input_dict = {}
+    for key, value in empty_if_explicitly_empty(spec['input']).items():
+      if isinstance(value, str) and value in scope._inputs:
+        # Reference to a scope input
+        input_dict[key] = scope._inputs[value]
+      else:
+        # Reference to a transform output
+        input_dict[key] = scope.get_pcollection(value)
+    return (input_dict or
+            scope.root) | scope.unique_name(spec, None) >> transform
 
 
 def expand_chain_transform(spec, scope):
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 2afb5e7d8e3..a4da97f7f50 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -122,6 +122,26 @@ class YamlTransformE2ETest(unittest.TestCase):
           providers=TEST_PROVIDERS)
       assert_that(result, equal_to([1, 4, 9, 1, 8, 27]))
 
+  def test_composite_implicit_input_chaining(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      elements = p | beam.Create([1, 2, 3])
+      result = elements | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: PyMap
+              name: Square
+              config:
+                  fn: "lambda x: x * x"
+            - type: PyMap
+              name: Increment
+              config:
+                  fn: "lambda x: x + 1"
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([2, 5, 10]))
+
   def test_chain_with_input(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:

Reply via email to