carlpayne commented on issue #23291:
URL: https://github.com/apache/beam/issues/23291#issuecomment-1322208525

   @reuvenlax thanks for taking a look at this. From my experiments, it seems 
that this can occur when the schema set in BigQueryIO does not match the schema 
in BigQuery itself. For example, let's assume we have a table with two columns:
   
   1. `required_field` - REQUIRED
   2. `optional_field` - NULLABLE
   
   If we set the schema in BigQueryIO to only have `optional_field` (not 
`required_field`), then we end up in the retry loop I mentioned above, because 
the error comes back from BigQuery itself rather than being detected 
up-front by BigQueryIO (which would correctly route it to 
getFailedStorageApiInserts as expected). 
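   
   To illustrate the kind of up-front check I mean, here is a minimal, stdlib-only sketch (the `missingRequiredFields` helper is hypothetical, not part of Beam or BigQueryIO): given the destination table's fields and the fields declared in the sink's schema, it reports the REQUIRED columns that the declared schema is missing, which is exactly the mismatch that currently only surfaces as an error from BigQuery at write time.
   
   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   public class SchemaCheckDemo {
       // Hypothetical up-front check: tableFields maps each destination
       // column name to its mode ("REQUIRED" or "NULLABLE"); declaredFields
       // is the set of column names in the schema given to the sink.
       // Returns the REQUIRED columns absent from the declared schema.
       static Set<String> missingRequiredFields(Map<String, String> tableFields,
                                                Set<String> declaredFields) {
           Set<String> missing = new HashSet<>();
           for (Map.Entry<String, String> e : tableFields.entrySet()) {
               if ("REQUIRED".equals(e.getValue()) && !declaredFields.contains(e.getKey())) {
                   missing.add(e.getKey());
               }
           }
           return missing;
       }
   
       public static void main(String[] args) {
           // Same shape as the example table above: one REQUIRED, one NULLABLE column.
           Map<String, String> table = Map.of(
                   "required_field", "REQUIRED",
                   "optional_field", "NULLABLE");
           // The sink's schema only declares optional_field.
           Set<String> declared = Set.of("optional_field");
           System.out.println(missingRequiredFields(table, declared));
           // prints [required_field]
       }
   }
   ```
   
   A row failing this check could then be routed to getFailedStorageApiInserts immediately instead of entering the retry loop.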
   
   In our use case this can easily happen, because the schemas are set 
externally in a schema registry (in our real app we use `DynamicDestinations`, 
and `getSchema` makes a dynamic call to our schema registry). Overall, our goal 
is to keep BigQuery updated to match the new schema as it changes (which is why 
I've also raised https://github.com/apache/beam/issues/24063).
   
   A rough working example is below. Hopefully this makes sense?
   
   ```java
   import com.google.api.services.bigquery.model.TableFieldSchema;
   import com.google.api.services.bigquery.model.TableRow;
   import com.google.api.services.bigquery.model.TableSchema;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.coders.StringUtf8Coder;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
   import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
   import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   import org.apache.beam.sdk.values.PCollection;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.Arrays;
   import java.util.List;
   
    public class BQConnectDemo {
        private static final Logger log = LoggerFactory.getLogger(BQConnectDemo.class);

        static void runDemo(BQConnectOptions options) {
            Pipeline p = Pipeline.create(options);

            final List<String> rowText = Arrays.asList(
                    "To be, or not to be: that is the question: ",
                    "Whether 'tis nobler in the mind to suffer ",
                    "The slings and arrows of outrageous fortune, ",
                    "Or to take arms against a sea of troubles, ");

            final PCollection<String> rows =
                    p.apply(Create.of(rowText).withCoder(StringUtf8Coder.of()));

            WriteResult writeResult = rows.apply(
                    "WriteToBigQuery",
                    BigQueryIO.<String>write()
                            // [project_id]:[dataset_id].[table_id]
                            .to("<PROJECT_REDACTED>.<DATASET_REDACTED>.required_fields_demo")
                            .withSchema(new TableSchema()
                                    .setFields(
                                            List.of(
                                                    // Note that the schema does not contain required_field
                                                    new TableFieldSchema()
                                                            .setName("optional_field")
                                                            .setType("STRING")
                                                            .setMode("NULLABLE")
                                            )
                                    )
                            )
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withExtendedErrorInfo()
                            .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                            .withFormatFunction(new SerializableFunction<String, TableRow>() {
                                @Override
                                public TableRow apply(String input) {
                                    // Note that the row does not contain required_field
                                    return new TableRow().set("optional_field", "test_value");
                                }
                            })
            );

            PCollection<BigQueryStorageApiInsertError> failedInserts =
                    writeResult.getFailedStorageApiInserts();

            failedInserts.apply("Log failures", ParDo.of(new LogErrors()));

            p.run();
        }

        public static void main(String[] args) {
            BQConnectOptions options =
                    PipelineOptionsFactory
                            .fromArgs(args)
                            .withValidation()
                            .as(BQConnectOptions.class);

            runDemo(options);
        }

        static class LogErrors extends DoFn<BigQueryStorageApiInsertError, Void> {
            @ProcessElement
            public void processElement(ProcessContext context) {
                log.error("Failed to insert element to BigQuery: {}", context.element());
            }
        }

    }
    ```
