[ https://issues.apache.org/jira/browse/BEAM-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16981828#comment-16981828 ]
Valentyn Tymofieiev edited comment on BEAM-8441 at 11/25/19 8:21 PM:
---------------------------------------------------------------------

Since the pipeline's main module includes a "super()" invocation, and "dill.load_session" appears in the stack trace, this error is caused by BEAM-6158. Can you please take a look at [1] and see whether it helps address your issue?

[1] https://issues.apache.org/jira/browse/BEAM-6158?focusedCommentId=16919945&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16919945

      was (Author: tvalentyn):
    Since I see "dill.load_session" in the stack trace, I suspect you hit BEAM-6158. Can you please take a look at [1] and see if this can help address your issue?

[1] https://issues.apache.org/jira/browse/BEAM-6158?focusedCommentId=16919945&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16919945

> Side-Input in Python3 fails to pickle class
> -------------------------------------------
>
>                 Key: BEAM-8441
>                 URL: https://issues.apache.org/jira/browse/BEAM-8441
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Jannik Franz
>            Priority: Major
>
> When running Apache Beam with Python 3 on Google Cloud Dataflow, side inputs don't work.
> When testing with the local (direct) runner, there seems to be no issue.
>
> {code:python}
> class FlattenCustomActions(beam.PTransform):
>     """ Transforms Facebook Day Actions
>         Only retains actions with custom_conversions
>         Flattens the actions
>         Adds custom conversions names using a side input
>     """
>
>     def __init__(self, conversions):
>         super(FlattenCustomActions, self).__init__()
>         self.conversions = conversions
>
>     def expand(self, input_or_inputs):
>         return (
>             input_or_inputs
>             | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
>             | "AddConversionName" >> beam.Map(add_conversion_name, self.conversions)
>         )
>
> # ...
> # in run():
> pipeline_options = PipelineOptions(pipeline_args)
> pipeline_options.view_as(SetupOptions).save_main_session = True
> p = beam.Pipeline(options=pipeline_options)
>
> conversions_output = (
>     p
>     | "ReadConversions" >> ReadFromText(known_args.input_conversions, coder=JsonCoder())
>     | TransformConversionMetadata()
> )
>
> (
>     conversions_output
>     | "WriteConversions"
>     >> WriteCoerced(
>         known_args.output_conversions,
>         known_args.output_type,
>         schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
>     )
> )
>
> (
>     p
>     | ReadFacebookJson(known_args.input, retain_root_fields=True)
>     | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
>     | "WriteActions"
>     >> WriteCoerced(
>         known_args.output, known_args.output_type, schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
>     )
> )
> {code}
>
> I receive the following traceback in Dataflow:
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 773, in run
>     self._load_main_session(self.local_staging_directory)
>   File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
>     pickler.load_session(session_file)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
>     return dill.load_session(file_path)
>   File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in load_session
>     module = unpickler.load()
>   File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> AttributeError: Can't get attribute 'FlattenCustomActions' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
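The last frames of the traceback above show dill handing off to the standard `StockUnpickler.find_class`, which looks a class up by name in a module when the saved main session is loaded on the worker. The sketch below is a simplified illustration of that by-name lookup only. It uses the standard-library `pickle` instead of dill or Beam, and the module `fake_main` and the placeholder class are hypothetical stand-ins (`fake_main` plays the role of `dataflow_worker.start`). It does not reproduce the BEAM-6158 mechanism itself.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the module the unpickler consults
# (dataflow_worker.start in the reported traceback).
fake_main = types.ModuleType("fake_main")
sys.modules["fake_main"] = fake_main


class FlattenCustomActions:
    """Hypothetical placeholder; not the Beam PTransform from the report."""

    def __init__(self, conversions):
        self.conversions = conversions


# Point the class at fake_main: the standard pickle stores a class as a
# (module, qualified name) reference, never as the class body.
FlattenCustomActions.__module__ = "fake_main"
FlattenCustomActions.__qualname__ = "FlattenCustomActions"
fake_main.FlattenCustomActions = FlattenCustomActions

payload = pickle.dumps(FlattenCustomActions(["purchase"]))

# Case 1: the module still defines the class, so find_class resolves it.
restored = pickle.loads(payload)
print(type(restored).__name__, restored.conversions)  # FlattenCustomActions ['purchase']

# Case 2: the module no longer has that attribute, so find_class raises
# an AttributeError naming the missing class, as in the reported traceback.
del fake_main.FlattenCustomActions
error_message = None
try:
    pickle.loads(payload)
except AttributeError as exc:
    error_message = str(exc)
print("AttributeError:", error_message)
```

In the first case the lookup succeeds because the module named in the pickle defines the class. In the reported failure, the module consulted on the worker (`dataflow_worker.start`) has no attribute `FlattenCustomActions`, which corresponds to the second case.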