pawelkopec opened a new issue, #22961:
URL: https://github.com/apache/beam/issues/22961

   ### What happened?
   
   ### Problem
   
   We run a batch job that reads data from GCS, transforms it and writes the output to BigQuery. We use the Python SDK with the Dataflow runner and the apache-beam==2.39.0 package.
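
   For context, the job is submitted roughly as in the sketch below; every project, bucket and region name there is a placeholder, not our real setup:

   ```python
   # Rough shape of how we launch the batch job on Dataflow; all concrete
   # names here are placeholders, not the real project/bucket.
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions

   options = PipelineOptions(
       runner="DataflowRunner",
       project="my-project",                       # placeholder
       region="us-central1",                       # placeholder
       temp_location="gs://my-bucket/tmp",         # the same bucket is used as
       staging_location="gs://my-bucket/staging",  # both tmp and staging location
   )

   with beam.Pipeline(options=options) as p:
       # read from GCS -> transforms -> WriteToBigQuery (sketched further below)
       pass
   ```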
   
   It seems like we are silently losing data during the write to BigQuery. In the job view in GCP we can see over 1.7 billion records going into the WriteToBigQuery step:
   <img width="497" alt="image" 
src="https://user-images.githubusercontent.com/23619157/187429013-3556eef0-7b80-4c44-9a51-75c16f02c456.png";>
   
   However, the output table contains roughly 266 million records.
   <img width="797" alt="image" 
src="https://user-images.githubusercontent.com/23619157/187429517-68798612-974e-430c-afd2-23945abb0408.png";>
   
   Although the job finished successfully, there are two error logs from the workers; they may be related to the missing data:
   ```
   Error message from worker: Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1458, in 
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
     File "apache_beam/runners/common.py", line 561, in 
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
     File "apache_beam/runners/common.py", line 566, in 
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
     File "apache_beam/runners/common.py", line 1730, in 
apache_beam.runners.common._OutputHandler.finish_bundle_outputs
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_file_loads.py",
 line 273, in finish_bundle
       writer.close()
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_tools.py", 
line 1567, in close
       self._file_handle.close()
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/filesystemio.py", line 
215, in close
       self._uploader.finish()
     File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", 
line 788, in finish
       raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
     File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", 
line 761, in _start_upload
       self._client.objects.Insert(self._insert_request, upload=self._upload)
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
 line 1152, in Insert
       return self._RunMethod(
     File 
"/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 
731, in _RunMethod
       return self.ProcessHttpResponse(method_config, http_response, request)
     File 
"/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 
737, in ProcessHttpResponse
       self.__ProcessHttpResponse(method_config, http_response, request))
     File 
"/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 
603, in __ProcessHttpResponse
       raise exceptions.HttpError.FromResponse(
   apitools.base.py.exceptions.HttpError: HttpError accessing 
<https://www.googleapis.com/resumable/upload/storage/v1/b/uni_test_bucket/o?alt=json&name=tmp5%2Fbq_load%2Fc412cea93eed45ef80d00bb57c14642e%2Funi-flifo-pipelines-dev.uni_flifo_pipelines_dev.flifo-parse-2022-08-22-12-11%2F0e61d8ac-f66f-47d9-b96e-93bdbd661743&uploadType=resumable&upload_id=ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW>:
 response: <{'content-type': 'text/plain; charset=utf-8', 
'x-guploader-uploadid': 
'ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW',
 'content-length': '0', 'date': 'Mon, 22 Aug 2022 15:17:50 GMT', 'server': 
'UploadServer', 'status': '503'}>, content <>
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 284, in _execute
       response = task()
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 357, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 597, in do_instruction
       return getattr(self, request_type)(
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 635, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1009, in process_bundle
       op.finish()
     File "apache_beam/runners/worker/operations.py", line 939, in 
apache_beam.runners.worker.operations.DoOperation.finish
     File "apache_beam/runners/worker/operations.py", line 942, in 
apache_beam.runners.worker.operations.DoOperation.finish
     File "apache_beam/runners/worker/operations.py", line 943, in 
apache_beam.runners.worker.operations.DoOperation.finish
     File "apache_beam/runners/common.py", line 1479, in 
apache_beam.runners.common.DoFnRunner.finish
     File "apache_beam/runners/common.py", line 1460, in 
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
     File "apache_beam/runners/common.py", line 1507, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1458, in 
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
     File "apache_beam/runners/common.py", line 561, in 
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
     File "apache_beam/runners/common.py", line 566, in 
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
     File "apache_beam/runners/common.py", line 1730, in 
apache_beam.runners.common._OutputHandler.finish_bundle_outputs
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_file_loads.py",
 line 273, in finish_bundle
       writer.close()
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_tools.py", 
line 1567, in close
       self._file_handle.close()
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/filesystemio.py", line 
215, in close
       self._uploader.finish()
     File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", 
line 788, in finish
       raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
     File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", 
line 761, in _start_upload
       self._client.objects.Insert(self._insert_request, upload=self._upload)
     File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
 line 1152, in Insert
       return self._RunMethod(
     File 
"/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 
731, in _RunMethod
       return self.ProcessHttpResponse(method_config, http_response, request)
     File 
"/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 
737, in ProcessHttpResponse
       self.__ProcessHttpResponse(method_config, http_response, request))
     File 
"/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 
603, in __ProcessHttpResponse
       raise exceptions.HttpError.FromResponse(
   RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing 
<https://www.googleapis.com/resumable/upload/storage/v1/b/uni_test_bucket/o?alt=json&name=tmp5%2Fbq_load%2Fc412cea93eed45ef80d00bb57c14642e%2Funi-flifo-pipelines-dev.uni_flifo_pipelines_dev.flifo-parse-2022-08-22-12-11%2F0e61d8ac-f66f-47d9-b96e-93bdbd661743&uploadType=resumable&upload_id=ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW>:
 response: <{'content-type': 'text/plain; charset=utf-8', 
'x-guploader-uploadid': 
'ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW',
 'content-length': '0', 'date': 'Mon, 22 Aug 2022 15:17:50 GMT', 'server': 
'UploadServer', 'status': '503'}>, content <> [while running 'Save FLIFO 
updates/Write to 
BQ/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-119']
   ```
   
   It looks like a problem with access to the GCS bucket which we use as both the tmp and staging location. This error appears only occasionally, and so far it has occurred only at a certain data volume (although I am not sure whether that is related).
   
   Unfortunately I am not allowed to share exact reproducible code from my employer, but the transforms are fairly simple: reading from GCS, XML parsing, filtering out broken records, mapping to JSON and uploading to BQ (a rough sketch is below). The schema is auto-generated from Pydantic objects, so we are quite certain that this part is working.
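
   Here is a rough, self-contained sketch of the shape of those transforms. The file pattern, field names, schema and table are placeholders, and the real parsing logic is more involved:

   ```python
   # Sketch of the transform chain: read XML files from GCS, parse them,
   # drop broken records, map to JSON-like dicts and write to BigQuery.
   # All names and the schema below are placeholders.
   import xml.etree.ElementTree as ET

   import apache_beam as beam
   from apache_beam.io import fileio


   def parse_record(raw_xml):
       # Return a JSON-like dict, or None for records that cannot be parsed.
       try:
           root = ET.fromstring(raw_xml)
           return {"id": root.findtext("id"), "payload": root.findtext("payload")}
       except ET.ParseError:
           return None


   with beam.Pipeline() as p:
       (p
        | "MatchFiles" >> fileio.MatchFiles("gs://my-bucket/input/*.xml")  # placeholder path
        | "ReadMatches" >> fileio.ReadMatches()
        | "ReadContents" >> beam.Map(lambda f: f.read_utf8())
        | "ParseXML" >> beam.Map(parse_record)
        | "DropBroken" >> beam.Filter(lambda row: row is not None)
        | "WriteToBQ" >> beam.io.WriteToBigQuery(
              "my-project:my_dataset.my_table",    # placeholder table
              schema="id:STRING,payload:STRING",   # ours is generated from Pydantic models
              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
   ```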
   
   From what I found [in the code](https://github.com/apache/beam/blob/6db98fd459a5e8b9fe7b0634fc66265e00ac9c57/sdks/python/apache_beam/io/gcp/bigquery.py#L2271-L2272), the default write method for batch pipelines is WriteToBigQuery.Method.FILE_LOADS, which means that WriteToBigQuery uses [BigQueryBatchFileLoads](https://github.com/apache/beam/blob/6db98fd459a5e8b9fe7b0634fc66265e00ac9c57/sdks/python/apache_beam/io/gcp/bigquery.py#L2363). Unlike the [retry strategy](https://github.com/apache/beam/blob/6db98fd459a5e8b9fe7b0634fc66265e00ac9c57/sdks/python/apache_beam/io/gcp/bigquery.py#L1920-L1948) used by _StreamToBigQuery in streaming pipelines, BigQueryBatchFileLoads does not take an argument for a failure strategy (see the sketch below). At this point it is hard for me to debug this further without going deep into the implementation.
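
   To illustrate the asymmetry from the caller's side (table and schema below are placeholders): a streaming write can be given an insert_retry_strategy, while the batch FILE_LOADS path, which is what our job uses, exposes no comparable argument for handling failed uploads of the intermediate files to GCS:

   ```python
   # Sketch contrasting the two write methods; table and schema are placeholders.
   import apache_beam as beam
   from apache_beam.io.gcp.bigquery_tools import RetryStrategy

   # Streaming inserts expose a retry/failure strategy...
   streaming_write = beam.io.WriteToBigQuery(
       "my-project:my_dataset.my_table",
       schema="id:STRING,payload:STRING",
       method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
       insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)

   # ...whereas the batch default (FILE_LOADS) takes no equivalent argument
   # for what to do when writing the intermediate files to GCS fails.
   batch_write = beam.io.WriteToBigQuery(
       "my-project:my_dataset.my_table",
       schema="id:STRING,payload:STRING",
       method=beam.io.WriteToBigQuery.Method.FILE_LOADS)
   ```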
   
   Any help will be appreciated.
   
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: io-py-gcp

