[ https://issues.apache.org/jira/browse/BEAM-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510883#comment-17510883 ]
Valentyn Tymofieiev commented on BEAM-4132:
-------------------------------------------

Ok, it looks like the issue is reproducible only on the default (main) output after partitioning, not on the other outputs, although inference on those outputs also seems suboptimal (their element type degrades to Any).

Repro:
{noformat}
import apache_beam as beam

PLAYER_1 = 'Team A Player 1'
PLAYER_2 = 'Team B Player 2'
PLAYER_3 = 'Player 3'

def split_fn(kv):
  k, v = kv
  if 'Team A' in k:
    yield beam.pvalue.TaggedOutput('TeamA', kv)
  elif 'Team B' in k:
    yield beam.pvalue.TaggedOutput('TeamB', kv)
  else:
    yield kv

with beam.Pipeline() as p:
  scores = p | beam.Create([(PLAYER_1, 15), (PLAYER_2, 10), (PLAYER_1, 100), (PLAYER_3, 25), (PLAYER_2, 75)])
  partitioned = scores | beam.FlatMap(split_fn).with_outputs('TeamA', 'TeamB', main='UnknownTeam')

  team_a = partitioned['TeamA']
  print("Partitioned output element type:", team_a.element_type)

  unknown_team = partitioned['UnknownTeam']
  print("Default / main output element type:", unknown_team.element_type)

  team_a | "GBK passes" >> beam.GroupByKey()
  unknown_team | "GBK fails with type inference error" >> beam.GroupByKey()
{noformat}

Output:
{noformat}
$ python -m pipeline
Partitioned output element type: Any  <-- loss of typing info
Default / main output element type: Union[TaggedOutput, Tuple[str, int]]  <-- having TaggedOutput here is likely not helpful.

# Second GBK fails with:
Traceback (most recent call last):
  File "/home/valentyn/.pyenv/versions/3.9.4/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/valentyn/.pyenv/versions/3.9.4/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/pipe1.py", line 34, in <module>
    unknown_team | "GBK fails with type inference error" >> beam.GroupByKey()
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py", line 651, in apply
    return self.apply(
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py", line 706, in apply
    transform.type_check_inputs(pvalueish)
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/ptransform.py", line 457, in type_check_inputs
    self.type_check_inputs_or_outputs(pvalueish, 'input')
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/ptransform.py", line 486, in type_check_inputs_or_outputs
    raise TypeCheckError(
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GBK fails with type inference error: expected Tuple[TypeVariable[K], TypeVariable[V]], got Union[TaggedOutput, Tuple[str, int]]
Full type hint:
IOTypeHints[inputs=((Tuple[TypeVariable[K], TypeVariable[V]],), {}), outputs=((Tuple[TypeVariable[K], Iterable[TypeVariable[V]]],), {})]
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 790, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/core.py", line 2546, in <module>
  class GroupByKey(PTransform):
File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/typehints/decorators.py", line 775, in annotate_input_types
  th = getattr(f, '_type_hints', IOTypeHints.empty()).with_input_types(

based on:
IOTypeHints[inputs=None, outputs=((Tuple[TypeVariable[K], Iterable[TypeVariable[V]]],), {})]
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 790, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/core.py", line 2546, in <module>
  class GroupByKey(PTransform):
File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/typehints/decorators.py", line 863, in annotate_output_types
  f._type_hints = th.with_output_types(return_type_hint)  # pylint: disable=protected-access
{noformat}

> Element type inference doesn't work for multi-output DoFns
> ----------------------------------------------------------
>
> Key: BEAM-4132
> URL: https://issues.apache.org/jira/browse/BEAM-4132
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.4.0
> Reporter: Chuan Yu Foo
> Priority: P3
> Labels: types
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> TLDR: if you have a multi-output DoFn, then the non-main PCollections will
> incorrectly have their element types set to None. This affects type checking
> for pipelines involving these PCollections.
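The main-output problem in the comment above can be made concrete with a small pure-Python sketch of the behaviour it suggests, assuming the inferred yield type of a multi-output function is a `typing.Union` that contains `TaggedOutput` next to the real value types. `TaggedOutput` below is a hypothetical stand-in class and `main_output_hint` is a hypothetical helper; neither is Beam's implementation. Dropping `TaggedOutput` from `Union[TaggedOutput, Tuple[str, int]]` leaves `Tuple[str, int]`, which fits the `Tuple[K, V]` input hint that `GroupByKey` declares; with `TaggedOutput` still in the union the check in the traceback fails, presumably because not every union member is a key-value tuple.

```python
from typing import Any, Tuple, Union, get_args, get_origin


class TaggedOutput:
    """Hypothetical stand-in for beam.pvalue.TaggedOutput; only its identity matters here."""


def main_output_hint(yield_hint: Any) -> Any:
    """Drop TaggedOutput from an inferred yield hint so only main-output values remain.

    A sketch of the behaviour suggested in the comment, not Beam's actual inference code.
    """
    if yield_hint is TaggedOutput:
        # Every yielded value is tagged, so nothing is known about the main output.
        return Any
    if get_origin(yield_hint) is Union:
        members = tuple(arg for arg in get_args(yield_hint) if arg is not TaggedOutput)
        # Union[(X,)] collapses to plain X, so a single survivor becomes a simple hint.
        return Union[members]
    # Not a union: the hint already describes only main-output values.
    return yield_hint


inferred = Union[TaggedOutput, Tuple[str, int]]
print(main_output_hint(inferred))
```

This only cleans up the main output's hint; the tagged outputs, which the comment shows degrading to Any (and the report shows as None), would still need their own per-tag inference.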
> Minimal example:
> {code}
> import apache_beam as beam
>
> class TripleDoFn(beam.DoFn):
>   def process(self, elem):
>     yield elem
>     if elem % 2 == 0:
>       yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
>     if elem % 3 == 0:
>       yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)
>
> @beam.typehints.with_input_types(int)
> @beam.typehints.with_output_types(int)
> class MultiplyBy(beam.DoFn):
>   def __init__(self, multiplier):
>     self._multiplier = multiplier
>
>   def process(self, elem):
>     # process() must return an iterable of outputs, so yield the product.
>     yield elem * self._multiplier
>
> def main():
>   with beam.Pipeline() as p:
>     x, a, b = (
>         p
>         | 'Create' >> beam.Create([1, 2, 3])
>         | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
>             'ten_times', 'hundred_times', main='main_output'))
>     _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))
>
> if __name__ == '__main__':
>   main()
> {code}
> Running this yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
> 'MultiplyBy2': requires <type 'int'> but got None for elem
> {noformat}
> Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} with {{x}}
> instead yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
> 'MultiplyBy2': requires <type 'int'> but got Union[TaggedOutput, int] for elem
> {noformat}
> I would expect Beam to correctly infer that {{a}} and {{b}} have element
> types of {{int}} rather than {{None}}, and I would also expect Beam to
> correctly figure out that the element types of {{x}} are compatible with
> {{int}}.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
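To illustrate the outcome the reporter expects, here is a sketch that replays the emission logic of `TripleDoFn.process` outside Beam and records which value types reach each output. It observes sample data at runtime rather than doing static inference, so it only demonstrates the expected per-tag result. `TaggedOutput`, `MAIN`, `triple_process`, and `types_per_output` are hypothetical names for this illustration, not Beam APIs.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Optional, Set


class TaggedOutput:
    """Hypothetical stand-in for beam.pvalue.TaggedOutput: a tag plus the wrapped value."""

    def __init__(self, tag: str, value: Any) -> None:
        self.tag = tag
        self.value = value


# Hypothetical key used in this sketch for the untagged (main) output.
MAIN: Optional[str] = None


def triple_process(elem: int) -> Iterable[Any]:
    """Same emission logic as TripleDoFn.process in the report."""
    yield elem
    if elem % 2 == 0:
        yield TaggedOutput('ten_times', elem * 10)
    if elem % 3 == 0:
        yield TaggedOutput('hundred_times', elem * 100)


def types_per_output(
    fn: Callable[[Any], Iterable[Any]], inputs: Iterable[Any]
) -> Dict[Optional[str], Set[type]]:
    """Record the concrete type of every value each output receives, unwrapping TaggedOutput."""
    seen: Dict[Optional[str], Set[type]] = defaultdict(set)
    for elem in inputs:
        for out in fn(elem):
            if isinstance(out, TaggedOutput):
                # A tagged value counts toward its own tag, never toward the main output.
                seen[out.tag].add(type(out.value))
            else:
                seen[MAIN].add(type(out))
    return dict(seen)


print(types_per_output(triple_process, [1, 2, 3]))
```

Once the `TaggedOutput` wrappers are unwrapped, every output (main and tagged) receives only `int` values, which matches the expectation that {{a}} and {{b}} should be typed {{int}} and that {{x}} should be compatible with {{int}}.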