[ 
https://issues.apache.org/jira/browse/BEAM-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548819#comment-17548819
 ] 

Danny McCormick commented on BEAM-10384:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20434

> Dataflow can't pickle WeakRefDictionary
> ---------------------------------------
>
>                 Key: BEAM-10384
>                 URL: https://issues.apache.org/jira/browse/BEAM-10384
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-dataflow
>    Affects Versions: 2.22.0
>         Environment: Ubuntu LTS (direct runner)
>            Reporter: Maurice Poirrier
>            Priority: P3
>              Labels: GCP
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> We are trying to deploy a streaming pipeline to Dataflow that splits the data
> into a few different "routes", each of which processes the data differently.
> We did the complete development with the DirectRunner, and it worked smoothly
> in our tests, but now that we have deployed it to Dataflow, it does not work.
>  
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.pvalue import TaggedOutput
>
>
> class SplitByRoute(beam.DoFn):
>     OUTPUT_TAG_ROUTE_ONE = "route_one"
>     OUTPUT_TAG_ROUTE_TWO = "route_two"
>     OUTPUT_NOT_SUPPORTED = "not_supported"
>
>     def __init__(self):
>         super().__init__()
>
>     def process(self, elem):
>         try:
>             # Just tag it depending on param (define_route is not shown here).
>             route = self.define_route(elem["param"])
>         except Exception:
>             route = None
>         logging.info(f"Routed to {route}")
>         if route == self.OUTPUT_TAG_ROUTE_ONE:
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
>         elif route == self.OUTPUT_TAG_ROUTE_TWO:
>             logging.info(f"Element: {elem}")
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
>         else:
>             yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
> {code}
>  
>  
> The code fails when yielding from this DoFn.
>  
> It logs the element and yields the output, then fails with the following error:
> `AttributeError: Can't pickle local object
> 'WeakValueDictionary.__init__.<locals>.remove' [while running
> 'generatedPtransform-3196']`
> We also use tagged outputs earlier in the pipeline, before this DoFn, and
> they work on Dataflow; only this one fails with the error above.
> Any suggestions on how we could handle this? It's been a very frustrating error.
> Thank you!!! :)
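The error itself can be reproduced with the standard library alone. The sketch below uses plain `pickle` (no Beam) and assumes, without confirming it for this pipeline, that the element reaching the failing step references a `weakref.WeakValueDictionary` somewhere; `Resource` and `try_pickle` are hypothetical names used only for illustration. Pickling such a dictionary fails because its internal removal callback is a function defined locally inside `WeakValueDictionary.__init__`, which is what `<locals>.remove` in the message refers to.

```python
import pickle
import weakref


class Resource:
    """Hypothetical object stored in the weak-value dictionary."""


def try_pickle(obj):
    """Return None if obj can be pickled, else a short description of the error."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # AttributeError on many Python versions; may differ on others
        return f"{type(exc).__name__}: {exc}"
    return None


resource = Resource()
registry = weakref.WeakValueDictionary()
registry["r"] = resource

# A dict of plain values pickles without trouble.
print(try_pickle({"param": "route_one", "param2": "stuff"}))  # None

# A dict that references a WeakValueDictionary does not: pickle would have to
# serialize the dictionary's removal callback, a function defined inside
# WeakValueDictionary.__init__, and local functions can't be pickled by reference.
print(try_pickle({"param": registry, "param2": "stuff"}))
```

Beam's Python SDK generally falls back to pickling for element types that have no more specific coder, which is presumably where this surfaces once elements are serialized on Dataflow.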
>  
>  
> Edit:
> The problem is that with the DataflowRunner you cannot modify an object that
> we pass as part of a message (elem in the example) and then emit it.
> For example:
> {code:python}
> class OurClass:
>     def __init__(self):
>         self.something = None
>
>     def dosomething(self):
>         self.something = 1
>         self.other = 2
>
>
> elem = {
>     'param': OurClass(),
>     'param2': 'stuff'
> }
> {code}
>  
> So, in the code at the top, we used define_route to decide which route the
> data goes to. But inside define_route we called the dosomething method to
> determine the route. So the instance of the class was modified, and then, when
> it tried to pickle this message, the code failed.
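To find which attribute of an element can't be pickled, a small recursive check run before the element is yielded can help. This is a minimal sketch using only the standard library: `find_unpicklable` is a hypothetical helper, and the `OurClass` variant below, whose `dosomething()` stores a `WeakValueDictionary` in a hypothetical `cache` attribute, is an assumption made for illustration; the report does not say what the real attributes held.

```python
import pickle
import weakref


def find_unpicklable(obj, path="elem", _seen=None):
    """Return the path of the first value inside obj that pickle rejects, or None.

    Recurses into dicts, lists/tuples/sets and instance attributes (vars());
    anything else is treated as a leaf.
    """
    if _seen is None:
        _seen = set()
    if id(obj) in _seen:  # guard against reference cycles
        return None
    _seen.add(id(obj))
    try:
        pickle.dumps(obj)
        return None  # this whole subtree pickles fine
    except Exception:  # the exception type differs between Python versions
        pass
    if isinstance(obj, dict):
        children = [(f"{path}[{key!r}]", value) for key, value in obj.items()]
    elif isinstance(obj, (list, tuple, set, frozenset)):
        children = [(f"{path}[{i}]", value) for i, value in enumerate(obj)]
    elif hasattr(obj, "__dict__"):
        children = [(f"{path}.{name}", value) for name, value in vars(obj).items()]
    else:
        children = []
    for child_path, child in children:
        found = find_unpicklable(child, child_path, _seen)
        if found is not None:
            return found
    return path  # obj fails, but none of its parts can be blamed individually


class Resource:
    """Hypothetical object that is only held weakly."""


class OurClass:
    """Hypothetical variant of the reporter's class."""

    def __init__(self):
        self.something = None

    def dosomething(self, resource):
        self.something = 1
        self.other = 2
        # Assumption for illustration: the real method keeps a weak-value cache.
        self.cache = weakref.WeakValueDictionary({"r": resource})


resource = Resource()
elem = {"param": OurClass(), "param2": "stuff"}
print(find_unpicklable(elem))  # None: nothing unpicklable yet

elem["param"].dosomething(resource)
print(find_unpicklable(elem))  # a path under elem['param'].cache
```

Calling `find_unpicklable(elem)` (or simply `pickle.dumps(elem)`) in a local test narrows the failure to a specific attribute. One way to avoid the problem, assuming the route can be computed from the element's data alone, is to have `define_route` return the route without storing non-picklable state on the element.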



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
