Hello all,

Looking further into this issue (the pipeline works on the local runner but
fails on Dataflow), I suspect there might be a mismatch between the
apache_beam version and the Dataflow version.
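
To make the comparison concrete, the locally installed versions can be listed and set against what the workers use. This is only a minimal sketch: the package list is an assumption (pandas is included only because the first step is named "Extract the rows from dataframe"), and it relies on importlib.metadata, which needs Python 3.8 or later. On the Python 2.7 environment shown in the logs, `pip freeze` gives the same information.

```python
from importlib.metadata import PackageNotFoundError, version

# Assumed list of packages worth comparing; not taken from the original logs.
PACKAGES = ["apache-beam", "google-cloud-dataflow", "dill", "pandas"]


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in sorted(installed_versions(PACKAGES).items()):
        if ver is None:
            print("%s: not installed" % name)
        else:
            print("%s==%s" % (name, ver))
```

Note that the staging step in the logs below already reports one data point: the SDK being uploaded is google-cloud-dataflow==2.0.0.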

Please let me know your thoughts. If it is a version issue, what should be
updated? How do I cover the installation on both the Datalab VM and
Google Cloud Platform?

Should I run the following command, or a different one? From the shell, or
from Datalab?

I tried running this on Datalab and it didn't solve the issue (*see
the full log report below*):

pip install --upgrade apache_beam google-cloud-dataflow
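
Upgrading locally changes only the Datalab environment, not what is installed on the Dataflow workers. If the mismatch turns out to be in a dependency rather than in the SDK itself, one possible approach is to pin the workers to the local versions with a requirements file, which the Beam Python SDK accepts through the `--requirements_file` pipeline option. A minimal sketch of generating such a file follows; the helper names and the pinned package and version are placeholders, not values from this environment.

```python
def requirements_lines(pins):
    """Turn {package: version} into sorted 'package==version' lines."""
    return ["%s==%s" % (name, ver) for name, ver in sorted(pins.items())]


def write_requirements(path, pins):
    """Write the pinned versions to a pip requirements file at `path`."""
    with open(path, "w") as handle:
        handle.write("\n".join(requirements_lines(pins)) + "\n")


# Placeholder pin for illustration only; use the version found locally.
example_pins = {"pandas": "0.0.0"}
```

After `write_requirements("requirements.txt", example_pins)`, the file path would be passed to the pipeline as `--requirements_file=requirements.txt`. Whether this resolves the error here is untested.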

Please advise.

Thanks,
Eila


*All logs:*


INFO:root:Staging the SDK tarball from PyPI to
gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python',
'-m', 'pip', 'install', '--download', '/tmp/tmp5MM5wr',
'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
INFO:root:file copy from
/tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to
gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Create job: <Job
 createTime: u'2018-06-21T16:31:51.304121Z'
 currentStateTime: u'1970-01-01T00:00:00Z'
 id: u'2018-06-21_09_31_50-17545183031487377678'
 location: u'us-central1'
 name: u'label-archs4-tsv'
 projectId: u'orielresearch-188115'
 stageStates: []
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
INFO:root:To access the Dataflow monitoring console, please navigate
to 
https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state
JOB_STATE_PENDING
INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling
is enabled for job 2018-06-21_09_31_50-17545183031487377678. The
number of workers will be between 1 and 1000.
INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling
was automatically enabled for job
2018-06-21_09_31_50-17545183031487377678.
INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking
required Cloud APIs are enabled.
INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking
permissions granted to controller Service Account.
INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker
configuration: n1-standard-1 in us-central1-b.
INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding
CoGroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner
lifting skipped for step writing to TSV
files/Write/WriteImpl/GroupByKey: GroupByKey not followed by a
combiner.
INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding
GroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting
ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating
graph with Autotuner information.
INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing
adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing
consumer create more columns into Extract the rows from dataframe
INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/GroupByKey/Reify into
writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/GroupByKey/Write into
writing to TSV files/Write/WriteImpl/GroupByKey/Reify
INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/WriteBundles/Do into
writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/Map(<lambda at
iobase.py:895>) into create more columns
INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
into writing to TSV files/Write/WriteImpl/Map(<lambda at
iobase.py:895>)
INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
into writing to TSV files/Write/WriteImpl/GroupByKey/Read
INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing
consumer writing to TSV files/Write/WriteImpl/InitializeWrite into
writing to TSV files/Write/WriteImpl/DoOnce/Read
INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config
is missing a default resource spec.
INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding
StepResource setup and teardown to workflow graph.
INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow
start and stop steps.
INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait
step start15
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state
JOB_STATE_RUNNING
INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing
operation writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to
TSV files/Write/WriteImpl/InitializeWrite
INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing
operation writing to TSV files/Write/WriteImpl/GroupByKey/Create
INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker
pool setup.
INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1
workers in us-central1-b...
INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing
to TSV files/Write/WriteImpl/GroupByKey/Session" materialized.
INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing
operation Extract the rows from dataframe+create more columns+writing
to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to
TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV
files/Write/WriteImpl/GroupByKey/Reify+writing to TSV
files/Write/WriteImpl/GroupByKey/Write
INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export
job "dataflow_job_576766793008965363" started. You can check its
status with the bq tool: "bq show -j --project_id=orielresearch-188115
dataflow_job_576766793008965363".
INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling:
Raised the number of workers to 0 based on the rate of progress in the
currently running step(s).
INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling:
Raised the number of workers to 1 based on the rate of progress in the
currently running step(s).
INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery
export job progress: "dataflow_job_576766793008965363" observed total
of 1 exported files thus far.
INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export
job finished: "dataflow_job_576766793008965363"
INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have
started successfully.
INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most
recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py",
line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line
423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:38.897Z: JOB_MESSAGE_ERROR: Traceback (most
recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py",
line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line
423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:42.245Z: JOB_MESSAGE_ERROR: Traceback (most
recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py",
line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line
423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:45.619Z: JOB_MESSAGE_ERROR: Traceback (most
recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in
dataflow_worker.operations.DoOperation.start
(dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py",
line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line
423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing
failure step failure14
INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow
failed. Causes: S04:Extract the rows from dataframe+create more
columns+writing to TSV files/Write/WriteImpl/Map(<lambda at
iobase.py:895>)+writing to TSV
files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV
files/Write/WriteImpl/GroupByKey/Reify+writing to TSV
files/Write/WriteImpl/GroupByKey/Write failed., A work item was
attempted 4 times without success. Each time the worker eventually
lost contact with the service. The work item was attempted on:
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz
INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker
pool teardown.
INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...
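
Every traceback above ends in pickle's find_class calling __import__, so the failure happens while the worker deserializes the pipeline function, before any pipeline code runs. The sketch below reproduces that mechanism with the standard library only: pickle stores a class as a module path plus a name, and loading fails with ImportError when that module path does not exist on the loading side. The module name "fake_pkg_indexes_base" is made up for the demonstration. The logs do not say which library `indexes.base` belongs to (possibly pandas, given the dataframe step, but that is a guess).

```python
import pickle
import sys
import types

# Throwaway module standing in for a library module that exists locally.
# The name is hypothetical and chosen only for this demonstration.
module = types.ModuleType("fake_pkg_indexes_base")

# Create a class that claims to live inside the throwaway module.
Index = type("Index", (object,), {"__module__": module.__name__})
module.Index = Index
sys.modules[module.__name__] = module

# Pickling records only the module path and class name, not the class body.
obj = Index()
obj.values = [1, 2, 3]
payload = pickle.dumps(obj)

# Simulate a worker whose installed library lacks that module path.
del sys.modules[module.__name__]

try:
    pickle.loads(payload)
    outcome = "loaded"
except ImportError as exc:  # the same exception family as in the worker logs
    outcome = "ImportError: %s" % exc

print(outcome)
```

If this is what happens on the workers, the fix would be to make the library version on the workers match the one used when the pipeline was submitted, rather than to change the pipeline code.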



On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello,
>
> I am running the following pipeline on the local runner with no issues.
>
> logging.info('Define the pipeline')
> p = beam.Pipeline(options=options)
> samplePath = outputPath
> ExploreData = (
>     p
>     | "Extract the rows from dataframe" >> beam.io.Read(
>         beam.io.BigQuerySource('archs4.Debug_annotation'))
>     | "create more columns" >> beam.ParDo(
>         CreateColForSampleFn(colListSubset, outputPath)))
> (ExploreData
>  | 'writing to TSV files' >> beam.io.WriteToText(
>      'gs://archs4/output/dataExploration.txt',
>      file_name_suffix='.tsv',
>      num_shards=1,
>      append_trailing_newlines=True,
>      header=colListStrHeader))
>
>
> Running on Dataflow raises the error below. I have no idea where to
> look for the issue. The error does not point to my pipeline code but to
> Apache Beam modules.
> I will try debugging by elimination. Please let me know if you have any
> direction for me.
>
> Many thanks,
> Eila
>
>
> ======================================================
>
> DataflowRuntimeException                  Traceback (most recent call last)
> <ipython-input-151-1e5aeb8b7d9b> in <module>()
> ----> 1 p.run().wait_until_finish()
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc
>  in wait_until_finish(self, duration)
>     776         raise DataflowRuntimeException(
>     777             'Dataflow pipeline failed. State: %s, Error:\n%s' %
> --> 778             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
>     779     return self.state
>     780
>
> DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in 
> dataflow_worker.operations.DoOperation.start 
> (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", 
> line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in 
> loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in 
> load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in 
> find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> ======================================================
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
