[
https://issues.apache.org/jira/browse/BEAM-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548819#comment-17548819
]
Danny McCormick commented on BEAM-10384:
----------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20434
> Dataflow can't pickle WeakRefDictionary
> ---------------------------------------
>
> Key: BEAM-10384
> URL: https://issues.apache.org/jira/browse/BEAM-10384
> Project: Beam
> Issue Type: New Feature
> Components: runner-dataflow
> Affects Versions: 2.22.0
> Environment: Ubuntu LTS (direct runner)
> Reporter: Maurice Poirrier
> Priority: P3
> Labels: GCP
> Original Estimate: 120h
> Remaining Estimate: 120h
>
> We are trying to deploy a streaming pipeline to Dataflow that splits the data
> into a few different "routes", each of which manipulates the data differently.
> We did all of the development with the DirectRunner, and it worked smoothly in
> our tests, but now that we have deployed it to Dataflow, it does not work.
>
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.pvalue import TaggedOutput
>
>
> class SplitByRoute(beam.DoFn):
>     OUTPUT_TAG_ROUTE_ONE = "route_one"
>     OUTPUT_TAG_ROUTE_TWO = "route_two"
>     OUTPUT_NOT_SUPPORTED = "not_supported"
>
>     def __init__(self):
>         super().__init__()
>
>     def process(self, elem):
>         try:
>             # define_route (not shown) just tags the element depending on "param".
>             route = self.define_route(elem["param"])
>         except Exception:
>             route = None
>         logging.info(f"Routed to {route}")
>         if route == self.OUTPUT_TAG_ROUTE_ONE:
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
>         elif route == self.OUTPUT_TAG_ROUTE_TWO:
>             logging.info(f"Element: {elem}")
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
>         else:
>             yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
> {code}
>
>
> The code fails when yielding from the DoFn above.
>
> It does log the element and yield the output, but then fails with the following error:
> `AttributeError: Can't pickle local object
> 'WeakValueDictionary.__init__.<locals>.remove' [while running
> 'generatedPtransform-3196']`
> One other consideration: we use tagged outputs earlier in the pipeline, before
> this DoFn, and they work on Dataflow; only this one fails with the error above.
> Any suggestions on how we could handle this? It has been a very frustrating error.
> Thank you!!! :)
>
>
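The error message in the report can be reproduced with the standard library alone. A minimal sketch, assuming the element (or an object inside it) ends up holding a weakref.WeakValueDictionary: that dictionary stores a callback defined locally inside its own __init__, and pickle cannot serialize local functions. Resource and pickle_error are hypothetical names used only for illustration. One plausible reason the DirectRunner never hit this is that it can hand elements between steps in memory, whereas Dataflow has to encode elements that cross stage boundaries, and for arbitrary Python objects Beam's default coder typically falls back to pickling.

{code:python}
import pickle
import weakref


class Resource:
    """Hypothetical object referenced from a weak-value cache."""


def pickle_error(obj):
    """Return None if obj pickles cleanly, else the exception type and message."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # the exact exception type varies across Python versions
        return f"{type(exc).__name__}: {exc}"
    return None


resource = Resource()
cache = weakref.WeakValueDictionary()
cache["r"] = resource  # `resource` stays alive, so the entry is kept

# Same shape as the element in the report, with and without the cache inside.
plain_elem = {"param": "route_one", "param2": "stuff"}
elem_with_cache = {"param": cache, "param2": "stuff"}

print(pickle_error(plain_elem))  # None
print(pickle_error(elem_with_cache))
{code}

On CPython 3 the second call should report an error mentioning WeakValueDictionary.__init__.<locals>.remove, similar to the one above; the exact wording depends on the Python version.
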
> Edit:
> The problem is that, with the DataflowRunner, you cannot modify an object that
> is being passed as the message (elem in the example) and then emit it.
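> For example:
> {code:python}
> class OurClass:
>     def __init__(self):
>         self.something = None
>
>     def dosomething(self):
>         # Mutates the instance that travels inside the element.
>         self.something = 1
>         self.other = 2
>
>
> # OurClass must be defined before it is instantiated here.
> elem = {
>     'param': OurClass(),
>     'param2': 'stuff'
> }
> {code}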
>
> So, in the code at the top, we used define_route to decide which route the
> data goes to. But inside define_route we called the dosomething method to
> determine the route. So the instance of the class was modified, and then, when
> Dataflow tried to pickle this message, the code failed.
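The simplified OurClass above only assigns integers, which pickle without trouble, so the real dosomething presumably attaches some other state. The sketch below is hypothetical: OurClassSketch, SafeOurClassSketch and pickles are illustrative names, and the weak-value cache is an assumption standing in for whatever the real class stores. It shows how a method call can turn a picklable element into an unpicklable one, and one common workaround: define __getstate__ so transient state is left out of the pickle. Another option, in line with the finding above, is to compute the route without mutating the element at all.

{code:python}
import pickle
import weakref


class OurClassSketch:
    """Hypothetical variant of OurClass whose method adds unpicklable state."""

    def __init__(self):
        self.something = None

    def dosomething(self):
        self.something = 1
        # Assumption: the real method attaches something like a weak-value cache.
        self._cache = weakref.WeakValueDictionary()


class SafeOurClassSketch(OurClassSketch):
    """Same behaviour, but the transient cache is excluded when pickling."""

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("_cache", None)  # drop state that pickle cannot handle
        return state


def pickles(obj):
    """Return True if obj can be pickled, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


unsafe = OurClassSketch()
print(pickles({"param": unsafe}))  # True: nothing unpicklable yet
unsafe.dosomething()
print(pickles({"param": unsafe}))  # False: the cache is now part of the state

safe = SafeOurClassSketch()
safe.dosomething()
restored = pickle.loads(pickle.dumps({"param": safe}))["param"]
print(restored.something, hasattr(restored, "_cache"))  # 1 False
{code}

With __getstate__ the unpickled copy keeps something == 1 but has no _cache, so any code that relies on the cache has to rebuild it lazily after the element is decoded.
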
--
This message was sent by Atlassian Jira
(v8.20.7#820007)