pawelkopec opened a new issue, #22961: URL: https://github.com/apache/beam/issues/22961
### What happened?

### Problem

We run a batch job that reads data from GCS, transforms it and writes the output to BigQuery. We use the Python SDK with the Dataflow runner and the apache-beam==2.39.0 package. It seems like we are silently losing data during the write to BigQuery.

In the job view in GCP we see over 1.7 billion records going into the WriteToBigQuery step:

<img width="497" alt="image" src="https://user-images.githubusercontent.com/23619157/187429013-3556eef0-7b80-4c44-9a51-75c16f02c456.png">

However, the output table contains roughly 266 million records.

<img width="797" alt="image" src="https://user-images.githubusercontent.com/23619157/187429517-68798612-974e-430c-afd2-23945abb0408.png">

Although the job finished properly, there are two error logs from the workers. They may be related to the missing data:

```
Error message from worker: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 561, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 1730, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 273, in finish_bundle
    writer.close()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1567, in close
    self._file_handle.close()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/filesystemio.py", line 215, in close
    self._uploader.finish()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 788, in finish
    raise self._upload_thread.last_error # pylint: disable=raising-bad-type
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 761, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1152, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/uni_test_bucket/o?alt=json&name=tmp5%2Fbq_load%2Fc412cea93eed45ef80d00bb57c14642e%2Funi-flifo-pipelines-dev.uni_flifo_pipelines_dev.flifo-parse-2022-08-22-12-11%2F0e61d8ac-f66f-47d9-b96e-93bdbd661743&uploadType=resumable&upload_id=ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': 'ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW', 'content-length': '0', 'date': 'Mon, 22 Aug 2022 15:17:50 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1009, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 939, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 942, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 943, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1479, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1460, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 561, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 1730, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 273, in finish_bundle
    writer.close()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1567, in close
    self._file_handle.close()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/filesystemio.py", line 215, in close
    self._uploader.finish()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 788, in finish
    raise self._upload_thread.last_error # pylint: disable=raising-bad-type
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 761, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1152, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/uni_test_bucket/o?alt=json&name=tmp5%2Fbq_load%2Fc412cea93eed45ef80d00bb57c14642e%2Funi-flifo-pipelines-dev.uni_flifo_pipelines_dev.flifo-parse-2022-08-22-12-11%2F0e61d8ac-f66f-47d9-b96e-93bdbd661743&uploadType=resumable&upload_id=ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': 'ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW', 'content-length': '0', 'date': 'Mon, 22 Aug 2022 15:17:50 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> [while running 'Save FLIFO updates/Write to BQ/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-119']
```

It looks like a problem with access to the GCS bucket that we use as both the tmp and staging location. The error appears only sometimes, and so far it has occurred only at a certain data volume (although I am not sure whether that is related).

Unfortunately, I am not allowed to share exact reproducible code from my employer, but the transforms are fairly simple: reading from GCS, XML parsing, filtering out broken records, mapping to JSON and uploading to BQ (a rough, anonymized sketch of the pipeline shape is included at the end of this report). The schema is auto-generated from Pydantic objects, so we are quite certain that this part is working.

From what I found [in the code](https://github.com/apache/beam/blob/6db98fd459a5e8b9fe7b0634fc66265e00ac9c57/sdks/python/apache_beam/io/gcp/bigquery.py#L2271-L2272), the default write method for batch pipelines is WriteToBigQuery.Method.FILE_LOADS, which means that WriteToBigQuery uses [BigQueryBatchFileLoads](https://github.com/apache/beam/blob/6db98fd459a5e8b9fe7b0634fc66265e00ac9c57/sdks/python/apache_beam/io/gcp/bigquery.py#L2363). Unlike the [retry strategy](https://github.com/apache/beam/blob/6db98fd459a5e8b9fe7b0634fc66265e00ac9c57/sdks/python/apache_beam/io/gcp/bigquery.py#L1920-L1948) used by _StreamToBigQuery in streaming pipelines, BigQueryBatchFileLoads does not take an argument for a failure strategy.

At this point it is a bit hard for me to debug this further without going deep into the implementation. Any help will be appreciated.

### Issue Priority

Priority: 2

### Issue Component

Component: io-py-gcp
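### Pipeline sketch

For illustration, here is a minimal, anonymized sketch of the pipeline shape. All paths, table names, the schema and the parsing logic below are hypothetical placeholders, not the real job:

```python
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical placeholders -- the real paths, table and schema cannot be shared.
INPUT_PATTERN = "gs://example-bucket/input/*.xml"
OUTPUT_TABLE = "example-project:example_dataset.example_table"
OUTPUT_SCHEMA = "id:STRING,payload:STRING"
TEMP_LOCATION = "gs://example-bucket/tmp"


def parse_record(line):
    # Stand-in for the real XML parsing; returns None for broken records.
    if not line:
        return None
    return {"id": line[:16], "payload": json.dumps({"raw": line})}


def run():
    # Runner, project, region etc. are expected to come from command-line flags.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from GCS" >> beam.io.ReadFromText(INPUT_PATTERN)
            | "Parse XML" >> beam.Map(parse_record)
            | "Drop broken records" >> beam.Filter(lambda row: row is not None)
            | "Write to BQ" >> beam.io.WriteToBigQuery(
                OUTPUT_TABLE,
                schema=OUTPUT_SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                # FILE_LOADS is the default for batch jobs; load files are staged
                # in GCS first, which is where the 503s above are raised.
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                custom_gcs_temp_location=TEMP_LOCATION,
            )
        )


if __name__ == "__main__":
    run()
```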
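For comparison, this is roughly how a retry strategy can be passed on the streaming-inserts path (again using placeholder names); as noted above, the FILE_LOADS / BigQueryBatchFileLoads path exposes no comparable knob:

```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


def write_streaming(rows):
    # Streaming inserts accept an explicit insert_retry_strategy; table and
    # schema here are placeholders.
    return rows | "Write to BQ (streaming)" >> beam.io.WriteToBigQuery(
        "example-project:example_dataset.example_table",
        schema="id:STRING,payload:STRING",
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
    )
```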
