You will need to set the save_main_session pipeline option to True.
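
The reason, as far as I can tell from the traceback: FlattenTagFilesFn lives in your main module, so it is pickled as a reference to __main__. On the worker, __main__ is sdk_worker_main, which has no such attribute. DirectRunner runs in the same process, so the lookup succeeds there. Below is a minimal sketch of turning the option on. The helper names with_save_main_session and build_options are hypothetical, made up for illustration; PipelineOptions, SetupOptions and the --save_main_session flag are the actual Beam names.

```python
def with_save_main_session(argv):
    """Return a copy of argv that includes the --save_main_session flag."""
    args = list(argv)
    if "--save_main_session" not in args:
        args.append("--save_main_session")
    return args


def build_options(argv):
    """Build PipelineOptions with save_main_session enabled (hypothetical helper)."""
    # Imported lazily so the argv helper above can be used without Beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions

    # Equivalent programmatic form, if you prefer not to touch argv:
    #   options.view_as(SetupOptions).save_main_session = True
    return PipelineOptions(with_save_main_session(argv))
```

You would then pass the result to beam.Pipeline(options=build_options(argv)). Another approach that should avoid the problem is to move the DoFn into its own module that is installed on the workers, so it is no longer looked up on __main__.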

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Hello.
>
> I would like to ask a question about ParDo.
>
> I am getting the error below inside the TaskManager when running code on
> Apache Flink using the Portable Runner.
> =====================================================
> ....
>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>     return load(file, ignore)
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>     obj = pik.load()
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
> =====================================================
>
> "FlattenTagFilesFn" is defined as a DoFn and applied in the pipeline with ParDo as below.
> =====================================================
>     frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>                       | 'combined:cogroup' >> beam.CoGroupByKey()
>                       | 'combined:exclude' >> beam.Filter(lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>                       | 'combined:flat'    >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>                                               .with_outputs('counts', main='frames'))
> =====================================================
>
> In the same file I have defined the class as below.
> =====================================================
> class FlattenTagFilesFn(beam.DoFn):
>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>         self.s3Bucket = s3Bucket
>         self.s3Creds  = s3Creds
>         self.maxKeys  = maxKeys
> =====================================================
>
> This is not a problem when running the pipeline with the DirectRunner.
> May I ask how I should import the class for ParDo when running on Flink?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
> Twitter: <https://twitter.com/yuwtennis>
>
