Hello all,

Exploring that issue (the local runner works great and Dataflow fails), there might be a mismatch between the apache_beam version and the Dataflow version.
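In case it helps, this is roughly what I am running in a Datalab cell to compare versions, and the change I was thinking of trying so that the Dataflow workers install the same packages as the VM. This is only a sketch; the requirements.txt file and the option wiring are my assumption, I have not verified them on Dataflow:

# Run in a Datalab cell: compare the versions the notebook kernel actually uses.
import apache_beam
import pandas
print('apache_beam ' + apache_beam.__version__)
print('pandas      ' + pandas.__version__)

# A possible way to make the Dataflow workers install the same packages as the VM:
# point the job at a requirements file that pins those versions.
# NOTE: the 'requirements.txt' file name and this wiring are my assumption only.
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(['--runner', 'DataflowRunner',
                           '--project', 'orielresearch-188115',
                           '--temp_location', 'gs://archs4/staging'])
options.view_as(SetupOptions).requirements_file = 'requirements.txt'
p = apache_beam.Pipeline(options=options)

The idea would be to pin pandas and anything else the DoFns use to the versions printed above, but I am not sure this is the right approach.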
Please let me know what your thoughts are. If it is a version issue, what updates should be executed, and how do I cover the installation on both the Datalab VM and Google Cloud Platform? Should I run the following command (or a different one) from the shell, or on Datalab? I tried running it on Datalab and it didn't solve the issue (see the full logs report below):

pip install --upgrade apache_beam google-cloud-dataflow

Please advise.

Thanks,
Eila

*All logs:*

INFO:root:Staging the SDK tarball from PyPI to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmp5MM5wr', 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
INFO:root:file copy from /tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Create job: <Job createTime: u'2018-06-21T16:31:51.304121Z' currentStateTime: u'1970-01-01T00:00:00Z' id: u'2018-06-21_09_31_50-17545183031487377678' location: u'us-central1' name: u'label-archs4-tsv' projectId: u'orielresearch-188115' stageStates: [] steps: [] tempFiles: [] type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_PENDING
INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2018-06-21_09_31_50-17545183031487377678. The number of workers will be between 1 and 1000.
INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2018-06-21_09_31_50-17545183031487377678.
INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking required Cloud APIs are enabled.
INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in us-central1-b.
INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step writing to TSV files/Write/WriteImpl/GroupByKey: GroupByKey not followed by a combiner.
INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing consumer create more columns into Extract the rows from dataframe
INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Reify into writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Write into writing to TSV files/Write/WriteImpl/GroupByKey/Reify
INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WriteBundles/Do into writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>) into create more columns
INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn) into writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)
INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow into writing to TSV files/Write/WriteImpl/GroupByKey/Read
INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/InitializeWrite into writing to TSV files/Write/WriteImpl/DoOnce/Read
INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait step start15
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_RUNNING
INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to TSV files/Write/WriteImpl/InitializeWrite
INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/GroupByKey/Create
INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-b...
INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing to TSV files/Write/WriteImpl/GroupByKey/Session" materialized.
INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing operation Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write
INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export job "dataflow_job_576766793008965363" started. You can check its status with the bq tool: "bq show -j --project_id=orielresearch-188115 dataflow_job_576766793008965363".
INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery export job progress: "dataflow_job_576766793008965363" observed total of 1 exported files thus far.
INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export job finished: "dataflow_job_576766793008965363"
INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
INFO:root:2018-06-21T16:35:38.897Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
INFO:root:2018-06-21T16:35:42.245Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
INFO:root:2018-06-21T16:35:45.619Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing failure step failure14
INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow failed.
Causes: S04:Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: label-archs4-tsv-06210931-a4r1-harness-rlqz, label-archs4-tsv-06210931-a4r1-harness-rlqz, label-archs4-tsv-06210931-a4r1-harness-rlqz, label-archs4-tsv-06210931-a4r1-harness-rlqz
INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...

On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <e...@orielresearch.org> wrote:

> Hello,
>
> I am running the following pipeline on the local runner with no issues.
>
> logging.info('Define the pipeline')
> p = beam.Pipeline(options=options)
> samplePath = outputPath
> ExploreData = (p | "Extract the rows from dataframe" >> beam.io.Read(
>                        beam.io.BigQuerySource('archs4.Debug_annotation'))
>                  | "create more columns" >> beam.ParDo(
>                        CreateColForSampleFn(colListSubset, outputPath)))
> (ExploreData | 'writing to TSV files' >> beam.io.WriteToText(
>     'gs://archs4/output/dataExploration.txt', file_name_suffix='.tsv',
>     num_shards=1, append_trailing_newlines=True, header=colListStrHeader))
>
> Running on Dataflow fires the error below. I don't have any idea where to
> look for the issue. The error is not pointing to my pipeline code but to
> apache_beam modules.
> I will try debugging by elimination. Please let me know if you have any
> direction for me.
>
> Many thanks,
> Eila
>
> ======================================================
>
> DataflowRuntimeExceptionTraceback (most recent call last)
> <ipython-input-151-1e5aeb8b7d9b> in <module>()
> ----> 1 p.run().wait_until_finish()
>
> /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
>     776         raise DataflowRuntimeException(
>     777             'Dataflow pipeline failed. State: %s, Error:\n%s' %
> --> 778             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
>     779     return self.state
>     780
>
> DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>     op.start()
>   File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
>     def start(self):
>   File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
>     with self.scoped_start_state:
>   File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
>     pickler.loads(self.spec.serialized_fn))
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
>     return load(file)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named indexes.base
>
> ======================================================
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/

--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/