ahmedabu98 commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3095806474


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +519,38 @@ public interface ThrowingBiFunction<FirstInputT, 
SecondInputT, OutputT> {
                   })
               .put(
                   TableFieldSchema.Type.STRING,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.JSON,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.GEOGRAPHY,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .build();
 
+  static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+  public static byte[] tableSchemaHash(TableSchema tableSchema) {
+    return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+  }
+
+  public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema> 
fields) {

Review Comment:
   Can we also hash the field type? The current logic treats `{name: a, type: 
STRING}` and `{name: a, type: INTEGER}` as equal.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
         }
         return input.apply(batchLoads);
       } else if (method == Method.STORAGE_WRITE_API || method == 
Method.STORAGE_API_AT_LEAST_ONCE) {
+        boolean useSchemaUpdate =
+            getSchemaUpdateOptions() != null && 
!getSchemaUpdateOptions().isEmpty();
+        if (useSchemaUpdate) {
+          checkArgument(
+              !getAutoSchemaUpdate(),
+              "Schema update options are not supported when using auto schema 
update");
+          checkArgument(!getIgnoreUnknownValues());
+        }
         BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
         StorageApiDynamicDestinations<T, DestinationT> 
storageApiDynamicDestinations;
         if (getUseBeamSchema()) {
+          checkArgument(
+              !useSchemaUpdate, "SchemaUpdateOptions are not supported when 
using Beam schemas");

Review Comment:
   Could this break existing FileLoads pipelines?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4233,13 +4250,18 @@ private <DestinationT> WriteResult continueExpandTyped(
               !getIgnoreUnknownValues(),
               "ignoreUnknownValues not supported when using writeProtos."
                   + " Try setting withDirectWriteProtos(false)");
+          checkArgument(!useSchemaUpdate);
+

Review Comment:
   nit: duplicate check, already covered above



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaUpdateHoldingFn.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * This is a stateful DoFn that buffers elements that triggered table schema 
update. Once the table
+ * schema has been updated, this reprocesses the messages and allows them to 
continue on through the
+ * sink.
+ */

Review Comment:
   Can you also mention that a table update operation is triggered when 
`SchemaUpdateHoldingFn` sees a `null` value produced by `PatchTableSchemaDoFn`?
   
   That would help future readers make sense of the different moving pieces.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;
+        @Nullable
+        TableFieldSchema oldValue =
+            newFields
+                .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+                .put(
+                    name,
+                    TableFieldSchema.newBuilder()
+                        .setName(name)
+                        .setMode(mode)
+                        .setType(type)
+                        .build());
+        if (oldValue != null) {
+          // Duplicates are ok because we might run this over an entire 
bundle. However we must
+          // ensure that they are compatible.
+          if (!oldValue.getType().equals(type)) {
+            throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+                "Inconsistent types seen for field: " + e.getMissingField());
+          }
+        }
+      } else if (schemaConversionException
+          instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException) {
+        ((TableRowToStorageApiProto.SchemaMissingRequiredFieldException) 
schemaConversionException)
+            .getMissingFields()
+            .forEach(
+                f -> {
+                  List<String> components = 
Arrays.asList(f.toLowerCase().split("\\."));
+                  String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+                  String name = components.get(components.size() - 1);
+                  relaxedFields.computeIfAbsent(prefix, p -> 
Sets.newHashSet()).add(name);
+                });
+      } else {
+        throw new RuntimeException(
+            "Unexpected error " + schemaConversionException, 
schemaConversionException);
+      }
+    }
+    return TableSchema.newBuilder()
+        .addAllFields(
+            getIncrementalSchemaHelper(newFields, relaxedFields, 
oldSchema.getFieldsList(), ""))
+        .build();
+  }
+
+  private static List<TableFieldSchema> getIncrementalSchemaHelper(
+      Map<String, LinkedHashMap<String, TableFieldSchema>> newFields,
+      Map<String, Set<String>> relaxedFields,
+      List<TableFieldSchema> tableFields,
+      String prefix) {
+    List<TableFieldSchema> fields = Lists.newArrayList();
+
+    Set<String> fieldsToRelax = relaxedFields.getOrDefault(prefix, 
Collections.emptySet());
+    // Add existing fields in the same order.
+    for (TableFieldSchema fieldSchema : tableFields) {
+      String fieldName = fieldSchema.getName().toLowerCase();
+      TableFieldSchema.Builder clonedField = null;
+      if (fieldsToRelax.contains(fieldName)) {
+        // Since we're only generating the incremental schema, existing fields 
are only examined if
+        // they change -
+        // i.e. they are relaxed.
+        clonedField = fieldSchema.toBuilder();
+        clonedField.setMode(TableFieldSchema.Mode.NULLABLE);
+      }
+      if (fieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) {
+        // Recursively walk the schema, looking for more field relaxations.
+        String newPrefix = prefix.isEmpty() ? fieldName : String.join(".", 
prefix, fieldName);
+        List<TableFieldSchema> newSubfields =
+            getIncrementalSchemaHelper(
+                newFields, relaxedFields, fieldSchema.getFieldsList(), 
newPrefix);
+        if (!newSubfields.isEmpty()) {
+          if (clonedField == null) {
+            clonedField = fieldSchema.toBuilder();
+          }
+          clonedField.clearFields();
+          clonedField.addAllFields(newSubfields);
+        }
+      }
+      if (clonedField != null) {
+        fields.add(clonedField.build());
+      }
+    }
+
+    LinkedHashMap<String, TableFieldSchema> fieldsToAdd =
+        newFields.getOrDefault(prefix, new LinkedHashMap<>());
+    for (Map.Entry<String, TableFieldSchema> entry : fieldsToAdd.entrySet()) {
+      TableFieldSchema.Builder field = entry.getValue().toBuilder();
+      // We rely on the exception telling us intermediate struct fields.
+      if (field.getType().equals(TableFieldSchema.Type.STRUCT)) {
+        String fieldName = field.getName().toLowerCase();
+        String newPrefix = prefix.isEmpty() ? fieldName : String.join(".", 
prefix, fieldName);
+        field.addAllFields(
+            getIncrementalSchemaHelper(
+                newFields, relaxedFields, Collections.emptyList(), newPrefix));
+      }
+      fields.add(field.build());
+    }
+
+    return fields;
+  }
+
+  // Merge two schemas. schema1 is considered the primary schema, and will 
control what order
+  // overlapping fields
+  // are created in the final schema.
+  public static TableSchema mergeSchemas(TableSchema schema1, TableSchema 
schema2) {
+    List<TableFieldSchema> mergedFields =
+        mergeFields(schema1.getFieldsList(), schema2.getFieldsList());
+    return TableSchema.newBuilder().addAllFields(mergedFields).build();
+  }
+
+  private static List<TableFieldSchema> mergeFields(
+      List<TableFieldSchema> fields1, List<TableFieldSchema> fields2) {
+    // Use LinkedHashMap to preserve the order of fields
+    Map<String, TableFieldSchema> mergedFieldsMap = Maps.newLinkedHashMap();
+
+    // Add all fields from schema 1.
+    fields1.forEach(f -> mergedFieldsMap.put(f.getName().toLowerCase(), f));
+
+    // Merge or append fields from schema 2
+    for (TableFieldSchema f2 : fields2) {
+      String lowerName = f2.getName().toLowerCase();
+      mergedFieldsMap.compute(lowerName, (k, v) -> v == null ? f2 : 
mergeField(v, f2));
+    }
+    return Lists.newArrayList(mergedFieldsMap.values());
+  }
+
+  private static TableFieldSchema mergeField(TableFieldSchema f1, 
TableFieldSchema f2) {
+    if (!f1.getType().equals(f2.getType())) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Conflicting field types for field '%s': %s vs %s",
+              f1.getName(), f1.getType(), f2.getType()));
+    }
+
+    TableFieldSchema.Builder builder = f1.toBuilder().mergeFrom(f2);
+    builder.clearFields();
+
+    // Handle mode weakening (REPEATED > NULLABLE > REQUIRED)
+    TableFieldSchema.Mode mode1 =
+        f1.getMode() == TableFieldSchema.Mode.MODE_UNSPECIFIED
+            ? TableFieldSchema.Mode.NULLABLE
+            : f1.getMode();
+    TableFieldSchema.Mode mode2 =
+        f2.getMode() == TableFieldSchema.Mode.MODE_UNSPECIFIED
+            ? TableFieldSchema.Mode.NULLABLE
+            : f2.getMode();
+
+    if (mode1 == TableFieldSchema.Mode.REPEATED || mode2 == 
TableFieldSchema.Mode.REPEATED) {
+      // Any field can be relaxed into a repeated field.
+      builder.setMode(TableFieldSchema.Mode.REPEATED);

Review Comment:
   Is this true? AFAIK you can't change from/to a REPEATED field: 
https://docs.cloud.google.com/bigquery/docs/managing-table-schemas#change_a_columns_mode



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -833,6 +950,8 @@ public void process(
                     || statusCode.equals(Code.INVALID_ARGUMENT)
                     || statusCode.equals(Code.NOT_FOUND)
                     || statusCode.equals(Code.FAILED_PRECONDITION);
+            streamDoesNotExist = streamDoesNotExist && !schemaMismatchError;
+
             if (offsetMismatch || streamDoesNotExist) {
               appendOffsetFailures.inc();

Review Comment:
   Do we need to reset and create a new stream for schema mismatch errors?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
         }
         return input.apply(batchLoads);
       } else if (method == Method.STORAGE_WRITE_API || method == 
Method.STORAGE_API_AT_LEAST_ONCE) {
+        boolean useSchemaUpdate =
+            getSchemaUpdateOptions() != null && 
!getSchemaUpdateOptions().isEmpty();
+        if (useSchemaUpdate) {
+          checkArgument(
+              !getAutoSchemaUpdate(),
+              "Schema update options are not supported when using auto schema 
update");
+          checkArgument(!getIgnoreUnknownValues());

Review Comment:
   Could we merge these `checkArgument` calls into one check with a shared error message?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {

Review Comment:
   Can we add some unit tests for this class?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ConvertMessagesDoFn.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * DoFn that interacts with the StorageApiDynamicDestinations instance to 
convert messages to
+ * StorageApiWritePaylod. Messages that fail to convert are routed to the 
dead-letter PCollection.
+ * If schemaUpdateOptions are set, then messages that fail to convert due to 
missing columns are
+ * routed to a buffering transform that holds them until the table's schema 
has been updated.
+ */
+public class ConvertMessagesDoFn<DestinationT extends @NonNull Object, 
ElementT>
+    extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, 
StorageApiWritePayload>> {
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
+  private TwoLevelMessageConverterCache<DestinationT, ElementT> 
messageConverters;
+  private final BigQueryServices bqServices;
+  private final TupleTag<BigQueryStorageApiInsertError> failedWritesTag;
+  private final TupleTag<KV<DestinationT, StorageApiWritePayload>> 
successfulWritesTag;
+  private final TupleTag<KV<@NonNull DestinationT, TableSchema>> 
patchTableSchemaTag;
+  private final TupleTag<KV<@NonNull DestinationT, ElementT>> 
retryElementsWaitingForSchemaTag;
+  private final @Nullable SerializableFunction<ElementT, 
RowMutationInformation> rowMutationFn;
+  private final BadRecordRouter badRecordRouter;
+  private final Coder<KV<DestinationT, ElementT>> elementCoder;
+  private final Map<DestinationT, BufferedCollectorInformation> 
errorCollectors = Maps.newHashMap();
+  private final boolean hasSchemaUpdateOptions;
+  private transient BigQueryServices.@Nullable DatasetService 
datasetServiceInternal = null;
+  private transient BigQueryServices.@Nullable WriteStreamService 
writeStreamServiceInternal = null;
+
+  static final class BufferedCollectorInformation {

Review Comment:
   Can we add a Javadoc comment for this class?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
  * with the Storage write API.
  */
 public class TableRowToStorageApiProto {
+
+  public static class ErrorCollector {
+    private final List<SchemaConversionException> exceptions = 
Lists.newArrayList();
+    private final Predicate<SchemaConversionException> shouldCollect;
+
+    public static final ErrorCollector DONT_COLLECT = new 
ErrorCollector(Predicates.alwaysFalse());
+
+    public ErrorCollector(Predicate<SchemaConversionException> shouldCollect) {
+      this.shouldCollect = shouldCollect;
+    }
+
+    // Returns true if the exception was collected.
+    void collect(SchemaConversionException exception) throws 
SchemaConversionException {

Review Comment:
   `// Returns true`
   
   Is this comment outdated? `collect` is declared `void`, so it doesn't return anything.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;
+        @Nullable
+        TableFieldSchema oldValue =
+            newFields
+                .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+                .put(
+                    name,
+                    TableFieldSchema.newBuilder()
+                        .setName(name)
+                        .setMode(mode)
+                        .setType(type)
+                        .build());
+        if (oldValue != null) {
+          // Duplicates are ok because we might run this over an entire 
bundle. However we must
+          // ensure that they are compatible.
+          if (!oldValue.getType().equals(type)) {
+            throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+                "Inconsistent types seen for field: " + e.getMissingField());

Review Comment:
   Can we also log what the conflicting types are?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -585,12 +699,25 @@ private SchemaInformation(
       }
     }
 
+    // Returns a SchemaInformation that descends from this one. Does not 
modify the current
+    // SchemaInformation -
+    // the new SchemaInformation is traversable upwards only.
+    public SchemaInformation createDescendent(TableFieldSchema 
tableFieldSchema) {
+      return new SchemaInformation(
+          tableFieldSchema, Iterables.concat(this.parentSchemas, 
ImmutableList.of(this)));
+    }
+
     public String getFullName() {
-      String prefix =
-          StreamSupport.stream(parentSchemas.spliterator(), false)
-              .map(SchemaInformation::getName)
-              .collect(Collectors.joining("."));
-      return prefix.isEmpty() ? getName() : prefix + "." + getName();
+      if (!Iterables.isEmpty(parentSchemas)) {
+        String prefix =
+            StreamSupport.stream(parentSchemas.spliterator(), false)
+                .skip(1)
+                .map(SchemaInformation::getName)
+                .collect(Collectors.joining("."));
+        return prefix.isEmpty() ? getName() : prefix + "." + getName();
+      } else {
+        return "";

Review Comment:
   Should we return `getName()` here instead of an empty string?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -46,6 +46,7 @@ public class BigQuerySinkMetrics {
 
   public static final String METRICS_NAMESPACE = "BigQuerySink";
 
+  // Status codes

Review Comment:
   nit: cleanup



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(

Review Comment:
   Can you add a small doc-string for this method?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;

Review Comment:
   Maybe add a TODO that we are defaulting to STRING types for now, but would 
like to add more robust handling?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   nit: cleanup duplicate license header



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaUpdateHoldingFn.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * This is a stateful DoFn that buffers elements that triggered table schema 
update. Once the table
+ * schema has been updated, this reprocesses the messages and allows them to 
continue on through the
+ * sink.
+ */
+public class SchemaUpdateHoldingFn<DestinationT extends @NonNull Object, 
ElementT>
+    extends DoFn<
+        KV<ShardedKey<DestinationT>, @Nullable ElementT>,
+        KV<DestinationT, StorageApiWritePayload>> {
+  private static final Duration POLL_DURATION = Duration.standardSeconds(1);
+
+  @StateId("bufferedElements")
+  private final StateSpec<BagState<TimestampedValue<ElementT>>> bufferedSpec;
+
+  @StateId("minBufferedTimestamp")
+  private final StateSpec<CombiningState<Long, long[], Long>> 
minBufferedTsSpec;
+
+  @StateId("timerTimestamp")
+  private final StateSpec<ValueState<Long>> timerTsSpec;
+
+  @TimerId("pollTimer")
+  private final TimerSpec pollTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  private final ConvertMessagesDoFn<DestinationT, ElementT> 
convertMessagesDoFn;
+
+  public SchemaUpdateHoldingFn(
+      Coder<ElementT> elementCoder,
+      ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn) {
+    this.convertMessagesDoFn = convertMessagesDoFn;
+    this.bufferedSpec = 
StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(elementCoder));
+    this.timerTsSpec = StateSpecs.value();
+
+    Combine.BinaryCombineLongFn minCombineFn =
+        new Combine.BinaryCombineLongFn() {
+          @Override
+          public long identity() {
+            return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+          }
+
+          @Override
+          public long apply(long left, long right) {
+            return Math.min(left, right);
+          }
+        };
+    this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
+  }
+
+  @StartBundle
+  public void startBundle() {
+    convertMessagesDoFn.startBundle();
+    ;
+  }
+
+  @Teardown
+  public void onTeardown() {
+    convertMessagesDoFn.onTeardown();
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element KV<ShardedKey<DestinationT>, @Nullable ElementT> element,
+      @Timestamp Instant timestamp,
+      @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+      @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+      @StateId("timerTimestamp") ValueState<Long> timerTs,
+      @TimerId("pollTimer") Timer pollTimer,
+      ProcessContext context,
+      BoundedWindow window,
+      MultiOutputReceiver o)
+      throws Exception {
+    
convertMessagesDoFn.getDynamicDestinations().setSideInputAccessorFromProcessContext(context);

Review Comment:
   This is currently a no-op, right?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -847,11 +896,40 @@ long flush(
                         + failedContext.offset);
               }
 
+              // Schema mismatched exceptions can happen if the table was 
recently updated. Since
+              // vortex caches schemas
+              // we might see the new schema before vortex does. In this case, 
we simply need to
+              // retry.
+              Exceptions.@Nullable StorageException storageException =
+                  (error == null) ? null : 
Exceptions.toStorageException(error);
+              boolean schemaMismatchError =
+                  (storageException instanceof 
Exceptions.SchemaMismatchedException);
+              if (!schemaMismatchError && error != null) {
+                // There's no special error code for missing required fields, 
and that can also
+                // happen due to vortex
+                // being delayed at seeing a new schema. We're forced to parse 
the description to
+                // determine that this
+                // has happened.
+                Status status = Status.fromThrowable(error);
+                if (status.getCode() == Status.Code.INVALID_ARGUMENT) {
+                  String description = status.getDescription();
+                  schemaMismatchError =
+                      description != null && 
description.contains("incompatible fields");
+                }
+              }
+              if (schemaMismatchError) {
+                LOG.info(
+                    "Vortex failed stream open due to incompatible fields. 
This is likely because the BigTable "
+                        + "schema was recently updated and Vortex hasn't 
noticed yet, so retrying. error {}",
+                    Preconditions.checkStateNotNull(error).toString());
+              }
+

Review Comment:
   nit: a large part of this is duplicated between this class and 
`StorageApiWritesShardedRecords`
   
   Side note: historically there have been many overlaps between the two classes. 
It would make things a lot easier to follow if there were a shared utility class.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java:
##########
@@ -32,17 +34,26 @@ public interface MessageConverter<T> {
     DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) 
throws Exception;
 
     StorageApiWritePayload toMessage(
-        T element, @Nullable RowMutationInformation rowMutationInformation) 
throws Exception;
+        T element,
+        @Nullable RowMutationInformation rowMutationInformation,
+        TableRowToStorageApiProto.ErrorCollector collectedExceptions)
+        throws Exception;
 
     TableRow toFailsafeTableRow(T element);
+
+    void updateSchemaFromTable() throws IOException, InterruptedException;

Review Comment:
   Can we have subclasses that don't implement this throw an 
UnsupportedOperationException? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -884,72 +1003,107 @@ public void process(
               }
             }
           };
-      Instant now = Instant.now();
-      List<AppendRowsContext> contexts = Lists.newArrayList();
-      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
-          new RetryManager<>(
-              Duration.standardSeconds(1),
-              Duration.standardSeconds(20),
-              maxRetries,
-              
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
-      int numAppends = 0;
-      for (SplittingIterable.Value splitValue : messages) {
-        // Handle the case of a row that is too large.
-        if (splitValue.getProtoRows().getSerializedSize() >= maxRequestSize) {
-          if (splitValue.getProtoRows().getSerializedRowsCount() > 1) {
-            // TODO(reuvenlax): Is it worth trying to handle this case by 
splitting the protoRows?
-            // Given that we split
-            // the ProtoRows iterable at 2MB and the max request size is 10MB, 
this scenario seems
-            // nearly impossible.
-            LOG.error(
-                "A request containing more than one row is over the request 
size limit of {}. This is unexpected. All rows in the request will be sent to 
the failed-rows PCollection.",
-                maxRequestSize);
-          }
-          for (int i = 0; i < 
splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
-            org.joda.time.Instant timestamp = 
splitValue.getTimestamps().get(i);
-            TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
-            if (failedRow == null) {
-              ByteString rowBytes = 
splitValue.getProtoRows().getSerializedRows(i);
-              failedRow = appendClientInfo.get().toTableRow(rowBytes, 
Predicates.alwaysTrue());
-            }
-            o.get(failedRowsTag)
-                .outputWithTimestamp(
-                    new BigQueryStorageApiInsertError(
-                        failedRow,
-                        "Row payload too large. Maximum size " + 
maxRequestSize,
-                        tableReference),
-                    timestamp);
-          }
-          int numRowsFailed = 
splitValue.getProtoRows().getSerializedRowsCount();
-          rowsSentToFailedRowsCollection.inc(numRowsFailed);
-          BigQuerySinkMetrics.appendRowsRowStatusCounter(
-                  BigQuerySinkMetrics.RowStatus.FAILED,
-                  BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
-                  shortTableId)
-              .inc(numRowsFailed);
-        } else {
-          ++numAppends;
-          // RetryManager
-          AppendRowsContext context =
-              new AppendRowsContext(
-                  element.getKey(),
-                  splitValue.getProtoRows(),
-                  splitValue.getTimestamps(),
-                  splitValue.getFailsafeTableRows());
-          contexts.add(context);
-          retryManager.addOperation(runOperation, onError, onSuccess, context);
-          
recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount());
-          
appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
+
+      BackOff backoff =
+          FluentBackoff.DEFAULT
+              .withInitialBackoff(Duration.standardSeconds(1))
+              .withMaxBackoff(Duration.standardMinutes(1))
+              .withMaxRetries(500)
+              .withThrottledTimeCounter(
+                  BigQuerySinkMetrics.throttledTimeCounter(
+                      BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM))
+              .backoff();
+      CreateRetryManagerResult<DestinationT> createRetryManagerResult;
+      do {
+        // Each ProtoRows object contains at most 1MB of rows.
+        // TODO: Push messageFromTableRow up to top level. That we we cans 
skip TableRow entirely if
+        // already proto or already schema.
+        Iterable<SplittingIterable.Value> messages =
+            new SplittingIterable(
+                element.getValue(),
+                splitSize,
+                // Unknown field merger
+                (bytes, tableRow) ->
+                    appendClientInfo.get().mergeNewFields(bytes, tableRow, 
ignoreUnknownValues),
+                // Convert back to TableRow
+                bytes -> appendClientInfo.get().toTableRow(bytes, 
Predicates.alwaysTrue()),
+                // Failed rows consumer
+                (failedRow, errorMessage) -> {
+                  o.get(failedRowsTag)
+                      .outputWithTimestamp(
+                          new BigQueryStorageApiInsertError(
+                              failedRow.getValue(), errorMessage, 
tableReference),
+                          failedRow.getTimestamp());
+                  rowsSentToFailedRowsCollection.inc();
+                  BigQuerySinkMetrics.appendRowsRowStatusCounter(
+                          BigQuerySinkMetrics.RowStatus.FAILED,
+                          BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+                          shortTableId)
+                      .inc(1);
+                },
+                // Get the currently-known TableSchema hash
+                () -> appendClientInfo.get().getTableSchemaHash(),
+                () ->
+                    TableRowToStorageApiProto.wrapDescriptorProto(
+                        messageConverter.getDescriptor(false)),
+                autoUpdateSchema,
+                elementTs);
+
+        createRetryManagerResult =

Review Comment:
   readability nit: declare it inline with initialization?
   ```suggestion
           CreateRetryManagerResult<DestinationT> createRetryManagerResult =
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;
+        @Nullable
+        TableFieldSchema oldValue =
+            newFields
+                .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+                .put(
+                    name,
+                    TableFieldSchema.newBuilder()
+                        .setName(name)
+                        .setMode(mode)
+                        .setType(type)
+                        .build());

Review Comment:
   Should we compare the current and new value, and keep the more "relaxed" 
version of the two?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
  * with the Storage write API.
  */
 public class TableRowToStorageApiProto {
+
+  public static class ErrorCollector {

Review Comment:
   Can we add a java-doc here too?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +519,38 @@ public interface ThrowingBiFunction<FirstInputT, 
SecondInputT, OutputT> {
                   })
               .put(
                   TableFieldSchema.Type.STRING,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.JSON,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.GEOGRAPHY,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> 
Preconditions.checkArgumentNotNull(value).toString())
               .build();
 
+  static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+  public static byte[] tableSchemaHash(TableSchema tableSchema) {
+    return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+  }
+
+  public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema> 
fields) {

Review Comment:
   And can we add some unit testing? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to