I'm trying to write a robust pipeline that reads from PubSub and writes to BigQuery. For every PubsubMessage that is not successfully written to BigQuery, I'd like to get the original PubsubMessage back so I can write it to an error output collection. I'm not sure this is quite possible, though.
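For reference, the pipeline starts out shaped roughly like this (a minimal sketch, not our production code; the project, subscription, table, and the JSON parsing inside parseToRow are placeholders):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.api.services.bigquery.model.TableRow;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class PubsubToBigQueryNaive {

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply("ReadFromPubSub",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription("projects/my-project/subscriptions/my-sub"))
         .apply("WriteToBigQuery",
                BigQueryIO.<PubsubMessage>write()
                    .to("my-project:my_dataset.my_table")
                    .withFormatFunction(PubsubToBigQueryNaive::parseToRow)
                    // Table assumed to exist, so no schema is needed here.
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER));

        p.run();
      }

      // Placeholder parse step: treats the payload as a JSON object.
      // If the payload is malformed, the exception thrown here is what surfaces
      // as a PipelineExecutionException and kills the job.
      private static TableRow parseToRow(PubsubMessage msg) {
        try {
          String json = new String(msg.getPayload(), StandardCharsets.UTF_8);
          return new ObjectMapper().readValue(json, TableRow.class);
        } catch (IOException e) {
          throw new RuntimeException("Could not parse message payload", e);
        }
      }
    }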
The first issue is that BigQueryIO's withFormatFunction doesn't seem to provide any error handling. If my formatFunction throws, the exception bubbles up as a PipelineExecutionException and kills the job. I'd like to catch the exception instead and send the original payload to an error output collection. To work around this, we parse the JSON payload into a TableRow in a separate pre-processing transform and then call BigQueryIO.writeTableRows, even though the documentation suggests avoiding it (a sketch of this workaround is at the end of the question).

Similarly, I'd like to recover the original message when the failure happens after formatFunction, i.e. when BigQuery rejects the insert. WriteResult.getFailedInserts() initially seemed to do this, but it always returns TableRow rather than the original pre-formatFunction message. I also found that, by default, failed inserts raise an exception that stops processing; I had to set InsertRetryPolicy.retryTransientErrors() to keep failed inserts from bubbling up as a PipelineExecutionException.

Are there details of the existing API I'm missing that would allow the kind of error handling I'm describing? Is setting a non-default InsertRetryPolicy actually required for getFailedInserts to work, or is that a bug? Do others see a need for changes to the BigQueryIO.Write API to enable better error handling?
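For completeness, here is roughly what our current workaround looks like end to end (again a sketch with placeholder names; the subscription, table, tags, and parse logic are illustrative, not our real code):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.api.services.bigquery.model.TableRow;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class PubsubToBigQueryWorkaround {

      // Output tags for the pre-processing step: parsed rows vs. messages that failed to parse.
      private static final TupleTag<TableRow> PARSED = new TupleTag<TableRow>() {};
      private static final TupleTag<PubsubMessage> PARSE_ERRORS = new TupleTag<PubsubMessage>() {};

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<PubsubMessage> messages =
            p.apply("ReadFromPubSub",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription("projects/my-project/subscriptions/my-sub"));

        // Pre-processing transform: parse here so we can catch the exception ourselves
        // and keep the original PubsubMessage for the error output.
        PCollectionTuple parsed = messages.apply("ParseJson",
            ParDo.of(new DoFn<PubsubMessage, TableRow>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                PubsubMessage msg = c.element();
                try {
                  String json = new String(msg.getPayload(), StandardCharsets.UTF_8);
                  c.output(new ObjectMapper().readValue(json, TableRow.class));
                } catch (Exception e) {
                  c.output(PARSE_ERRORS, msg);  // keep the original message
                }
              }
            }).withOutputTags(PARSED, TupleTagList.of(PARSE_ERRORS)));

        PCollection<PubsubMessage> parseErrors =
            parsed.get(PARSE_ERRORS).setCoder(PubsubMessageWithAttributesCoder.of());

        // Writing pre-built TableRows via writeTableRows(), despite the docs suggesting
        // write() with a format function instead.
        WriteResult result = parsed.get(PARSED).apply("WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                // Without a non-default retry policy, failed inserts surfaced as a
                // PipelineExecutionException instead of showing up in getFailedInserts().
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

        // This is only a PCollection<TableRow>, not the original PubsubMessages.
        PCollection<TableRow> failedInserts = result.getFailedInserts();

        // parseErrors and failedInserts would both feed the error output, but they
        // carry different types, which is the heart of the problem.

        p.run();
      }
    }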