You will need to set the save_main_session pipeline option to True.

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> Hello.
>
> I would like to ask a question about ParDo.
>
> I am getting the error below inside the TaskManager when running code on
> Apache Flink using the Portable Runner.
> =====================================================
> ....
>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>     return load(file, ignore)
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>     obj = pik.load()
>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
> 'apache_beam.runners.worker.sdk_worker_main' from
> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
> =====================================================
>
> "FlattenTagFilesFn" is defined as a DoFn and invoked via ParDo from the
> pipeline as below.
> =====================================================
> frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>                   | 'combined:cogroup' >> beam.CoGroupByKey()
>                   | 'combined:exclude' >> beam.Filter(
>                       lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>                   | 'combined:flat' >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>                                            .with_outputs('counts', main='frames'))
> =====================================================
>
> In the same file I have defined the class as below.
> =====================================================
> class FlattenTagFilesFn(beam.DoFn):
>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>         self.s3Bucket = s3Bucket
>         self.s3Creds = s3Creds
>         self.maxKeys = maxKeys
> =====================================================
>
> This is not a problem when running the pipeline with the DirectRunner.
> May I ask how I should import the class for ParDo when running on Flink?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves the challenge of building data platforms
> yu.w.ten...@gmail.com
> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
> Twitter: <https://twitter.com/yuwtennis>