pabloem commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r907642663
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1544,112 @@ def writer(self, test_bigquery_client=None,
buffer_size=None):
buffer_size=buffer_size)
+class WriteResult:
+ """The result of a WriteToBigQuery transform.
+ """
+ def __init__(
+ self,
+ method=None,
+ destination_load_jobid_pairs=None,
+ destination_file_pairs=None,
+ destination_copy_jobid_pairs=None,
+ failed_rows=None,
+ failed_rows_with_errors=None):
+
+ self.method: str = method
+ self._destination_load_jobid_pairs: PCollection =
destination_load_jobid_pairs
+ self._destination_file_pairs: PCollection = destination_file_pairs
+ self._destination_copy_jobid_pairs: PCollection =
destination_copy_jobid_pairs
+ self._failed_rows: PCollection = failed_rows
+ self._failed_rows_with_errors: PCollection = failed_rows_with_errors
+
+ from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
+ self.attributes = {
+ BigQueryWriteFn.FAILED_ROWS: WriteResult.failed_rows,
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: WriteResult.
+ failed_rows_with_errors,
+ BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: WriteResult.
+ destination_load_jobid_pairs,
+ BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: WriteResult.
+ destination_file_pairs,
+ BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: WriteResult.
+ destination_copy_jobid_pairs,
+ }
+
+ def validate(self, method, attribute):
+ if self.method != method:
+ raise AttributeError(
+ f'Cannot get {attribute} because it is not produced '
+ f'by {self.method} write method. Note: only {method} '
+ 'produces this attribute.')
+
+ @property
+ def destination_load_jobid_pairs(self):
+ """A ``FILE_LOADS`` method attribute
+
+ Returns: A PCollection of the table destinations that were successfully
+ loaded to using the batch load API, along with the load job IDs.
+
+ Raises: AttributeError: if accessed with a write method besides
``FILE_LOADS``."""
+ self.validate('FILE_LOADS', 'DESTINATION_JOBID_PAIRS')
+
+ return self._destination_load_jobid_pairs
+
+ @property
+ def destination_file_pairs(self):
+ """A ``FILE_LOADS`` method attribute
+
+ Returns: A PCollection of the table destinations along with the
+ temp files used as sources to load from.
+
+ Raises: AttributeError: if accessed with a write method besides
``FILE_LOADS``."""
+ self.validate('FILE_LOADS', 'DESTINATION_FILE_PAIRS')
+
+ return self._destination_file_pairs
+
+ @property
+ def destination_copy_jobid_pairs(self):
+ """A ``FILE_LOADS`` method attribute
+
+ Returns: A PCollection of the table destinations that were successfully
+ copied to, along with the copy job ID.
+
+ Raises: AttributeError: if accessed with a write method besides
``FILE_LOADS``."""
+ self.validate('FILE_LOADS', 'DESTINATION_COPY_JOBID_PAIRS')
+
+ return self._destination_copy_jobid_pairs
+
+ @property
+ def failed_rows(self):
+ """A ``STREAMING_INSERTS`` method attribute
+
+ Returns: A PCollection of rows that failed when inserting to BigQuery.
+
+ Raises: AttributeError: if accessed with a write method besides
``STREAMING_INSERTS``."""
+ self.validate('STREAMING_INSERTS', 'FAILED_ROWS')
+
+ return self._failed_rows
+
+ @property
+ def failed_rows_with_errors(self):
Review Comment:
Please also add the return types for these methods, since Python may not be
able to infer them from the type annotations on the attributes (i.e. `->
PCollection[...]`)
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -235,6 +235,54 @@ def compute_table_name(row):
[2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
[3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
+Output of the WriteToBigQuery transform
+---------------------------------------
+
+Writing to BigQuery returns a WriteResult object that includes metadata
+relating to the write you configured. This data can be used in later steps
+in your pipeline:::
+
+ schema = {'fields': [
+ {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+ error_schema = {'fields': [
+ {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+ with Pipeline() as p:
+ result = (p
+ | 'Create Columns' >> beam.Create([
+ {'column': 'value'},
+ {'bad_column': 'bad_value'}
+ ])
+ | 'Write Data' >> WriteToBigQuery(
+ method=WriteToBigQuery.Method.STREAMING_INSERTS,
+ table=my_table,
+ schema=schema,
+ insert_retry_strategy=RetryStrategy.RETRY_NEVER
+ ))
+
+ _ = (result.failed_rows_with_errors
+ | 'Get Errors' >> beam.Map(lambda e: {
+ "destination": e[0],
+ "row": json.dumps(e[1]),
+ "error_message": e[2][0]['message']
+ })
+ | 'Write Errors' >> WriteToBigQuery(
+ method=WriteToBigQuery.Method.STREAMING_INSERTS,
+ table=error_log_table,
+ schema=error_schema,
+ ))
+
+Attributes can be accessed using dot notation or bracket notation:
+
+result.failed_rows <--> result['FailedRows']
+result.failed_rows_with_errors <--> result['FailedRowsWithErrors']
+result.destination_load_jobid_pairs <--> result['destination_load_jobid_pairs']
+result.destination_file_pairs <--> result['destination_file_pairs']
+result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
Review Comment:
This looks great. Thanks! Do you know whether lines 280-284 show up as
code/monospace font? Maybe you need to add backticks or indentation to make
them look like code?
Finally, let's add a sub-section that says something like this:
```
### Chaining of operations after WriteToBigQuery
WriteToBigQuery returns an object with several PCollections with metadata
about the write operations. These are
useful to inspect the write operation; however, often the simplest use case
is to chain an operation after writing data
to BigQuery.
To chain an operation, one can chain it after one of the output
PCollections. A generic way in which one could chain
this operation (independent of write method) would be:::
def chain_after(result):
try:
# This should work for FILE_LOADS where we run load and possibly copy
jobs.
return (result.load_jobid_pairs, result.copy_jobid_pairs) |
beam.Flatten()
except AttributeError:
return result.failed_rows
result = (pcoll | WriteToBigQuery(...))
_ = (chain_after(result)
| beam.Reshuffle() # Force a 'commit' of the intermediate data
| MyOperationAfterWriteToBQ())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]