This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch revert-15969-combine-type-hints
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d71203a01307008114238a8b4506e67ae24a5ad7
Author: tvalentyn <tvalen...@users.noreply.github.com>
AuthorDate: Mon Nov 29 15:19:04 2021 -0800

    Revert "Key-inferable type hints for CombinePerKey."
---
 sdks/python/apache_beam/pipeline.py                |  4 +---
 sdks/python/apache_beam/transforms/core.py         | 24 ++++++++++------------
 .../apache_beam/transforms/ptransform_test.py      | 13 ------------
 3 files changed, 12 insertions(+), 29 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index a512a18..25c02d4 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -892,9 +892,7 @@ class Pipeline(object):
                   pcoll.element_type)
             if (isinstance(output.element_type,
                            typehints.TupleHint.TupleConstraint) and
-                len(output.element_type.tuple_types) == 2 and
-                pcoll.element_type.tuple_types[0] ==
-                output.element_type.tuple_types[0]):
+                len(output.element_type.tuple_types) == 2):
               output.requires_deterministic_key_coder = (
                   deterministic_key_coders and transform_node.full_label)
         for side_input in transform_node.transform.side_inputs:
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index bd999f8..c71c00c 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2306,20 +2306,18 @@ class CombinePerKey(PTransformWithSideInputs):
         self.fn, *args, **kwargs)
 
   def default_type_hints(self):
-    result = self.fn.get_type_hints()
-    k = typehints.TypeVariable('K')
-    if result.input_types:
-      args, kwargs = result.input_types
-      args = (typehints.Tuple[k, args[0]], ) + args[1:]
-      result = result.with_input_types(*args, **kwargs)
-    else:
-      result = result.with_input_types(typehints.Tuple[k, typehints.Any])
-    if result.output_types:
-      main_output_type = result.simple_output_type('')
-      result = result.with_output_types(typehints.Tuple[k, main_output_type])
+    hints = self.fn.get_type_hints()
+    if hints.input_types:
+      K = typehints.TypeVariable('K')
+      args, kwargs = hints.input_types
+      args = (typehints.Tuple[K, args[0]], ) + args[1:]
+      hints = hints.with_input_types(*args, **kwargs)
     else:
-      result = result.with_output_types(typehints.Tuple[k, typehints.Any])
-    return result
+      K = typehints.Any
+    if hints.output_types:
+      main_output_type = hints.simple_output_type('')
+      hints = hints.with_output_types(typehints.Tuple[K, main_output_type])
+    return hints
 
   def to_runner_api_parameter(
       self,
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py 
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 6a139eb..ac4f632 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1839,19 +1839,6 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     assert_that(d, equal_to([6]))
     self.p.run()
 
-  def test_combine_properly_pipeline_type_checks_without_decorator(self):
-    def sum_ints(ints):
-      return sum(ints)
-
-    d = (
-        self.p
-        | beam.Create([1, 2, 3])
-        | beam.Map(lambda x: ('key', x))
-        | beam.CombinePerKey(sum_ints))
-
-    self.assertEqual(typehints.Tuple[str, typehints.Any], d.element_type)
-    self.p.run()
-
   def test_combine_func_type_hint_does_not_take_iterable_using_decorator(self):
     @with_output_types(int)
     @with_input_types(a=int)

Reply via email to