[ https://issues.apache.org/jira/browse/BEAM-11741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17549109#comment-17549109 ]

Danny McCormick commented on BEAM-11741:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20717

> apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate primary key
> ---------------------------------------------------------------------------------------
>
>                 Key: BEAM-11741
>                 URL: https://issues.apache.org/jira/browse/BEAM-11741
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>    Affects Versions: 2.27.0
>         Environment: Google Cloud Platform Dataflow
>            Reporter: Nahian-Al Hasan
>            Priority: P3
>              Labels: GCP, newbie
>
> Actual Behaviour
>  The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner transform fails with the exception below (Cloud Spanner rejects the commit with ALREADY_EXISTS because a row with the same primary key already exists in the table), and the entire pipeline crashes.
> Expected Behaviour
>  The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner transform handles such exceptions gracefully and does not crash the pipeline.
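> For context, a minimal sketch of the kind of write path that triggers this failure (the project, instance, database, table and columns are illustrative placeholders, not the actual pipeline). WriteMutation.insert raises ALREADY_EXISTS at commit time when the primary key is already present, whereas WriteMutation.insert_or_update would upsert instead, at the cost of silently overwriting existing rows:
> {code:python}
> # Illustrative sketch only -- not the reporter's main.py; IDs, table and
> # columns are placeholders.
> import apache_beam as beam
> from apache_beam.io.gcp.experimental.spannerio import WriteMutation, WriteToSpanner
>
> with beam.Pipeline() as p:
>     _ = (
>         p
>         | beam.Create([("0000085c-0fca-5c04-a538-3d44e4ec9d23", "example")])
>         # insert() fails with ALREADY_EXISTS on a duplicate primary key;
>         # insert_or_update() would upsert instead.
>         | beam.Map(lambda row: WriteMutation.insert(
>             table="TestItems", columns=("id", "name"), values=[row]))
>         | WriteToSpanner(
>             project_id="my-project",
>             instance_id="my-instance",
>             database_id="my-database"))
> {code}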
> It is not currently possible to handle this error from pipeline code, because the exception is raised inside the transform's own write DoFn at commit time. It would be easier to handle the exception inside that `process` function instead, as sketched below.
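> A possible interim, user-level workaround (a hedged sketch, not existing Beam functionality) is to commit rows from a custom DoFn using the google-cloud-spanner client directly and route AlreadyExists failures to a tagged output, at the cost of losing WriteToSpanner's mutation batching:
> {code:python}
> # Hypothetical workaround sketch -- not part of Beam; table and columns are
> # placeholders.
> import apache_beam as beam
> from google.api_core import exceptions as gcp_exceptions
> from google.cloud import spanner
>
> class InsertWithDeadLetter(beam.DoFn):
>     """Inserts rows one at a time and emits duplicate-key rows to a
>     'duplicates' output instead of failing the bundle."""
>
>     def __init__(self, project_id, instance_id, database_id, table, columns):
>         self._project_id = project_id
>         self._instance_id = instance_id
>         self._database_id = database_id
>         self._table = table
>         self._columns = columns
>
>     def setup(self):
>         client = spanner.Client(project=self._project_id)
>         self._database = client.instance(self._instance_id).database(self._database_id)
>
>     def process(self, row):
>         try:
>             with self._database.batch() as batch:
>                 batch.insert(table=self._table, columns=self._columns, values=[row])
>             yield row
>         except gcp_exceptions.AlreadyExists:
>             # Duplicate primary key: dead-letter it instead of crashing the job.
>             yield beam.pvalue.TaggedOutput("duplicates", row)
> {code}
> The outputs would then be split with beam.ParDo(InsertWithDeadLetter(...)).with_outputs("duplicates", main="written").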
> Please see the logs below for more information.  
> {code:java}
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
>   sql=sql
> main.py:102: FutureWarning: WriteToSpanner is experimental. No backwards-compatibility guarantees.
>   database_id=importer_options.DEST_SPANNER_DATASET_ID,
> warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
> Traceback (most recent call last):
>   File "main.py", line 110, in <module>
>     run()
>   File "main.py", line 106, in run
>     result.wait_until_finish()
>   File "/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1665, in wait_until_finish
>     self)
> apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
>     return callable_(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>     return _end_unary_response_blocking(state, call, False, None)
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>     raise _InactiveRpcError(state)
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>     status = StatusCode.ALREADY_EXISTS
>     details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists"
>     debug_error_string = "{"created":"@1612247321.800986805","description":"Error received from peer ipv4:XXX.XXX.XX.XX:XXX","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists","grpc_status":6}">
>
> The above exception was the direct cause of the following exception:
>
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
>     batch_func(**m.kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
>     self._batch.commit()
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
>     metadata=metadata,
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
>     request, retry=retry, timeout=timeout, metadata=metadata
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
>     return wrapped_func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
>     on_error=on_error,
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
>     return target()
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 886, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
>     batch_func(**m.kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
>     self._batch.commit()
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
>     metadata=metadata,
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
>     request, retry=retry, timeout=timeout, metadata=metadata
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
>     return wrapped_func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
>     on_error=on_error,
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
>     return target()
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists [while running 'Write Mutations to destination Spanner/Writing to spanner']
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
