[ 
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878137#comment-16878137
 ] 

Enrico Canzonieri commented on BEAM-7678:
-----------------------------------------

From my understanding, when we assign a typehint to a DoFn, that should result
in {{output.element_type}} being set to the proper type in:
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]

I experimented with the following change, which seems to work:
{code}
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index a0e8a72759..fa67242c36 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -640,7 +640,7 @@ class Pipeline(object):
           if len(transform_node.outputs) == 1:
             # The runner often has expectations about the output types as well.
             output, = transform_node.outputs.values()
-            output.element_type = transform_node.transform.infer_output_type(
+            output.element_type = output.element_type or transform_node.transform.infer_output_type(
                 pcoll.element_type)
{code}
I'm not sure whether this will introduce any regression though.
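
To make the intent of the change concrete, here is a minimal, self-contained sketch of the "keep the existing type, otherwise infer" fallback. It does not use Beam: {{FakeOutput}}, {{fake_infer_output_type}}, {{assign_before}} and {{assign_after}} are hypothetical stand-ins that only mimic the attribute name from the diff, and the sketch assumes the declared hint has already been stored in {{element_type}} before this code runs.

```python
class FakeOutput:
    """Hypothetical stand-in for an output PCollection; not a Beam class."""

    def __init__(self, element_type=None):
        self.element_type = element_type


def fake_infer_output_type(input_type):
    """Hypothetical inference that knows nothing about any declared hint."""
    return "Any"


def assign_before(output, input_type):
    # Behaviour before the patch: the inferred type always overwrites.
    output.element_type = fake_infer_output_type(input_type)
    return output.element_type


def assign_after(output, input_type):
    # Behaviour with the patch: keep an existing (truthy) type, else infer.
    output.element_type = (
        output.element_type or fake_infer_output_type(input_type))
    return output.element_type


before = assign_before(FakeOutput("Message"), "KV")
after_hinted = assign_after(FakeOutput("Message"), "KV")
after_unhinted = assign_after(FakeOutput(), "KV")
print(before, after_hinted, after_unhinted)
```

Note that {{or}} falls back on any falsy value, not only {{None}}. If a legitimate element type could ever evaluate as falsy, an explicit {{is not None}} check would be the safer form; whether that can happen in Beam has not been verified here.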

> typehints with_output_types annotation doesn't work for stateful DoFn 
> ----------------------------------------------------------------------
>
>                 Key: BEAM-7678
>                 URL: https://issues.apache.org/jira/browse/BEAM-7678
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.13.0
>            Reporter: Enrico Canzonieri
>            Priority: Minor
>
> The output types typehints seem to be ignored when using a stateful DoFn, but 
> the same typehint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
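> from apache_beam import DoFn, typehints
> from apache_beam.coders import VarIntCoder
> from apache_beam.transforms.userstate import BagStateSpec
>
> # Message is the user's own class (definition not shown).
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>     COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>     def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>         (key, messages) = element
>         newMessage = Message()
>         return [newMessage]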
> {code}
> The example code just defines a stateful DoFn in Python. The runner used
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  we do not take the typehints into account.



