[https://issues.apache.org/jira/browse/BEAM-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510883#comment-17510883]

Valentyn Tymofieiev commented on BEAM-4132:
-------------------------------------------

OK, it looks like the issue is reproducible only on the default (main) output
after partitioning, not on the other outputs, although type inference for those
other outputs also seems suboptimal. Repro:

{noformat}
import apache_beam as beam

# Sample player keys. split_fn routes on the 'Team A' / 'Team B' substrings,
# so PLAYER_3 (which matches neither team) ends up on the main output.
PLAYER_1, PLAYER_2, PLAYER_3 = (
    'Team A Player 1', 'Team B Player 2', 'Player 3')

def split_fn(kv):
  """Route a (player, score) pair to an output based on the player's team.

  Pairs whose player contains 'Team A' are emitted on the 'TeamA' tagged
  output, those containing 'Team B' on the 'TeamB' tagged output, and all
  others on the main (untagged) output. `kv` must unpack into exactly two
  items; otherwise unpacking raises ValueError when the generator is consumed.
  """
  player, _score = kv
  if 'Team A' in player:
    tag = 'TeamA'
  elif 'Team B' in player:
    tag = 'TeamB'
  else:
    # No team matched: emit on the main output unchanged.
    yield kv
    return
  yield beam.pvalue.TaggedOutput(tag, kv)

# Repro pipeline: split (player, score) pairs into the 'TeamA' / 'TeamB' tagged
# outputs plus a main output, then apply GroupByKey to two of those outputs.
with beam.Pipeline() as p:

  # Keyed (player, score) records; PLAYER_3 matches neither team in split_fn,
  # so its records are emitted on the main output.
  scores = p | beam.Create([(PLAYER_1, 15), (PLAYER_2, 10), (PLAYER_1, 100),
                    (PLAYER_3, 25), (PLAYER_2, 75)])

  # main='UnknownTeam' exposes the untagged (main) output under that name.
  partitioned = scores | beam.FlatMap(split_fn).with_outputs('TeamA', 'TeamB', 
main='UnknownTeam')

  # Element type inferred for a tagged output (the run in this report printed
  # Any, i.e. the Tuple[str, int] information is lost).
  team_a = partitioned['TeamA']
  print("Partitioned output element type:", team_a.element_type)

  # Element type inferred for the main output (the run in this report printed
  # Union[TaggedOutput, Tuple[str, int]]).
  unknown_team = partitioned['UnknownTeam']
  print("Default / main output element type:", unknown_team.element_type)

  # Reported to pass the input type check.
  team_a | "GBK passes" >> beam.GroupByKey()
  # Reported to raise TypeCheckError while the pipeline is being constructed:
  # GroupByKey expects Tuple[K, V], but the main output is typed
  # Union[TaggedOutput, Tuple[str, int]].
  unknown_team | "GBK fails with type inference error" >> beam.GroupByKey()

{noformat}

Output:

{noformat}
$ python -m pipeline
 
Partitioned output element type: Any            <--  loss of typing info
Default / main output element type: Union[TaggedOutput, Tuple[str, int]]   <--  having TaggedOutput here is likely not helpful.

# Second GBK fails with:

Traceback (most recent call last):
  File "/home/valentyn/.pyenv/versions/3.9.4/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/valentyn/.pyenv/versions/3.9.4/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/pipe1.py", line 34, in <module>
    unknown_team | "GBK fails with type inference error" >> beam.GroupByKey()
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py", line 651, in apply
    return self.apply(
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/pipeline.py", line 706, in apply
    transform.type_check_inputs(pvalueish)
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/ptransform.py", line 457, in type_check_inputs
    self.type_check_inputs_or_outputs(pvalueish, 'input')
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/ptransform.py", line 486, in type_check_inputs_or_outputs
    raise TypeCheckError(
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GBK fails with type inference error: expected Tuple[TypeVariable[K], TypeVariable[V]], got Union[TaggedOutput, Tuple[str, int]]
Full type hint:
IOTypeHints[inputs=((Tuple[TypeVariable[K], TypeVariable[V]],), {}), outputs=((Tuple[TypeVariable[K], Iterable[TypeVariable[V]]],), {})]
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 790, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/core.py", line 2546, in <module>
    class GroupByKey(PTransform):
File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/typehints/decorators.py", line 775, in annotate_input_types
    th = getattr(f, '_type_hints', IOTypeHints.empty()).with_input_types(

based on:
  IOTypeHints[inputs=None, outputs=((Tuple[TypeVariable[K], Iterable[TypeVariable[V]]],), {})]
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 790, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/transforms/core.py", line 2546, in <module>
      class GroupByKey(PTransform):
  File "/home/valentyn/projects/beam/beam/beam/sdks/python/apache_beam/typehints/decorators.py", line 863, in annotate_output_types
      f._type_hints = th.with_output_types(return_type_hint)  # pylint: disable=protected-access

{noformat}


> Element type inference doesn't work for multi-output DoFns
> ----------------------------------------------------------
>
>                 Key: BEAM-4132
>                 URL: https://issues.apache.org/jira/browse/BEAM-4132
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.4.0
>            Reporter: Chuan Yu Foo
>            Priority: P3
>              Labels: types
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> TL;DR: if you have a multi-output DoFn, then the non-main PCollections
> incorrectly have their element types set to None. This affects type checking
> for pipelines involving these PCollections.
> Minimal example:
> {code}
> import apache_beam as beam
> class TripleDoFn(beam.DoFn):
>   def process(self, elem):
>     yield elem
>     if elem % 2 == 0:
>       yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
>     if elem % 3 == 0:
>       yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)
>       
> @beam.typehints.with_input_types(int)
> @beam.typehints.with_output_types(int)
> class MultiplyBy(beam.DoFn):
>   def __init__(self, multiplier):
>     self._multiplier = multiplier
>   def process(self, elem):
>     return elem * self._multiplier
>   
> def main():
>   with beam.Pipeline() as p:
>     x, a, b = (
>       p
>       | 'Create' >> beam.Create([1, 2, 3])
>       | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
>         'ten_times', 'hundred_times', main='main_output'))
>     _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))
> if __name__ == '__main__':
>   main()    
> {code}
> Running this yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
> 'MultiplyBy2': requires <type 'int'> but got None for elem
> {noformat}
> Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} with {{x}} 
> instead yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
> 'MultiplyBy2': requires <type 'int'> but got Union[TaggedOutput, int] for elem
> {noformat}
> I would expect Beam to correctly infer that {{a}} and {{b}} have element 
> types of {{int}} rather than {{None}}, and I would also expect Beam to 
> correctly figure out that the element types of {{x}} are compatible with 
> {{int}}.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to