Thank you for the reply.

Setting "save_main_session" did not fix the problem; however, the situation has changed.

1. get_all_options() output. "save_main_session" is set to True.
2019-09-26 09:04:11,586 DEBUG Pipeline Options:
{'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform':
None, 'dataflow_endpoint': '',
'environment_config': '',
'machine_type': None, 'enable_streaming_engine': False, 'sdk_location':
'default', 'profile_memory': False, 'max_num_workers': None,
'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
'setup_file': None, 'network': None, 'on_success_matcher': None,
'requirements_cache': None, 'service_account_email': None,
'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
'profile_location': None, 'direct_runner_use_stacked_bundle': True,
'use_public_ips': None, ***** 'save_main_session': True, *******
'direct_num_workers': 1, 'num_workers': None,
'worker_harness_container_image': None, 'template_location': None,
'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner':
'PortableRunner', 'project': None, 'dataflow_kms_key': None,
'job_endpoint': 'localhost:8099', 'extra_packages': None,
'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm':
None, 'staging_location': None, 'job_name': None, 'no_auth': False,
'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism':
0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file':
None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}

2. The error in the TaskManager log did not change.
  File "/usr/local/lib/python3.5/site-packages/dill/", line 474, in
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
'apache_beam.runners.worker.sdk_worker_main' from


3. However, if I comment out "super().__init__()" in my code, the error changes.
line 1078, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
line 265, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/", line 317, in
    return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/", line 305, in
    obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/", line 474, in
    return StockUnpickler.find_class(self, module, name)
ImportError: No module named 's3_credentials'

4. My whole class is below.
class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        self.s3Bucket = s3Bucket
        self.s3Creds  = s3Creds
        self.maxKeys  = maxKeys


    def process(self, elem):

        if not hasattr(self, 's3Client'):
            import boto3
            self.s3Client = boto3.client('s3',



        (key, info) = elem

        preFrm = {}
        resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
        yaml1 = yaml.load(resp1['Body'])

        for elem in yaml1['body']:
            preFrm[ elem['frame_tag']['frame_no'] ] = elem

        postFrm = {}
        resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
        yaml2 = yaml.load(resp2['Body'])

        for elem in yaml2['body']:
            postFrm[ elem['frame_tag']['frame_no'] ] = elem

        commonFrmNums =

        for f in commonFrmNums:
            frames = Frames(
                          info['pre'][0][0],            # Pre S3Key
                          info['post'][0][0],           # Post S3Key
                          yaml1['head']['operator_id'], # Pre OperatorId
                          yaml2['head']['operator_id'], # Post OperatorId
                          preFrm[f],                    # Pre Frame Line
                          postFrm[f],                   # Post Frame Line
                          info['pre'][0][1],            # Pre Last Modified
                          info['post'][0][1])           # Post Last Modified Time

            yield (frames)

        tagCounts = TagCounts(
                         yaml1,               # Pre Yaml
                         yaml2,               # Post Yaml
                         info['pre'][0][0],   # Pre S3Key
                         info['post'][0][0],  # Post S3Key
                         info['pre'][0][1],   # Pre Last Modified Time
                         info['post'][0][1] ) # Post Last Modified Time

        yield beam.pvalue.TaggedOutput('counts', tagCounts)
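
The ImportError in item 3 comes from find_class, which is the step where the unpickler looks a class up by module and name. The sketch below uses only the standard pickle module (which dill builds on) to illustrate that lookup. It assumes that s3Creds is an instance of a class defined in a local s3_credentials module, which the traceback suggests but the thread does not confirm. The Creds class here is a hypothetical stand-in, not the real credentials class.

```python
import pickle


class Creds:
    """Hypothetical stand-in for a credentials class that lives in its own module."""

    def __init__(self, access_key):
        self.access_key = access_key


payload = pickle.dumps(Creds("example-key"))

# The payload names the class and its module, but does not contain the class body.
class_name = Creds.__qualname__.encode()
module_name = Creds.__module__.encode()
print(class_name in payload, module_name in payload)

# Unpickling looks the class up again by module and name. It succeeds here
# only because Creds can be found in this process.
restored = pickle.loads(payload)
print(restored.access_key)
```

If that reading is right, the process that unpickles the DoFn also needs to be able to import the module that defines the credentials class, or the DoFn needs to carry only plain values such as strings.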

I was using super() so that a single boto client instance is defined in the ParDo.
May I ask, is there a way to call super() in the constructor of a ParDo?
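
As a minimal sketch of the "single client instance" pattern, the code below keeps the constructor limited to plain values, calls super().__init__(), and creates the client on first use. It runs without Beam or boto3: BaseFn and FakeClient are hypothetical stand-ins for beam.DoFn and for the object returned by boto3.client('s3'), and LazyClientFn is an illustrative name. It does not show whether super() works on the Flink Portable Runner; it only shows that the pattern survives a pickle round trip.

```python
import pickle


class BaseFn:
    """Hypothetical stand-in for beam.DoFn, so the sketch runs without Beam."""

    def __init__(self):
        self.initialized_by_base = True


class FakeClient:
    """Hypothetical stand-in for the object returned by boto3.client('s3')."""

    instances_created = 0

    def __init__(self, creds):
        FakeClient.instances_created += 1
        self.creds = creds


class LazyClientFn(BaseFn):
    def __init__(self, bucket, creds):
        super().__init__()
        # Keep only plain, picklable values on the instance at construction time.
        self.bucket = bucket
        self.creds = creds
        self._client = None

    def _get_client(self):
        # Create the client on first use, after the instance has been unpickled.
        if self._client is None:
            self._client = FakeClient(self.creds)
        return self._client

    def __getstate__(self):
        # Never ship an already created client with the pickled instance.
        state = dict(self.__dict__)
        state["_client"] = None
        return state

    def process(self, key):
        client = self._get_client()
        yield (self.bucket, key, client.creds["id"])


# Simulate shipping the function object to a worker and processing two elements.
fn = pickle.loads(pickle.dumps(LazyClientFn("my-bucket", {"id": "example"})))
results = [out for key in ("a", "b") for out in fn.process(key)]
print(results, FakeClient.instances_created)
```

The client is created once per unpickled instance rather than once per element, which is the same effect the hasattr check in process() above is aiming for.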


On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver wrote:

> You will need to set the save_main_session pipeline option to True.
> Kyle Weaver | Software Engineer
> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe wrote:
>> Hello.
>> I would like to ask a question about ParDo.
>> I am getting the error below inside the TaskManager when running code on
>> Apache Flink using the Portable Runner.
>> =====================================================
>> ....
>>   File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/",
>> line 1078, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/",
>> line 265, in loads
>>     return dill.loads(s)
>>   File "/usr/local/lib/python3.5/site-packages/dill/", line 317,
>> in loads
>>     return load(file, ignore)
>>   File "/usr/local/lib/python3.5/site-packages/dill/", line 305,
>> in load
>>     obj = pik.load()
>>   File "/usr/local/lib/python3.5/site-packages/dill/", line 474,
>> in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module
>> 'apache_beam.runners.worker.sdk_worker_main' from
>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/'>
>> =====================================================
>> "FlattenTagFilesFn" is defined as a ParDo and called from the pipeline as
>> below.
>> =====================================================
>>     frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>                       | 'combined:cogroup' >> beam.CoGroupByKey()
>>                       | 'combined:exclude' >> beam.Filter(lambda x:
>> (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>                       | 'combined:flat'    >>
>> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>>                                               .with_outputs('counts',
>> main='frames'))
>> =====================================================
>> In the same file I have defined the class as below.
>> =====================================================
>> class FlattenTagFilesFn(beam.DoFn):
>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>         self.s3Bucket = s3Bucket
>>         self.s3Creds  = s3Creds
>>         self.maxKeys  = maxKeys
>> =====================================================
>> This is not a problem when running the pipeline using DirectRunner.
>> May I ask, how should I import the class for ParDo when running on Flink?
>> Thanks,
>> Yu Watanabe
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform

Yu Watanabe
Weekend Freelancer who loves to challenge building data platform