[ https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878137#comment-16878137 ]

Enrico Canzonieri commented on BEAM-7678:
-----------------------------------------

From my understanding, when we assign type hints to a DoFn, that should result in {{output.element_type}} being set to the proper type in:
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]

I experimented with the following change, which seems to work:
{code}
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index a0e8a72759..fa67242c36 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -640,7 +640,7 @@ class Pipeline(object):
           if len(transform_node.outputs) == 1:
             # The runner often has expectations about the output types as well.
             output, = transform_node.outputs.values()
-            output.element_type = transform_node.transform.infer_output_type(
+            output.element_type = output.element_type or transform_node.transform.infer_output_type(
                 pcoll.element_type)
{code}
I'm not sure whether this will introduce any regressions, though.

> typehints with_output_types annotation doesn't work for stateful DoFn
> ----------------------------------------------------------------------
>
>                 Key: BEAM-7678
>                 URL: https://issues.apache.org/jira/browse/BEAM-7678
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.13.0
>            Reporter: Enrico Canzonieri
>            Priority: Minor
>
> Output type hints seem to be ignored when using a stateful DoFn, but the
> same type hint works perfectly when used without state. This issue
> prevents a custom Coder from being used because Beam will default to one of
> the {{FastCoders}} (I believe Pickle).
>
> Example code:
> {code}
> from apache_beam import DoFn, typehints
> from apache_beam.coders import VarIntCoder
> from apache_beam.transforms.userstate import BagStateSpec
>
> # Message is the reporter's own message class (not shown here).
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>     COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>     def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>         (key, messages) = element
>         newMessage = Message()
>         return [newMessage]
> {code}
> The example code simply defines a stateful DoFn in Python. The runner used
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return
> {{typehints.List[Message]}} works around the issue.
> Looking at the code, it seems to me that in
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
> we do not take the type hints into account.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
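
The one-line patch in the comment changes a precedence rule: keep an output type that is already set, and only call {{infer_output_type}} when there is none. Below is a minimal, Beam-free sketch of that rule, assuming hypothetical stand-ins ({{FakePCollection}}, {{FakeTransform}}, {{set_output_type_v2_13}}, {{set_output_type_patched}}, and a dummy {{Message}} class) rather than Beam's real {{PCollection}} or transforms. It models only the assignment on the linked line, not the surrounding pipeline code.

```python
class FakePCollection:
    """Hypothetical stand-in for a PCollection; only tracks element_type."""

    def __init__(self, element_type=None):
        # None models "no output type recorded yet".
        self.element_type = element_type


class FakeTransform:
    """Hypothetical stand-in for a transform whose inference ignores hints."""

    def __init__(self, inferred_type):
        self._inferred_type = inferred_type
        self.infer_calls = 0

    def infer_output_type(self, input_type):
        # Pretend inference on process() only found `inferred_type`;
        # input_type is accepted for signature parity but unused here.
        self.infer_calls += 1
        return self._inferred_type


def set_output_type_v2_13(output, transform, input_type):
    # Models the v2.13.0 line: always overwrite with the inferred type.
    output.element_type = transform.infer_output_type(input_type)


def set_output_type_patched(output, transform, input_type):
    # Models the proposed line: keep an already-set (truthy) type and
    # only fall back to inference when element_type is None or falsy.
    output.element_type = (
        output.element_type or transform.infer_output_type(input_type))


class Message:
    """Hypothetical user type standing in for the report's Message."""


# Declared type from with_output_types(Message); inference only finds object.
before = FakePCollection(element_type=Message)
set_output_type_v2_13(before, FakeTransform(object), input_type=None)
print(before.element_type.__name__)  # object  (declared hint is lost)

after = FakePCollection(element_type=Message)
set_output_type_patched(after, FakeTransform(object), input_type=None)
print(after.element_type.__name__)  # Message (declared hint is kept)

untyped = FakePCollection()
set_output_type_patched(untyped, FakeTransform(object), input_type=None)
print(untyped.element_type.__name__)  # object  (no hint, so inference runs)
```

Because {{or}} tests truthiness, the patched line keeps any {{element_type}} that is already set, not only one that came from {{with_output_types}}. Whether an earlier step can leave a non-hint type there is the open regression question from the comment.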