[ https://issues.apache.org/jira/browse/BEAM-11741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17549109#comment-17549109 ]
Danny McCormick commented on BEAM-11741:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20717

> apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate
> primary key
> ---------------------------------------------------------------------------------------
>
>                 Key: BEAM-11741
>                 URL: https://issues.apache.org/jira/browse/BEAM-11741
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>    Affects Versions: 2.27.0
>         Environment: Google Cloud Platform Dataflow
>            Reporter: Nahian-Al Hasan
>            Priority: P3
>              Labels: GCP, newbie
>
> Actual Behaviour
> The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on the
> exception below and the entire pipeline crashes.
> Expected Behaviour
> The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner module handles
> exceptions gracefully and does not crash the pipeline.
> It isn't possible to implement error-handling in pipeline code. It would be
> easier to just handle the exception inside the `process` function.
> Please see the logs below for more information.
> {code:java}
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No
> backwards-compatibility guarantees.main.py:91: FutureWarning: ReadFromSpanner
> is experimental. No backwards-compatibility guarantees. sql=sqlmain.py:102:
> FutureWarning: WriteToSpanner is experimental. No backwards-compatibility
> guarantees. database_id=importer_options.DEST_SPANNER_DATASET_ID,warning:
> sdist: standard file not found: should have one of README, README.rst,
> README.txt, README.md
> WARNING:root:Make sure that locally built Python SDK docker image has Python
> 3.7 interpreter.Traceback (most recent call last): File "main.py", line 110,
> in <module> run() File "main.py", line 106, in run
> result.wait_until_finish() File
> "/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
> line 1665, in wait_until_finish
> self)apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException:
> Dataflow pipeline failed. State: FAILED, Error:Traceback (most recent call
> last): File
> "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
> line 57, in error_remapped_callable return callable_(*args, **kwargs)
> File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in
> __call__ return _end_unary_response_blocking(state, call, False, None)
> File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in
> _end_unary_response_blocking raise
> _InactiveRpcError(state)grpc._channel._InactiveRpcError: <_InactiveRpcError
> of RPC that terminated with: status = StatusCode.ALREADY_EXISTS details =
> "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already
> exists" debug_error_string =
> "{"created":"@1612247321.800986805","description":"Error received from peer
> ipv4:XXX.XXX.XX.XX:XXX","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row
> [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already
> exists","grpc_status":6}">
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last): File "apache_beam/runners/common.py",
> line 1239, in apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 588, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
> line 1098, in process batch_func(**m.kwargs) File
> "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py",
> line 476, in __exit__ self._batch.commit() File
> "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py",
> line 154, in commit metadata=metadata, File
> "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py",
> line 1556, in commit request, retry=retry, timeout=timeout,
> metadata=metadata File
> "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py",
> line 145, in __call__ return wrapped_func(*args, **kwargs) File
> "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286,
> in retry_wrapped_func on_error=on_error, File
> "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184,
> in retry_target return target() File
> "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line
> 214, in func_with_timeout return func(*args, **kwargs) File
> "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
> line 59, in error_remapped_callable
> six.raise_from(exceptions.from_grpc_error(exc), exc) File "<string>", line
> 3, in raise_fromgoogle.api_core.exceptions.AlreadyExists: 409 Row
> [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last): File
> "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line
> 649, in do_work work_executor.execute() File
> "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line
> 179, in execute op.start() File "dataflow_worker/shuffle_operations.py",
> line 63, in
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File
> "dataflow_worker/shuffle_operations.py", line 64, in
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File
> "dataflow_worker/shuffle_operations.py", line 79, in
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File
> "dataflow_worker/shuffle_operations.py", line 80, in
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File
> "dataflow_worker/shuffle_operations.py", line 84, in
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start File
> "apache_beam/runners/worker/operations.py", line 359, in
> apache_beam.runners.worker.operations.Operation.output File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "dataflow_worker/shuffle_operations.py", line 261, in
> dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
> File "dataflow_worker/shuffle_operations.py", line 268, in
> dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
> File "apache_beam/runners/worker/operations.py", line 359, in
> apache_beam.runners.worker.operations.Operation.output File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 587, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 587, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 768, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process File
> "apache_beam/runners/common.py", line 891, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 158, in
> apache_beam.runners.worker.operations.ConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 768, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process File
> "apache_beam/runners/common.py", line 886, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 587, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 587, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1306, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 587, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "apache_beam/runners/common.py", line 1401, in
> apache_beam.runners.common._OutputProcessor.process_outputs File
> "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive File
> "apache_beam/runners/worker/operations.py", line 718, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/worker/operations.py", line 719, in
> apache_beam.runners.worker.operations.DoOperation.process File
> "apache_beam/runners/common.py", line 1241, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 1321, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented File
> "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446,
> in raise_with_traceback raise exc.with_traceback(traceback) File
> "apache_beam/runners/common.py", line 1239, in
> apache_beam.runners.common.DoFnRunner.process File
> "apache_beam/runners/common.py", line 588, in
> apache_beam.runners.common.SimpleInvoker.invoke_process File
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
> line 1098, in process batch_func(**m.kwargs) File
> "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py",
> line 476, in __exit__ self._batch.commit() File
> "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py",
> line 154, in commit metadata=metadata, File
> "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py",
> line 1556, in commit request, retry=retry, timeout=timeout,
> metadata=metadata File
> "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py",
> line 145, in __call__ return wrapped_func(*args, **kwargs) File
> "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286,
> in retry_wrapped_func on_error=on_error, File
> "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184,
> in retry_target return target() File
> "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line
> 214, in func_with_timeout return func(*args, **kwargs) File
> "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
> line 59, in error_remapped_callable
> six.raise_from(exceptions.from_grpc_error(exc), exc) File "<string>", line
> 3, in raise_fromgoogle.api_core.exceptions.AlreadyExists: 409 Row
> [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists
> [while running 'Write Mutations to destination Spanner/Writing to spanner']
> {code}

--
This message was sent by Atlassian Jira (v8.20.7#820007)
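
For readers of this thread, the behaviour the reporter asks for (handling the exception inside `process` instead of letting it crash the pipeline) could look roughly like the sketch below. It is plain Python and does not use Beam or the Spanner client: `AlreadyExists`, `TolerantWriter`, and `make_fake_commit` are hypothetical stand-ins invented for illustration, and this is not how `spannerio.py` is implemented.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

Row = Dict[str, str]


class AlreadyExists(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.AlreadyExists."""


@dataclass
class TolerantWriter:
    """Hypothetical writer that records rejected batches instead of raising."""

    commit: Callable[[List[Row]], None]
    failed: List[Tuple[List[Row], str]] = field(default_factory=list)
    written: int = 0

    def process(self, batch: List[Row]) -> None:
        try:
            self.commit(batch)
            self.written += len(batch)
        except AlreadyExists as exc:
            # Keep the rejected batch and the error text so the caller can
            # inspect, log, or retry it; the "pipeline" keeps running.
            self.failed.append((batch, str(exc)))


def make_fake_commit() -> Callable[[List[Row]], None]:
    """Build an in-memory commit that rejects a batch containing a known key."""
    seen = set()

    def commit(batch: List[Row]) -> None:
        keys = [row["id"] for row in batch]
        for key in keys:
            if key in seen:
                raise AlreadyExists(f"Row [{key}] already exists")
        # Only reached when no key in the batch collided: all-or-nothing.
        seen.update(keys)

    return commit


writer = TolerantWriter(commit=make_fake_commit())
writer.process([{"id": "a"}, {"id": "b"}])
writer.process([{"id": "b"}, {"id": "c"}])  # rejected as a whole: "b" exists
writer.process([{"id": "c"}])               # succeeds: "c" was never committed
```

In a real pipeline the collected failures would more likely go to a separate dead-letter output than to an in-memory list. Where overwriting an existing row is acceptable, Beam's `WriteMutation.insert_or_update` may avoid the `ALREADY_EXISTS` error altogether, though that changes the write semantics and should be checked against the SDK version in use.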