I did not answer Kyle's question. Sorry about that.

> Did you try moving the imports from the process function to the top of main.py?

Yes. I moved all imports to the top of each file, except where I
specifically needed to import inside a function.

On Mon, Oct 7, 2019 at 4:49 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Thank you for the comment.
> I finally got this working. I would like to share my experience for people
> who are new to the portable runner.
> Here is what I did to call functions and classes from an external package.
> 1. As Kyle said, I needed 'save_main_session' for sys.path to persist
> after pickling.
> 2. I needed to push all related files to the worker nodes using the
> "extra_package" option to resolve the dependencies.
> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
> 3. I needed to write the imports in fully qualified form; otherwise I got
> the error below in the TaskManager.
> It looks like the external package is installed into
> "/usr/local/lib/python3.5/site-packages", so imports must use the
> PKG.MODULENAME format to work.
> ================================================================================================
> import utils
> ...
>   File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in <module>
>     import utils
> ImportError: No module named 'utils'
> ================================================================================================
> Below are the imports for the external package in main.py. The other files
> use the same syntax.
> ================================================================================================
> #
> # Local application/library specific imports
> #
> import pkg_aif.utils as ut
> from pkg_aif.beam_pardo import VerifyFn
> from pkg_aif.beam_pardo import FlattenTagFilesFn
> from pkg_aif.beam_states import StatefulBufferingFn
> from pkg_aif.pipeline_wrapper import pipelineWrapper
> from pkg_aif.frames import Frames
> from pkg_aif.tag_counts import TagCounts
> from pkg_aif.tags import Tags
> from pkg_aif.es_credentials import EsCredentials
> from pkg_aif.s3_credentials import S3Credentials
> ================================================================================================
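The PKG.MODULENAME behaviour can be reproduced without Beam. The sketch below is a minimal simulation, assuming only that the worker puts its site-packages directory (the parent of the package, not the package directory itself) on sys.path; pkg_demo and utils_demo are hypothetical stand-ins for pkg_aif and utils.

```python
import importlib
import os
import sys
import tempfile


def try_both_import_styles():
    """Install a throwaway package into a fake site-packages dir and import it two ways."""
    results = {}
    with tempfile.TemporaryDirectory() as site_dir:
        # Hypothetical layout: site_dir/pkg_demo/{__init__.py, utils_demo.py}
        pkg_dir = os.path.join(site_dir, "pkg_demo")
        os.makedirs(pkg_dir)
        open(os.path.join(pkg_dir, "__init__.py"), "w").close()
        with open(os.path.join(pkg_dir, "utils_demo.py"), "w") as f:
            f.write("VALUE = 42\n")

        # Only the parent directory goes on sys.path, as with site-packages.
        sys.path.insert(0, site_dir)
        try:
            try:
                importlib.import_module("utils_demo")  # like `import utils`
                results["bare"] = "imported"
            except ModuleNotFoundError:
                results["bare"] = "ModuleNotFoundError"
            # like `import pkg_aif.utils as ut`
            results["qualified"] = importlib.import_module("pkg_demo.utils_demo").VALUE
        finally:
            # Undo every side effect so the demo leaves no trace.
            sys.path.remove(site_dir)
            for name in ("pkg_demo.utils_demo", "pkg_demo", "utils_demo"):
                sys.modules.pop(name, None)
    return results


print(try_both_import_styles())  # {'bare': 'ModuleNotFoundError', 'qualified': 42}
```

Inside the package itself, an explicit relative import such as `from . import utils` is another standard Python way to avoid relying on the package directory being on sys.path.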
> Related information is below.
> The full options for PipelineOptions:
> ================================================================================================
>         options = PipelineOptions([
>                       "--runner=PortableRunner",
>                       "--environment_config={0}".format(DOCKER_REGISTRY),
>                       "--environment_type=DOCKER",
>                       "--experiments=beam_fn_api",
>                       "--parallelism={0}".format(PARALLELISM),
>                       "--job_endpoint=localhost:8099",
>                       "--extra_package=PATH_TO_SDIST"
>                   ])
>         options.view_as(SetupOptions).save_main_session = True
>         return beam.Pipeline(options=options)
> ================================================================================================
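Since --extra_package only helps if the path points at an existing source distribution, it may be worth checking the path before submitting the job. This is a minimal sketch, assuming the tarball was built beforehand with `python setup.py sdist`; the helper name portable_runner_args and its parameters are hypothetical, while the flags are the ones shown above.

```python
import os


def portable_runner_args(docker_image, parallelism, sdist_path,
                         job_endpoint="localhost:8099"):
    """Build the PortableRunner flag list, failing fast if the sdist is missing."""
    if not os.path.isfile(sdist_path):
        raise FileNotFoundError("sdist not found: {0}".format(sdist_path))
    return [
        "--runner=PortableRunner",
        "--environment_config={0}".format(docker_image),
        "--environment_type=DOCKER",
        "--experiments=beam_fn_api",
        "--parallelism={0}".format(parallelism),
        "--job_endpoint={0}".format(job_endpoint),
        "--extra_package={0}".format(sdist_path),
    ]

# Usage with Beam (not executed here):
# options = PipelineOptions(portable_runner_args(DOCKER_REGISTRY, PARALLELISM, sdist_path))
# options.view_as(SetupOptions).save_main_session = True
```

Checking locally gives an immediate error on the launching machine instead of an ImportError later in the TaskManager.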
> My setup.py is below.
> ================================================================================================
> import setuptools
> REQUIRED_PACKAGES = [
>     'apache-beam==2.15.0',
>     'elasticsearch>=7.0.0,<8.0.0',
>     'urllib3',
>     'boto3',
> ]
> setuptools.setup(
>    author       = 'Yu Watanabe',
>    author_email = 'AUTHOR_EMAIL',
>    url          = 'URL',
>    name         = 'quality_validation',
>    version      = '0.1',
>    install_requires = REQUIRED_PACKAGES,
>    packages     = setuptools.find_packages(),
> )
> ================================================================================================
> Contents of the directory containing setup.py:
> ================================================================================================
> admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
> total 20
> drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
> -rw-r--r-- 1 admin admin    0 Sep  5 21:21 __init__.py
> -rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
> drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
> drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
> -rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
> ================================================================================================
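The dist directory in this listing is where `python setup.py sdist` writes the tarball that --extra_package should point to. If several builds accumulate there, a small helper can pick the newest one. This is a sketch; find_sdist is a hypothetical name, and choosing by modification time is an assumption about how the builds are managed.

```python
import glob
import os


def find_sdist(dist_dir):
    """Return the most recently modified *.tar.gz in dist_dir."""
    candidates = glob.glob(os.path.join(dist_dir, "*.tar.gz"))
    if not candidates:
        raise FileNotFoundError(
            "no .tar.gz in {0}; run `python setup.py sdist` first".format(dist_dir))
    # The newest build wins.
    return max(candidates, key=os.path.getmtime)
```

The returned path can then be passed as `--extra_package=<path>` in the PipelineOptions list.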
> Thanks,
> Yu Watanabe
> On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver <kcwea...@google.com> wrote:
>> Did you try moving the imports from the process function to the top of
>> main.py?
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <yu.w.ten...@gmail.com>
>> wrote:
>>> Hello.
>>> I would like to ask for help with resolving a dependency issue for an
>>> imported module.
>>> I have the directory structure below, and I am trying to import the Frames
>>> class from frames.py into main.py.
>>> =========================================
>>> quality-validation/
>>>     bin/
>>>         setup.py
>>>         main.py
>>>         modules/
>>>             frames.py
>>>             <TRIMMED>
>>> =========================================
>>> However, when I run the pipeline, I get the error below in the TaskManager.
>>> =========================================
>>>   File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>>>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>>>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>>>   File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>>>   File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>>>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>>>   File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>>>     raise exc.with_traceback(traceback)
>>>   File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>>>   File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>>   File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
>>>   File "/home/admin/quality-validation/bin/main.py", line 44, in process
>>> ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
>>> =========================================
>>> I import modules in the global context and also at the top of the process
>>> function.
>>> =========================================
>>> [main.py]
>>> #
>>> # Standard library imports
>>> #
>>> import logging
>>> import pprint
>>> import sys
>>> sys.path.append("{0}/modules".format(sys.path[0]))
>>> sys.path.append("{0}/modules/vendor".format(sys.path[0]))
>>> #
>>> # Related third party imports
>>> #
>>> import apache_beam as beam
>>> #
>>> # Local application/library specific imports
>>> #
>>> import utils
>>> from pipeline_wrapper import pipelineWrapper
>>> from tag_counts import TagCounts
>>> from tags import Tags
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         beam.DoFn.__init__(self)
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds  = s3Creds
>>>         self.maxKeys  = maxKeys
>>>     def process(self, elem):
>>>         import yaml
>>>         from frames import Frames
>>>         if not hasattr(self, 's3Client'):
>>>             import boto3
>>>             self.s3Client = boto3.client('s3',
>>>                                 aws_access_key_id=self.s3Creds[0],
>>>                                 aws_secret_access_key=self.s3Creds[1])
>>>         (key, info) = elem
>>>         preFrm = {}
>>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['pre'][0][0])
>>>         yaml1 = yaml.load(resp1['Body'])
>>>         for elem in yaml1['body']:
>>>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>>>         postFrm = {}
>>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket, Key=info['post'][0][0])
>>>         yaml2 = yaml.load(resp2['Body'])
>>>         for elem in yaml2['body']:
>>>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>>>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>>         for f in commonFrmNums:
>>>             frames = Frames(
>>>                           self.s3Bucket,
>>>                           info['pre'][0][0],            # Pre S3Key
>>>                           info['post'][0][0],           # Post S3Key
>>>                           yaml1['head']['operator_id'], # Pre OperatorId
>>>                           yaml2['head']['operator_id'], # Post OperatorId
>>>                           preFrm[f],                    # Pre Frame Line
>>>                           postFrm[f],                   # Post Frame Line
>>>                           info['pre'][0][1],            # Pre Last Modified Time
>>>                           info['post'][0][1])           # Post Last Modified Time
>>>             yield (frames)
>>>         tagCounts = TagCounts(
>>>                          self.s3Bucket,
>>>                          yaml1,               # Pre Yaml
>>>                          yaml2,               # Post Yaml
>>>                          info['pre'][0][0],   # Pre S3Key
>>>                          info['post'][0][0],  # Post S3Key
>>>                          info['pre'][0][1],   # Pre Last Modified Time
>>>                          info['post'][0][1] ) # Post Last Modified Time
>>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>>> =========================================
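The traceback comes from the worker, not the launching machine: FlattenTagFilesFn is serialized where main.py runs and rebuilt on the worker, so any class it uses, such as Frames, must be importable there under the same module name. The sketch below illustrates the underlying mechanism with the standard pickle module (Beam itself uses dill, assumed here to behave the same way for classes defined in importable modules): the serialized bytes carry only the module and class name, so loading fails once that module cannot be found. frames_demo is a hypothetical module name.

```python
import importlib
import os
import pickle
import sys
import tempfile


def unpickle_without_module():
    """Pickle an instance of a class from a temp module, then load it after the module is gone."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "frames_demo.py"), "w") as f:
            f.write("class Frames:\n    def __init__(self, n):\n        self.n = n\n")
        sys.path.insert(0, tmp)
        try:
            frames_demo = importlib.import_module("frames_demo")
            payload = pickle.dumps(frames_demo.Frames(3))
        finally:
            # Simulate a worker that never had the module on its path.
            sys.path.remove(tmp)
            sys.modules.pop("frames_demo", None)

    # Only the reference (module name and class name) is stored, not the class body.
    assert b"frames_demo" in payload and b"Frames" in payload
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return "loaded"


print(unpickle_without_module())  # No module named 'frames_demo'
```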
>>> My pipeline options are below. I tried both with and without "setup_file",
>>> but it made no difference.
>>> =========================================
>>>         options = PipelineOptions([
>>>                       "--runner=PortableRunner",
>>>                       "--environment_config={0}".format(self.__docker_registry),
>>>                       "--environment_type=DOCKER",
>>>                       "--experiments=beam_fn_api",
>>>                       "--job_endpoint=localhost:8099"
>>>                   ])
>>>         options.view_as(SetupOptions).save_main_session = True
>>>         options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
>>> =========================================
>>> Is it possible to resolve dependencies on an external module inside a
>>> ParDo when using Apache Flink?
>>> Thanks,
>>> Yu Watanabe
>>> --
>>> Yu Watanabe
>>> Weekend Freelancer who loves to challenge building data platform
>>> yu.w.ten...@gmail.com
>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>>> Twitter icon] <https://twitter.com/yuwtennis>

Yu Watanabe
