[ https://issues.apache.org/jira/browse/BEAM-6580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-6580: -------------------------------- Labels: (was: stale-P2) > Beam 2.10.0 RC1 > --------------- > > Key: BEAM-6580 > URL: https://issues.apache.org/jira/browse/BEAM-6580 > Project: Beam > Issue Type: Test > Components: io-ideas > Affects Versions: 2.10.0 > Reporter: Raj Subramani > Priority: P3 > > I was testing beam.io.WriteToText on RC1 of 2.10.0. > RC1 addresses writing to CMEK GCS buckets (which currently fails due to > a failure in the copy process from staging to the CMEK bucket; the remedy was to > change "copy" to "rewrite"). > As per the changes in this commit: > [https://github.com/apache/beam/commit/e6e85edbeade3a4c038aca85821d2b265ac33909#diff-246c5b6386cc533fbabca3b397ca3c17] > StorageObjectsCopyRequest > was replaced with > StorageObjectsRewriteRequest > > I downloaded RC1 of 2.10.0 and tried the following code: > def runPipeline(self): > options = > pipeline.PipelineOptions(self.__pipelineargs).view_as(TestPipelineOptions) > with (beam.Pipeline(options=options)) as p: > textPColl = p | \ > 'read text from GCS bucket' >> > beam.io.ReadFromText(options.testfile) > textPColl | 'write to bucket' >> > beam.io.WriteToText(options.outputfile) > But I got the following exception: > INFO:root:2019-02-04T09:17:13.452Z: JOB_MESSAGE_ERROR: Traceback (most recent > call last): > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line > 642, in do_work > work_executor.execute() > File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", > line 172, in execute > op.start() > File "dataflow_worker/native_operations.py", line 38, in > dataflow_worker.native_operations.NativeReadOperation.start > def start(self): > File "dataflow_worker/native_operations.py", line 39, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.scoped_start_state: > File "dataflow_worker/native_operations.py", line 44, in > dataflow_worker.native_operations.NativeReadOperation.start > with 
self.spec.source.reader() as reader: > File "dataflow_worker/native_operations.py", line 54, in > dataflow_worker.native_operations.NativeReadOperation.start > self.output(windowed_value) > File "apache_beam/runners/worker/operations.py", line 183, in > apache_beam.runners.worker.operations.Operation.output > cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) > File "apache_beam/runners/worker/operations.py", line 89, in > apache_beam.runners.worker.operations.ConsumerSet.receive > cython.cast(Operation, consumer).process(windowed_value) > File "apache_beam/runners/worker/operations.py", line 497, in > apache_beam.runners.worker.operations.DoOperation.process > with self.scoped_process_state: > File "apache_beam/runners/worker/operations.py", line 498, in > apache_beam.runners.worker.operations.DoOperation.process > self.dofn_receiver.receive(o) > File "apache_beam/runners/common.py", line 680, in > apache_beam.runners.common.DoFnRunner.receive > self.process(windowed_value) > File "apache_beam/runners/common.py", line 686, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 724, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 684, in > apache_beam.runners.common.DoFnRunner.process > self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 535, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_per_window( > File "apache_beam/runners/common.py", line 604, in > apache_beam.runners.common.PerWindowInvoker._invoke_per_window > output_processor.process_outputs( > File "apache_beam/runners/common.py", line 755, in > apache_beam.runners.common._OutputProcessor.process_outputs > def process_outputs(self, windowed_input_element, results): > File "apache_beam/runners/common.py", line 770, in > 
apache_beam.runners.common._OutputProcessor.process_outputs > for result in results: > File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line > 1077, in <genexpr> > window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 321, in finalize_write > 'Encountered exceptions in finalize_write: %s' % all_exceptions) > Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), > HttpBadRequestError(), HttpBadRequestError()] [while running 'write to > bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite'] > INFO:root:2019-02-04T09:17:18.376Z: JOB_MESSAGE_ERROR: Traceback (most recent > call last): > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line > 642, in do_work > work_executor.execute() > File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", > line 172, in execute > op.start() > File "dataflow_worker/native_operations.py", line 38, in > dataflow_worker.native_operations.NativeReadOperation.start > def start(self): > File "dataflow_worker/native_operations.py", line 39, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.scoped_start_state: > File "dataflow_worker/native_operations.py", line 44, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.spec.source.reader() as reader: > File "dataflow_worker/native_operations.py", line 54, in > dataflow_worker.native_operations.NativeReadOperation.start > self.output(windowed_value) > File "apache_beam/runners/worker/operations.py", line 183, in > apache_beam.runners.worker.operations.Operation.output > cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) > File "apache_beam/runners/worker/operations.py", line 89, in > apache_beam.runners.worker.operations.ConsumerSet.receive > cython.cast(Operation, consumer).process(windowed_value) > File 
"apache_beam/runners/worker/operations.py", line 497, in > apache_beam.runners.worker.operations.DoOperation.process > with self.scoped_process_state: > File "apache_beam/runners/worker/operations.py", line 498, in > apache_beam.runners.worker.operations.DoOperation.process > self.dofn_receiver.receive(o) > File "apache_beam/runners/common.py", line 680, in > apache_beam.runners.common.DoFnRunner.receive > self.process(windowed_value) > File "apache_beam/runners/common.py", line 686, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 724, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 684, in > apache_beam.runners.common.DoFnRunner.process > self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 535, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_per_window( > File "apache_beam/runners/common.py", line 604, in > apache_beam.runners.common.PerWindowInvoker._invoke_per_window > output_processor.process_outputs( > File "apache_beam/runners/common.py", line 755, in > apache_beam.runners.common._OutputProcessor.process_outputs > def process_outputs(self, windowed_input_element, results): > File "apache_beam/runners/common.py", line 770, in > apache_beam.runners.common._OutputProcessor.process_outputs > for result in results: > File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line > 1077, in <genexpr> > window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 321, in finalize_write > 'Encountered exceptions in finalize_write: %s' % all_exceptions) > Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), > HttpBadRequestError(), HttpBadRequestError()] [while running 'write to > 
bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite'] > INFO:root:2019-02-04T09:17:23.304Z: JOB_MESSAGE_ERROR: Traceback (most recent > call last): > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line > 642, in do_work > work_executor.execute() > File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", > line 172, in execute > op.start() > File "dataflow_worker/native_operations.py", line 38, in > dataflow_worker.native_operations.NativeReadOperation.start > def start(self): > File "dataflow_worker/native_operations.py", line 39, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.scoped_start_state: > File "dataflow_worker/native_operations.py", line 44, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.spec.source.reader() as reader: > File "dataflow_worker/native_operations.py", line 54, in > dataflow_worker.native_operations.NativeReadOperation.start > self.output(windowed_value) > File "apache_beam/runners/worker/operations.py", line 183, in > apache_beam.runners.worker.operations.Operation.output > cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) > File "apache_beam/runners/worker/operations.py", line 89, in > apache_beam.runners.worker.operations.ConsumerSet.receive > cython.cast(Operation, consumer).process(windowed_value) > File "apache_beam/runners/worker/operations.py", line 497, in > apache_beam.runners.worker.operations.DoOperation.process > with self.scoped_process_state: > File "apache_beam/runners/worker/operations.py", line 498, in > apache_beam.runners.worker.operations.DoOperation.process > self.dofn_receiver.receive(o) > File "apache_beam/runners/common.py", line 680, in > apache_beam.runners.common.DoFnRunner.receive > self.process(windowed_value) > File "apache_beam/runners/common.py", line 686, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", 
line 724, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 684, in > apache_beam.runners.common.DoFnRunner.process > self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 535, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_per_window( > File "apache_beam/runners/common.py", line 604, in > apache_beam.runners.common.PerWindowInvoker._invoke_per_window > output_processor.process_outputs( > File "apache_beam/runners/common.py", line 755, in > apache_beam.runners.common._OutputProcessor.process_outputs > def process_outputs(self, windowed_input_element, results): > File "apache_beam/runners/common.py", line 770, in > apache_beam.runners.common._OutputProcessor.process_outputs > for result in results: > File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line > 1077, in <genexpr> > window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 321, in finalize_write > 'Encountered exceptions in finalize_write: %s' % all_exceptions) > Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), > HttpBadRequestError(), HttpBadRequestError()] [while running 'write to > bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite'] > INFO:root:2019-02-04T09:17:28.056Z: JOB_MESSAGE_ERROR: Traceback (most recent > call last): > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line > 642, in do_work > work_executor.execute() > File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", > line 172, in execute > op.start() > File "dataflow_worker/native_operations.py", line 38, in > dataflow_worker.native_operations.NativeReadOperation.start > def start(self): > File "dataflow_worker/native_operations.py", line 39, in > 
dataflow_worker.native_operations.NativeReadOperation.start > with self.scoped_start_state: > File "dataflow_worker/native_operations.py", line 44, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.spec.source.reader() as reader: > File "dataflow_worker/native_operations.py", line 54, in > dataflow_worker.native_operations.NativeReadOperation.start > self.output(windowed_value) > File "apache_beam/runners/worker/operations.py", line 183, in > apache_beam.runners.worker.operations.Operation.output > cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) > File "apache_beam/runners/worker/operations.py", line 89, in > apache_beam.runners.worker.operations.ConsumerSet.receive > cython.cast(Operation, consumer).process(windowed_value) > File "apache_beam/runners/worker/operations.py", line 497, in > apache_beam.runners.worker.operations.DoOperation.process > with self.scoped_process_state: > File "apache_beam/runners/worker/operations.py", line 498, in > apache_beam.runners.worker.operations.DoOperation.process > self.dofn_receiver.receive(o) > File "apache_beam/runners/common.py", line 680, in > apache_beam.runners.common.DoFnRunner.receive > self.process(windowed_value) > File "apache_beam/runners/common.py", line 686, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 724, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 684, in > apache_beam.runners.common.DoFnRunner.process > self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 535, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_per_window( > File "apache_beam/runners/common.py", line 604, in > apache_beam.runners.common.PerWindowInvoker._invoke_per_window > output_processor.process_outputs( > File "apache_beam/runners/common.py", 
line 755, in > apache_beam.runners.common._OutputProcessor.process_outputs > def process_outputs(self, windowed_input_element, results): > File "apache_beam/runners/common.py", line 770, in > apache_beam.runners.common._OutputProcessor.process_outputs > for result in results: > File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line > 1077, in <genexpr> > window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 321, in finalize_write > 'Encountered exceptions in finalize_write: %s' % all_exceptions) > Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), > HttpBadRequestError(), HttpBadRequestError()] [while running 'write to > bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite'] > INFO:root:2019-02-04T09:17:28.086Z: JOB_MESSAGE_DEBUG: Executing failure step > failure15 > INFO:root:2019-02-04T09:17:28.101Z: JOB_MESSAGE_ERROR: Workflow failed. > Causes: S13:write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite > failed., A work item was attempted 4 times without success. Each time the > worker eventually lost contact with the service. The work item was attempted > on: > rajsubtest-02040113-nvra-harness-t62t, > rajsubtest-02040113-nvra-harness-t62t, > rajsubtest-02040113-nvra-harness-t62t, > rajsubtest-02040113-nvra-harness-t62t > INFO:root:2019-02-04T09:17:28.794Z: JOB_MESSAGE_DETAILED: Cleaning up. > INFO:root:2019-02-04T09:17:28.834Z: JOB_MESSAGE_DEBUG: Starting worker pool > teardown. > INFO:root:2019-02-04T09:17:28.847Z: JOB_MESSAGE_BASIC: Stopping worker pool... > INFO:root:2019-02-04T09:18:21.451Z: JOB_MESSAGE_DETAILED: Autoscaling: > Resized worker pool from 1 to 0. > INFO:root:2019-02-04T09:18:21.468Z: JOB_MESSAGE_DETAILED: Autoscaling: Would > further reduce the number of workers but reached the minimum number allowed > for the job. 
> INFO:root:2019-02-04T09:18:21.496Z: JOB_MESSAGE_BASIC: Worker pool stopped. > INFO:root:2019-02-04T09:18:21.514Z: JOB_MESSAGE_DEBUG: Tearing down pending > resources... > INFO:root:Job 2019-02-04_01_13_57-9915685959768429617 is in state > JOB_STATE_FAILED > Traceback (most recent call last): > File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 53, in > <module> > main() > File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 11, in main > testdf.runPipeline() > File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 29, in > runPipeline > textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile) > File > "C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\pipeline.py", > line 425, in __exit__ > self.run().wait_until_finish() > File > "C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", > line 1186, in wait_until_finish > (self.state, getattr(self._runner, 'last_error_msg', None)), self) > apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: > Dataflow pipeline failed. 
State: FAILED, Error: > Traceback (most recent call last): > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line > 642, in do_work > work_executor.execute() > File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", > line 172, in execute > op.start() > File "dataflow_worker/native_operations.py", line 38, in > dataflow_worker.native_operations.NativeReadOperation.start > def start(self): > File "dataflow_worker/native_operations.py", line 39, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.scoped_start_state: > File "dataflow_worker/native_operations.py", line 44, in > dataflow_worker.native_operations.NativeReadOperation.start > with self.spec.source.reader() as reader: > File "dataflow_worker/native_operations.py", line 54, in > dataflow_worker.native_operations.NativeReadOperation.start > self.output(windowed_value) > File "apache_beam/runners/worker/operations.py", line 183, in > apache_beam.runners.worker.operations.Operation.output > cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) > File "apache_beam/runners/worker/operations.py", line 89, in > apache_beam.runners.worker.operations.ConsumerSet.receive > cython.cast(Operation, consumer).process(windowed_value) > File "apache_beam/runners/worker/operations.py", line 497, in > apache_beam.runners.worker.operations.DoOperation.process > with self.scoped_process_state: > File "apache_beam/runners/worker/operations.py", line 498, in > apache_beam.runners.worker.operations.DoOperation.process > self.dofn_receiver.receive(o) > File "apache_beam/runners/common.py", line 680, in > apache_beam.runners.common.DoFnRunner.receive > self.process(windowed_value) > File "apache_beam/runners/common.py", line 686, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 724, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > 
raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 684, in > apache_beam.runners.common.DoFnRunner.process > self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 535, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_per_window( > File "apache_beam/runners/common.py", line 604, in > apache_beam.runners.common.PerWindowInvoker._invoke_per_window > output_processor.process_outputs( > File "apache_beam/runners/common.py", line 755, in > apache_beam.runners.common._OutputProcessor.process_outputs > def process_outputs(self, windowed_input_element, results): > File "apache_beam/runners/common.py", line 770, in > apache_beam.runners.common._OutputProcessor.process_outputs > for result in results: > File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line > 1077, in <genexpr> > window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 321, in finalize_write > 'Encountered exceptions in finalize_write: %s' % all_exceptions) > Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), > HttpBadRequestError(), HttpBadRequestError()] [while running 'write to > bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite'] > > I presume this is still work in progress > -- This message was sent by Atlassian Jira (v8.3.4#803005)