carlpayne commented on issue #23291: URL: https://github.com/apache/beam/issues/23291#issuecomment-1322208525
@reuvenlax thanks for taking a look at this. From my experiments, this seems to occur when the schema set in BigQueryIO does not match the schema in BigQuery itself.

For example, assume we have a table with two columns:

1. `required_field` - REQUIRED
2. `optional_field` - NULLABLE

If we set the schema in BigQueryIO to contain only `optional_field` (and not `required_field`), we end up in the retry loop I mentioned above. The error comes back from BigQuery itself rather than being detected up front by BigQueryIO; had BigQueryIO detected it, the row would have been sent to `getFailedStorageApiInserts` as expected.

In our use case this can easily happen, because the schemas are set externally in a schema registry (in our real app we use `DynamicDestinations`, and `getSchema` makes a dynamic call to our schema registry). Overall, our goal is to update BigQuery to match the new schema as it changes (which is why I've also raised https://github.com/apache/beam/issues/24063).

Rough working example below. Hopefully this makes sense?

```
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

// BQConnectOptions (not shown) is a PipelineOptions sub-interface defined elsewhere.
public class BQConnectDemo {

  private static final Logger log = LoggerFactory.getLogger(BQConnectDemo.class);

  static void runDemo(BQConnectOptions options) {
    Pipeline p = Pipeline.create(options);

    final List<String> rowText =
        Arrays.asList(
            "To be, or not to be: that is the question: ",
            "Whether 'tis nobler in the mind to suffer ",
            "The slings and arrows of outrageous fortune, ",
            "Or to take arms against a sea of troubles, ");

    final PCollection<String> rows =
        p.apply(Create.of(rowText).withCoder(StringUtf8Coder.of()));

    WriteResult writeResult =
        rows.apply(
            "WriteToBigQuery",
            BigQueryIO.<String>write()
                // [project_id]:[dataset_id].[table_id]
                .to("<PROJECT_REDACTED>.<DATASET_REDACTED>.required_fields_demo")
                .withSchema(
                    new TableSchema()
                        .setFields(
                            List.of(
                                // Note that schema does not contain required_field
                                new TableFieldSchema()
                                    .setName("optional_field")
                                    .setType("STRING")
                                    .setMode("NULLABLE"))))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withExtendedErrorInfo()
                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withFormatFunction(
                    new SerializableFunction<String, TableRow>() {
                      @Override
                      public TableRow apply(String input) {
                        // Note that row does not contain required_field
                        return new TableRow().set("optional_field", "test_value");
                      }
                    }));

    PCollection<BigQueryStorageApiInsertError> failedInserts =
        writeResult.getFailedStorageApiInserts();
    failedInserts.apply("Log failures", ParDo.of(new LogErrors()));

    p.run();
  }

  public static void main(String[] args) {
    BQConnectOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BQConnectOptions.class);
    runDemo(options);
  }

  static class LogErrors extends DoFn<BigQueryStorageApiInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      log.error(
          String.format("Failed to insert element to BigQuery: %s", context.element()));
    }
  }
}
```
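
To make the mismatch concrete, here is a minimal plain-Java sketch of the up-front check described above: given the column modes of the existing table and the field names declared in the write schema, it lists the REQUIRED columns the write schema leaves out. `RequiredFieldCheck` and `missingRequiredFields` are hypothetical names and not part of Beam or the BigQuery client. The sketch assumes the table's column modes have already been fetched by some other means, and it only looks at top-level fields.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not a Beam or BigQuery API.
class RequiredFieldCheck {

  /**
   * Returns the names of REQUIRED table columns that the declared write
   * schema does not mention.
   *
   * @param tableFieldModes column name -> mode ("REQUIRED", "NULLABLE", ...)
   *     of the existing table (assumed to be fetched elsewhere)
   * @param declaredFields column names in the schema handed to the writer
   */
  static List<String> missingRequiredFields(
      Map<String, String> tableFieldModes, Set<String> declaredFields) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, String> field : tableFieldModes.entrySet()) {
      boolean required = "REQUIRED".equalsIgnoreCase(field.getValue());
      if (required && !declaredFields.contains(field.getKey())) {
        missing.add(field.getKey());
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Mirrors the two-column table from the example.
    Map<String, String> table = new LinkedHashMap<>();
    table.put("required_field", "REQUIRED");
    table.put("optional_field", "NULLABLE");

    // The write schema in the demo only declares optional_field.
    Set<String> declared = Set.of("optional_field");

    System.out.println(missingRequiredFields(table, declared)); // prints [required_field]
  }
}
```

A non-empty result is the condition under which the demo pipeline hits the retry loop; rows for such a destination could be routed to a dead-letter output before they reach the write step.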
