I tried a simple pipeline that runs perfectly on the local runner but hits the
same issue on Dataflow (see below). Is there anything in the environment
that needs to be updated that I am not aware of?

Many thanks for any pointers.
Eila

import apache_beam as beam
# In recent SDKs the options classes live here; older SDKs exposed them
# under apache_beam.utils.pipeline_options.
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)

# BUCKET_URL is defined earlier in the notebook, e.g. 'gs://archs4'
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'PROJECT-ID'
google_cloud_options.job_name = 'try-debug'
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL  # 'gs://archs4/staging'
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL  # 'gs://archs4/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

p1 = beam.Pipeline(options=options)

(p1
 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
 | 'write' >> beam.io.WriteToText('gs://bucket/test.txt', num_shards=1)
)

p1.run().wait_until_finish()
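
For comparison, the local run is the same pipeline with only the runner
swapped out; a minimal sketch of that check (assuming the same options object
and BUCKET_URL setup as above) is:

# Same pipeline, executed locally with the DirectRunner for comparison.
# Assumes the options object and BUCKET_URL from the snippet above.
options.view_as(StandardOptions).runner = 'DirectRunner'

p_local = beam.Pipeline(options=options)
(p_local
 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
 | 'write' >> beam.io.WriteToText('gs://bucket/test.txt', num_shards=1)
)
p_local.run().wait_until_finish()  # completes without errors locally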

The DataflowRunner version, however, fires the following error:

CalledProcessErrorTraceback (most recent call last)
<ipython-input-17-b4be63f7802f> in <module>()
      5  )
      6
----> 7 p1.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc
in run(self, test_runner_api)
    174       finally:
    175         shutil.rmtree(tmpdir)
--> 176     return self.runner.run(self)
    177
    178   def __enter__(self):

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc
in run(self, pipeline)
    250     # Create the job
    251     result = DataflowPipelineResult(
--> 252         self.dataflow_client.create_job(self.job), self)
    253
    254     self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/retry.pyc
in wrapper(*args, **kwargs)
    166       while True:
    167         try:
--> 168           return fun(*args, **kwargs)
    169         except Exception as exn:  # pylint: disable=broad-except
    170           if not retry_filter(exn):

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc
in create_job(self, job)
    423   def create_job(self, job):
    424     """Creates job description. May stage and/or submit for remote execution."""
--> 425     self.create_job_description(job)
    426
    427     # Stage and submit the job when necessary

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc
in create_job_description(self, job)
    446     """Creates a job described by the workflow proto."""
    447     resources = dependency.stage_job_resources(
--> 448         job.options, file_copy=self._gcs_file_copy)
    449     job.proto.environment = Environment(
    450         packages=resources, options=job.options,

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
in stage_job_resources(options, file_copy, build_setup_args, temp_dir,
populate_requirements_cache)
    377       else:
    378         sdk_remote_location = setup_options.sdk_location
--> 379       _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
    380       resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
    381     else:

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
in _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
    462   elif sdk_remote_location == 'pypi':
    463     logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
--> 464     _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
    465   else:
    466     raise RuntimeError(

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc
in _download_pypi_sdk_package(temp_dir)
    525       '--no-binary', ':all:', '--no-deps']
    526   logging.info('Executing command: %s', cmd_args)
--> 527   processes.check_call(cmd_args)
    528   zip_expected = os.path.join(
    529       temp_dir, '%s-%s.zip' % (package_name, version))

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/processes.pyc
in check_call(*args, **kwargs)
     42   if force_shell:
     43     kwargs['shell'] = True
---> 44   return subprocess.check_call(*args, **kwargs)
     45
     46

/usr/local/envs/py2env/lib/python2.7/subprocess.pyc in
check_call(*popenargs, **kwargs)
    188         if cmd is None:
    189             cmd = popenargs[0]
--> 190         raise CalledProcessError(retcode, cmd)
    191     return 0
    192

CalledProcessError: Command '['/usr/local/envs/py2env/bin/python',
'-m', 'pip', 'install', '--download', '/tmp/tmpyyiizo',
'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']'
returned non-zero exit status 2
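
In case it helps with debugging, the failing command can be re-run by hand in
the notebook to capture pip's actual error output, which check_call swallows
(a rough sketch; the temp directory below is just a placeholder, the SDK
generates its own):

import subprocess
import tempfile

# The command Beam builds in _download_pypi_sdk_package, copied from the
# traceback above; only the temp directory differs.
tmp_dir = tempfile.mkdtemp()
cmd = ['/usr/local/envs/py2env/bin/python', '-m', 'pip', 'install',
       '--download', tmp_dir,
       'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
print('exit status: %d' % proc.returncode)
print(err)  # pip's real error message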



On Wed, Aug 22, 2018 at 10:39 AM OrielResearch Eila Arich-Landkof <
[email protected]> wrote:

> Hello all,
>
> I am running a pipeline that used to execute on Dataflow with no
> issues. I am using the Datalab environment. See the error below. To my
> understanding, it happens before the pipeline code itself is executed.
> Any idea what went wrong?
>
> Thanks,
> Eila
>
>
> Executing the pipeline:
>
> *p.run().wait_until_finish()*
>
> The following error is being fired:
>
> INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', 
> 'setup.py', 'sdist', '--dist-dir', '/tmp/tmp_B0gnK']
> INFO:root:Starting GCS upload to 
> gs://archs4/staging/label-archs4-annotation-15.1534948236.075799/workflow.tar.gz...
> INFO:oauth2client.client:Attempting refresh to obtain initial access_token
> INFO:root:Completed GCS upload to 
> gs://archs4/staging/label-archs4-annotation-15.1534948236.075799/workflow.tar.gz
> INFO:root:Staging the SDK tarball from PyPI to 
> gs://archs4/staging/label-archs4-annotation-15.1534948236.075799/dataflow_python_sdk.tar
> INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 
> 'pip', 'install', '--download', '/tmp/tmp_B0gnK', 
> 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
>
> CalledProcessErrorTraceback (most recent call last)
> <ipython-input-27-1e5aeb8b7d9b> in <module>()
> ----> 1 p.run().wait_until_finish()
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
>     174       finally:
>     175         shutil.rmtree(tmpdir)
> --> 176     return self.runner.run(self)
>     177
>     178   def __enter__(self):
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in run(self, pipeline)
>     250     # Create the job
>     251     result = DataflowPipelineResult(
> --> 252         self.dataflow_client.create_job(self.job), self)
>     253
>     254     self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/retry.pyc in wrapper(*args, **kwargs)
>     166       while True:
>     167         try:
> --> 168           return fun(*args, **kwargs)
>     169         except Exception as exn:  # pylint: disable=broad-except
>     170           if not retry_filter(exn):
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc in create_job(self, job)
>     423   def create_job(self, job):
>     424     """Creates job description. May stage and/or submit for remote execution."""
> --> 425     self.create_job_description(job)
>     426
>     427     # Stage and submit the job when necessary
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.pyc in create_job_description(self, job)
>     446     """Creates a job described by the workflow proto."""
>     447     resources = dependency.stage_job_resources(
> --> 448         job.options, file_copy=self._gcs_file_copy)
>     449     job.proto.environment = Environment(
>     450         packages=resources, options=job.options,
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc in stage_job_resources(options, file_copy, build_setup_args, temp_dir, populate_requirements_cache)
>     377       else:
>     378         sdk_remote_location = setup_options.sdk_location
> --> 379       _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
>     380       resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
>     381     else:
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
>     462   elif sdk_remote_location == 'pypi':
>     463     logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
> --> 464     _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
>     465   else:
>     466     raise RuntimeError(
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _download_pypi_sdk_package(temp_dir)
>     525       '--no-binary', ':all:', '--no-deps']
>     526   logging.info('Executing command: %s', cmd_args)
> --> 527   processes.check_call(cmd_args)
>     528   zip_expected = os.path.join(
>     529       temp_dir, '%s-%s.zip' % (package_name, version))
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/utils/processes.pyc in check_call(*args, **kwargs)
>      42   if force_shell:
>      43     kwargs['shell'] = True
> ---> 44   return subprocess.check_call(*args, **kwargs)
>      45
>      46
>
> /usr/local/envs/py2env/lib/python2.7/subprocess.pyc in check_call(*popenargs, **kwargs)
>     188         if cmd is None:
>     189             cmd = popenargs[0]
> --> 190         raise CalledProcessError(retcode, cmd)
>     191     return 0
>     192
>
> CalledProcessError: Command '['/usr/local/envs/py2env/bin/python', '-m',
> 'pip', 'install', '--download', '/tmp/tmp_B0gnK',
> 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']'
> returned non-zero exit status 2
>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
>
>

-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
