I did not answer Kyle's question. Sorry about that.

> Did you try moving the imports from the process function to the top of main.py?

Yes. In the end, I moved all imports to the top of each file, except where an import was needed only inside a function.

On Mon, Oct 7, 2019 at 4:49 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Thank you for the comment.
>
> I finally got this working. I would like to share my experience for people
> who are beginners with the portable runner.
> Below is what I did when calling functions and classes from an
> external package.
>
> 1. As Kyle said, I needed 'save_main_session' for sys path to persist
> after pickling.
>
> 2. I needed to push all related files to the worker nodes using the
> "extra_package" option to resolve dependencies.
> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
>
> 3. I needed to write the import statements explicitly; otherwise I got the
> error below in the task manager.
> It looks like external packages are pushed into
> "/usr/local/lib/python3.5/site-packages" and require the PKG.MODULENAME
> format to work.
>
> ================================================================================================
> import utils
>
> ...
>
>   File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line 18, in <module>
>     import utils
> ImportError: No module named 'utils'
>
> ================================================================================================
>
> Below is my import syntax for the external package in main.py. Other files
> also follow this syntax.
>
> ================================================================================================
> #
> # Local application/library specific imports
> #
> import pkg_aif.utils as ut
> from pkg_aif.beam_pardo import VerifyFn
> from pkg_aif.beam_pardo import FlattenTagFilesFn
> from pkg_aif.beam_states import StatefulBufferingFn
> from pkg_aif.pipeline_wrapper import pipelineWrapper
> from pkg_aif.frames import Frames
> from pkg_aif.tag_counts import TagCounts
> from pkg_aif.tags import Tags
> from pkg_aif.es_credentials import EsCredentials
> from pkg_aif.s3_credentials import S3Credentials
>
> ================================================================================================
>
> Below is information related to the above.
>
> Full options for PipelineOptions.
>
> ================================================================================================
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--environment_config={0}".format(DOCKER_REGISTRY),
>     "--environment_type=DOCKER",
>     "--experiments=beam_fn_api",
>     "--parallelism={0}".format(PARALLELISM),
>     "--job_endpoint=localhost:8099",
>     "--extra_package=PATH_TO_SDIST"
> ])
> options.view_as(SetupOptions).save_main_session = True
>
> return beam.Pipeline(options=options)
>
> ================================================================================================
>
> My setup.py is below.
>
> ================================================================================================
> import setuptools
>
> REQUIRED_PACKAGES = [
>     'apache-beam==2.15.0',
>     'elasticsearch>=7.0.0,<8.0.0',
>     'urllib3',
>     'boto3'
> ]
>
> setuptools.setup(
>     author = 'Yu Watanabe',
>     author_email = 'AUTHOR_EMAIL',
>     url = 'URL',
>     name = 'quality_validation',
>     version = '0.1',
>     install_requires = REQUIRED_PACKAGES,
>     packages = setuptools.find_packages(),
>
> )
>
> ================================================================================================
>
> Directory path to setup.py.
>
> ================================================================================================
> admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
> total 20
> drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
> -rw-r--r-- 1 admin admin    0 Sep  5 21:21 __init__.py
> -rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
> drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
> drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
> -rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
>
> ================================================================================================
>
> Thanks,
> Yu Watanabe
>
> On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver <kcwea...@google.com> wrote:
>
>> Did you try moving the imports from the process function to the top of
>> main.py?
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe <yu.w.ten...@gmail.com>
>> wrote:
>>
>>> Hello.
>>>
>>> I would like to ask for help with resolving a dependency issue for an
>>> imported module.
>>>
>>> I have the directory structure below, and I am trying to import the
>>> Frames class from frames.py into main.py.
>>> =========================================
>>> quality-validation/
>>>     bin/setup.py
>>>         main.py
>>>         modules/
>>>             frames.py
>>>             <TRIMMED>
>>> =========================================
>>>
>>> However, when I run the pipeline, I get the error below at the TaskManager.
>>> =========================================
>>> <TRIMMED>
>>>   File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>>>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>>>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>>>   File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>>>   File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>>>   File "apache_beam/runners/common.py", line 872, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>>>   File "/usr/local/lib/python3.5/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>>>     raise exc.with_traceback(traceback)
>>>   File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>>>   File "apache_beam/runners/common.py", line 465, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>>   File "apache_beam/runners/common.py", line 918, in apache_beam.runners.common._OutputProcessor.process_outputs
>>>   File "/home/admin/quality-validation/bin/main.py", line 44, in process
>>> ImportError: No module named 'frames' [while running 'combined:flat/ParDo(FlattenTagFilesFn)']
>>> =========================================
>>>
>>> I import modules in the global context and also at the top of the process
>>> function.
>>> =========================================
>>> [main.py]
>>> #
>>> # Standard library imports
>>> #
>>> import logging
>>> import pprint
>>> import sys
>>> sys.path.append("{0}/modules".format(sys.path[0]))
>>> sys.path.append("{0}/modules/vendor".format(sys.path[0]))
>>>
>>> #
>>> # Related third party imports
>>> #
>>> import apache_beam as beam
>>>
>>> #
>>> # Local application/library specific imports
>>> #
>>> import utils
>>> from pipeline_wrapper import pipelineWrapper
>>> from tag_counts import TagCounts
>>> from tags import Tags
>>>
>>> <TRIMMED>
>>>
>>> class FlattenTagFilesFn(beam.DoFn):
>>>     def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>>>         beam.DoFn.__init__(self)
>>>
>>>         self.s3Bucket = s3Bucket
>>>         self.s3Creds = s3Creds
>>>         self.maxKeys = maxKeys
>>>
>>>     def process(self, elem):
>>>         import yaml
>>>         from frames import Frames
>>>
>>>         if not hasattr(self, 's3Client'):
>>>             import boto3
>>>             self.s3Client = boto3.client('s3',
>>>                 aws_access_key_id=self.s3Creds[0],
>>>                 aws_secret_access_key=self.s3Creds[1])
>>>
>>>         (key, info) = elem
>>>
>>>         preFrm = {}
>>>         resp1 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>>                                          Key=info['pre'][0][0])
>>>         yaml1 = yaml.load(resp1['Body'])
>>>
>>>         for elem in yaml1['body']:
>>>             preFrm[ elem['frame_tag']['frame_no'] ] = elem
>>>
>>>         postFrm = {}
>>>         resp2 = self.s3Client.get_object(Bucket=self.s3Bucket,
>>>                                          Key=info['post'][0][0])
>>>         yaml2 = yaml.load(resp2['Body'])
>>>
>>>         for elem in yaml2['body']:
>>>             postFrm[ elem['frame_tag']['frame_no'] ] = elem
>>>
>>>         commonFrmNums = set(list(preFrm.keys())).intersection(list(postFrm.keys()))
>>>
>>>         for f in commonFrmNums:
>>>             frames = Frames(
>>>                 self.s3Bucket,
>>>                 info['pre'][0][0],             # Pre S3Key
>>>                 info['post'][0][0],            # Post S3Key
>>>                 yaml1['head']['operator_id'],  # Pre OperatorId
>>>                 yaml2['head']['operator_id'],  # Post OperatorId
>>>                 preFrm[f],                     # Pre Frame Line
>>>                 postFrm[f],                    # Post Frame Line
>>>                 info['pre'][0][1],             # Pre Last Modified Time
>>>                 info['post'][0][1])            # Post Last Modified Time
>>>
>>>             yield (frames)
>>>
>>>         tagCounts = TagCounts(
>>>             self.s3Bucket,
>>>             yaml1,                # Pre Yaml
>>>             yaml2,                # Post Yaml
>>>             info['pre'][0][0],    # Pre S3Key
>>>             info['post'][0][0],   # Post S3Key
>>>             info['pre'][0][1],    # Pre Last Modified Time
>>>             info['post'][0][1] )  # Post Last Modified Time
>>>
>>>         yield beam.pvalue.TaggedOutput('counts', tagCounts)
>>> =========================================
>>>
>>> My pipeline options are below. I tried with and without "setup_file", but
>>> it made no difference.
>>> =========================================
>>> options = PipelineOptions([
>>>     "--runner=PortableRunner",
>>>     "--environment_config={0}".format(self.__docker_registry),
>>>     "--environment_type=DOCKER",
>>>     "--experiments=beam_fn_api",
>>>     "--job_endpoint=localhost:8099"
>>> ])
>>> options.view_as(SetupOptions).save_main_session = True
>>> options.view_as(SetupOptions).setup_file = '/home/admin/quality-validation/bin/setup.py'
>>> =========================================
>>>
>>> Is it possible to resolve a dependency on an external module inside a
>>> ParDo when using Apache Flink?
>>>
>>> Thanks,
>>> Yu Watanabe
>>>
>>> --
>>> Yu Watanabe
>>> Weekend Freelancer who loves to challenge building data platform
>>> yu.w.ten...@gmail.com
>>> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
>>> Twitter: <https://twitter.com/yuwtennis>
>>>
>>
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
> LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
> Twitter: <https://twitter.com/yuwtennis>
>

--
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
Twitter: <https://twitter.com/yuwtennis>