Thank you for the help. I have chosen to remove the super().__init__() call.
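
For reference, here is roughly what the class looks like after the change,
with the boto3 client still created lazily in process() so the unpicklable
client object is never serialized with the DoFn (a minimal sketch; the full
class is quoted further down in this thread):

=================================================================================
class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        # No super().__init__() call here; dill cannot pickle the
        # zero-argument super() reference under Python 3.
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys

    def process(self, elem):
        # Create the boto3 client once per worker, on first use,
        # instead of in the constructor, so it is never pickled.
        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client(
                's3',
                aws_access_key_id=self.s3Creds.awsAccessKeyId,
                aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
        # ... rest of process() unchanged ...
=================================================================================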
Thanks,
Yu

On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka <goe...@google.com> wrote:

> super has some issues while pickling in Python 3. Please refer to
> https://github.com/uqfoundation/dill/issues/300 for more details.
>
> Removing the reference to super in your DoFn should help.
>
> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>
>> Thank you for the reply.
>>
>> "save_main_session" did not work; however, the situation has changed.
>>
>> 1. get_all_options() output; "save_main_session" is set to True.
>>
>> =================================================================================
>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
>> {'wait_until_finish_duration': None, 'update': False,
>> 'min_cpu_platform': None,
>> 'dataflow_endpoint': 'https://dataflow.googleapis.com',
>> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>> 'machine_type': None, 'enable_streaming_engine': False,
>> 'sdk_location': 'default', 'profile_memory': False,
>> 'max_num_workers': None, 'type_check_strictness': 'DEFAULT_TO_ANY',
>> 'streaming': False, 'setup_file': None, 'network': None,
>> 'on_success_matcher': None, 'requirements_cache': None,
>> 'service_account_email': None, 'environment_type': 'DOCKER',
>> 'disk_type': None, 'labels': None, 'profile_location': None,
>> 'direct_runner_use_stacked_bundle': True, 'use_public_ips': None,
>> ***** 'save_main_session': True, *****
>> 'direct_num_workers': 1, 'num_workers': None,
>> 'worker_harness_container_image': None, 'template_location': None,
>> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>> 'transform_name_mapping': None, 'profile_sample_rate': 1.0,
>> 'runner': 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>> 'job_endpoint': 'localhost:8099', 'extra_packages': None,
>> 'environment_cache_millis': 0, 'dry_run': False,
>> 'autoscaling_algorithm': None, 'staging_location': None,
>> 'job_name': None, 'no_auth': False, 'runtime_type_check': False,
>> 'direct_runner_bundle_repeat': 0, 'subnetwork': None,
>> 'pipeline_type_check': True, 'hdfs_user': None,
>> 'dataflow_job_file': None, 'temp_location': None,
>> 'sdk_worker_parallelism': 0, 'zone': None,
>> 'experiments': ['beam_fn_api'], 'hdfs_host': None,
>> 'disk_size_gb': None, 'dataflow_worker_jar': None,
>> 'requirements_file': None, 'beam_plugins': None,
>> 'pubsubRootUrl': None, 'region': None}
>> =================================================================================
>>
>> 2. The error in the Task Manager log did not change.
>>
>> ==================================================================================
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>> 'apache_beam.runners.worker.sdk_worker_main' from
>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>> ==================================================================================
>>
>> 3. However, if I comment out "super().__init__()" in my code, the error
>> changes:
>>
>> ==================================================================================
>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>     return load(file, ignore)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>     obj = pik.load()
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> ImportError: No module named 's3_credentials'
>> ==================================================================================
>>
>> 4. My whole class is below.
>>
>> ==================================================================================
>> class FlattenTagFilesFn(beam.DoFn):
>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>         self.s3Bucket = s3Bucket
>>         self.s3Creds = s3Creds
>>         self.maxKeys = maxKeys
>>
>>         super().__init__()
>>
>>     def process(self, elem):
>>         if not hasattr(self, 's3Client'):
>>             import boto3
>>             self.s3Client = boto3.client(
>>                 's3',
>>                 aws_access_key_id=self.s3Creds.awsAccessKeyId,
>>                 aws_secret_access_key=self.s3Creds.awsSecretAccessKey)
>>
>>         (key, info) = elem
>>
>>         preFrm = {}
>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
>>         yaml1 = yaml.load(resp1['Body'])
>>
>>         for elem in yaml1['body']:
>>             preFrm[elem['frame_tag']['frame_no']] = elem
>>
>>         postFrm = {}
>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
>>         yaml2 = yaml.load(resp2['Body'])
>>
>>         for elem in yaml2['body']:
>>             postFrm[elem['frame_tag']['frame_no']] = elem
>>
>>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>
>>         for f in commonFrmNums:
>>             frames = Frames(
>>                 self.s3Bucket,
>>                 info['pre'][0][0],             # Pre S3Key
>>                 info['post'][0][0],            # Post S3Key
>>                 yaml1['head']['operator_id'],  # Pre OperatorId
>>                 yaml2['head']['operator_id'],  # Post OperatorId
>>                 preFrm[f],                     # Pre Frame Line
>>                 postFrm[f],                    # Post Frame Line
>>                 info['pre'][0][1],             # Pre Last Modified Time
>>                 info['post'][0][1])            # Post Last Modified Time
>>
>>             yield (frames)
>>
>>         tagCounts = TagCounts(
>>             self.s3Bucket,
>>             yaml1,               # Pre Yaml
>>             yaml2,               # Post Yaml
>>             info['pre'][0][0],   # Pre S3Key
>>             info['post'][0][0],  # Post S3Key
>>             info['pre'][0][1],   # Pre Last Modified Time
>>             info['post'][0][1])  # Post Last Modified Time
>>
>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>> ==================================================================================
>>
>> I was using super() so that I could keep a single boto3 client instance
>> in the ParDo. May I ask, is there a way to call super() in the
>> constructor of a ParDo?
>>
>> Thanks,
>> Yu
>>
>> On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> You will need to set the save_main_session pipeline option to True.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask a question about ParDo.
>>>>
>>>> I am getting the below error inside the TaskManager when running code
>>>> on Apache Flink using the Portable Runner.
>>>> =====================================================
>>>> ....
>>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1078, in _create_pardo_operation
>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>   File "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>>     return dill.loads(s)
>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in loads
>>>>     return load(file, ignore)
>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in load
>>>>     obj = pik.load()
>>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>>>> 'apache_beam.runners.worker.sdk_worker_main' from
>>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>>> =====================================================
>>>>
>>>> "FlattenTagFilesFn" is a DoFn, applied with beam.ParDo in the pipeline
>>>> as below.
>>>> =====================================================
>>>> frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>>>     | 'combined:cogroup' >> beam.CoGroupByKey()
>>>>     | 'combined:exclude' >> beam.Filter(
>>>>           lambda x: (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>>>     | 'combined:flat' >> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>>>>                              .with_outputs('counts', main='frames'))
>>>> =====================================================
>>>>
>>>> In the same file I have defined the class as below.
>>>> =====================================================
>>>> class FlattenTagFilesFn(beam.DoFn):
>>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>>         self.s3Bucket = s3Bucket
>>>>         self.s3Creds = s3Creds
>>>>         self.maxKeys = maxKeys
>>>> =====================================================
>>>>
>>>> This is not a problem when running the pipeline with the DirectRunner.
>>>> May I ask, how should I import the class for the ParDo when running on
>>>> Flink?
>>>>
>>>> Thanks,
>>>> Yu Watanabe
>>>>
>>>> --
>>>> Yu Watanabe
>>>> Weekend Freelancer who loves to challenge building data platform
>>>> yu.w.ten...@gmail.com
>>>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>>>> Twitter: https://twitter.com/yuwtennis
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.ten...@gmail.com
>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
>> Twitter: https://twitter.com/yuwtennis

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1
Twitter: https://twitter.com/yuwtennis