ahmedabu98 commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3095806474
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +519,38 @@ public interface ThrowingBiFunction<FirstInputT,
SecondInputT, OutputT> {
})
.put(
TableFieldSchema.Type.STRING,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.put(
TableFieldSchema.Type.JSON,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.put(
TableFieldSchema.Type.GEOGRAPHY,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.build();
+ static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+ public static byte[] tableSchemaHash(TableSchema tableSchema) {
+ return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+ }
+
+ public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema>
fields) {
Review Comment:
Can we also hash the field type? The current logic will see `{name: a, type:
STRING}` and `{name: a, type: INTEGER}` as equal.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method ==
Method.STORAGE_API_AT_LEAST_ONCE) {
+ boolean useSchemaUpdate =
+ getSchemaUpdateOptions() != null &&
!getSchemaUpdateOptions().isEmpty();
+ if (useSchemaUpdate) {
+ checkArgument(
+ !getAutoSchemaUpdate(),
+ "Schema update options are not supported when using auto schema
update");
+ checkArgument(!getIgnoreUnknownValues());
+ }
BigQueryOptions bqOptions =
input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT>
storageApiDynamicDestinations;
if (getUseBeamSchema()) {
+ checkArgument(
+ !useSchemaUpdate, "SchemaUpdateOptions are not supported when
using Beam schemas");
Review Comment:
Could this break existing FileLoads pipelines?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4233,13 +4250,18 @@ private <DestinationT> WriteResult continueExpandTyped(
!getIgnoreUnknownValues(),
"ignoreUnknownValues not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");
+ checkArgument(!useSchemaUpdate);
+
Review Comment:
nit: duplicate check, already covered above
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaUpdateHoldingFn.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * This is a stateful DoFn that buffers elements that triggered table schema
update. Once the table
+ * schema has been updated, this reprocesses the messages and allows them to
continue on through the
+ * sink.
+ */
Review Comment:
Can you also mention that a table update operation is triggered when
`SchemaUpdateHoldingFn` sees a `null` value produced by `PatchTableSchemaDoFn`?
That would help future readers make sense of the different moving pieces.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
+ TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema
oldSchema)
+ throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+ // This isn't the most efficient, especially if we have deeply-nested
schemas. However we don't
+ // expect to be
+ // upgrading schemas very regularly - if we do then we have other problems!
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields =
Maps.newHashMap();
+ Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+ for (TableRowToStorageApiProto.SchemaConversionException
schemaConversionException :
+ errorCollector.getExceptions()) {
+ if (schemaConversionException instanceof
TableRowToStorageApiProto.SchemaTooNarrowException) {
+ TableRowToStorageApiProto.SchemaTooNarrowException e =
+ (TableRowToStorageApiProto.SchemaTooNarrowException)
schemaConversionException;
+ List<String> components =
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ TableFieldSchema.Mode mode =
+ e.isRepeated() ? TableFieldSchema.Mode.REPEATED :
TableFieldSchema.Mode.NULLABLE;
+ TableFieldSchema.Type type =
+ e.isStruct() ? TableFieldSchema.Type.STRUCT :
TableFieldSchema.Type.STRING;
+ @Nullable
+ TableFieldSchema oldValue =
+ newFields
+ .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+ .put(
+ name,
+ TableFieldSchema.newBuilder()
+ .setName(name)
+ .setMode(mode)
+ .setType(type)
+ .build());
+ if (oldValue != null) {
+ // Duplicates are ok because we might run this over an entire
bundle. However we must
+ // ensure that they are compatible.
+ if (!oldValue.getType().equals(type)) {
+ throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+ "Inconsistent types seen for field: " + e.getMissingField());
+ }
+ }
+ } else if (schemaConversionException
+ instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException) {
+ ((TableRowToStorageApiProto.SchemaMissingRequiredFieldException)
schemaConversionException)
+ .getMissingFields()
+ .forEach(
+ f -> {
+ List<String> components =
Arrays.asList(f.toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ relaxedFields.computeIfAbsent(prefix, p ->
Sets.newHashSet()).add(name);
+ });
+ } else {
+ throw new RuntimeException(
+ "Unexpected error " + schemaConversionException,
schemaConversionException);
+ }
+ }
+ return TableSchema.newBuilder()
+ .addAllFields(
+ getIncrementalSchemaHelper(newFields, relaxedFields,
oldSchema.getFieldsList(), ""))
+ .build();
+ }
+
+ private static List<TableFieldSchema> getIncrementalSchemaHelper(
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields,
+ Map<String, Set<String>> relaxedFields,
+ List<TableFieldSchema> tableFields,
+ String prefix) {
+ List<TableFieldSchema> fields = Lists.newArrayList();
+
+ Set<String> fieldsToRelax = relaxedFields.getOrDefault(prefix,
Collections.emptySet());
+ // Add existing fields in the same order.
+ for (TableFieldSchema fieldSchema : tableFields) {
+ String fieldName = fieldSchema.getName().toLowerCase();
+ TableFieldSchema.Builder clonedField = null;
+ if (fieldsToRelax.contains(fieldName)) {
+ // Since we're only generating the incremental schema, existing fields
are only examined if
+ // they change -
+ // i.e. they are relaxed.
+ clonedField = fieldSchema.toBuilder();
+ clonedField.setMode(TableFieldSchema.Mode.NULLABLE);
+ }
+ if (fieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) {
+ // Recursively walk the schema, looking for more field relaxations.
+ String newPrefix = prefix.isEmpty() ? fieldName : String.join(".",
prefix, fieldName);
+ List<TableFieldSchema> newSubfields =
+ getIncrementalSchemaHelper(
+ newFields, relaxedFields, fieldSchema.getFieldsList(),
newPrefix);
+ if (!newSubfields.isEmpty()) {
+ if (clonedField == null) {
+ clonedField = fieldSchema.toBuilder();
+ }
+ clonedField.clearFields();
+ clonedField.addAllFields(newSubfields);
+ }
+ }
+ if (clonedField != null) {
+ fields.add(clonedField.build());
+ }
+ }
+
+ LinkedHashMap<String, TableFieldSchema> fieldsToAdd =
+ newFields.getOrDefault(prefix, new LinkedHashMap<>());
+ for (Map.Entry<String, TableFieldSchema> entry : fieldsToAdd.entrySet()) {
+ TableFieldSchema.Builder field = entry.getValue().toBuilder();
+ // We rely on the exception telling us intermediate struct fields.
+ if (field.getType().equals(TableFieldSchema.Type.STRUCT)) {
+ String fieldName = field.getName().toLowerCase();
+ String newPrefix = prefix.isEmpty() ? fieldName : String.join(".",
prefix, fieldName);
+ field.addAllFields(
+ getIncrementalSchemaHelper(
+ newFields, relaxedFields, Collections.emptyList(), newPrefix));
+ }
+ fields.add(field.build());
+ }
+
+ return fields;
+ }
+
+ // Merge two schemas. schema1 is considered the primary schema, and will
control what order
+ // overlapping fields
+ // are created in the final schema.
+ public static TableSchema mergeSchemas(TableSchema schema1, TableSchema
schema2) {
+ List<TableFieldSchema> mergedFields =
+ mergeFields(schema1.getFieldsList(), schema2.getFieldsList());
+ return TableSchema.newBuilder().addAllFields(mergedFields).build();
+ }
+
+ private static List<TableFieldSchema> mergeFields(
+ List<TableFieldSchema> fields1, List<TableFieldSchema> fields2) {
+ // Use LinkedHashMap to preserve the order of fields
+ Map<String, TableFieldSchema> mergedFieldsMap = Maps.newLinkedHashMap();
+
+ // Add all fields from schema 1.
+ fields1.forEach(f -> mergedFieldsMap.put(f.getName().toLowerCase(), f));
+
+ // Merge or append fields from schema 2
+ for (TableFieldSchema f2 : fields2) {
+ String lowerName = f2.getName().toLowerCase();
+ mergedFieldsMap.compute(lowerName, (k, v) -> v == null ? f2 :
mergeField(v, f2));
+ }
+ return Lists.newArrayList(mergedFieldsMap.values());
+ }
+
+ private static TableFieldSchema mergeField(TableFieldSchema f1,
TableFieldSchema f2) {
+ if (!f1.getType().equals(f2.getType())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Conflicting field types for field '%s': %s vs %s",
+ f1.getName(), f1.getType(), f2.getType()));
+ }
+
+ TableFieldSchema.Builder builder = f1.toBuilder().mergeFrom(f2);
+ builder.clearFields();
+
+ // Handle mode weakening (REPEATED > NULLABLE > REQUIRED)
+ TableFieldSchema.Mode mode1 =
+ f1.getMode() == TableFieldSchema.Mode.MODE_UNSPECIFIED
+ ? TableFieldSchema.Mode.NULLABLE
+ : f1.getMode();
+ TableFieldSchema.Mode mode2 =
+ f2.getMode() == TableFieldSchema.Mode.MODE_UNSPECIFIED
+ ? TableFieldSchema.Mode.NULLABLE
+ : f2.getMode();
+
+ if (mode1 == TableFieldSchema.Mode.REPEATED || mode2 ==
TableFieldSchema.Mode.REPEATED) {
+ // Any field can be relaxed into a repeated field.
+ builder.setMode(TableFieldSchema.Mode.REPEATED);
Review Comment:
Is this true? AFAIK you can't change from/to a REPEATED field:
https://docs.cloud.google.com/bigquery/docs/managing-table-schemas#change_a_columns_mode
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -833,6 +950,8 @@ public void process(
|| statusCode.equals(Code.INVALID_ARGUMENT)
|| statusCode.equals(Code.NOT_FOUND)
|| statusCode.equals(Code.FAILED_PRECONDITION);
+ streamDoesNotExist = streamDoesNotExist && !schemaMismatchError;
+
if (offsetMismatch || streamDoesNotExist) {
appendOffsetFailures.inc();
Review Comment:
Do we need to reset and create a new stream for schema mismatch errors?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method ==
Method.STORAGE_API_AT_LEAST_ONCE) {
+ boolean useSchemaUpdate =
+ getSchemaUpdateOptions() != null &&
!getSchemaUpdateOptions().isEmpty();
+ if (useSchemaUpdate) {
+ checkArgument(
+ !getAutoSchemaUpdate(),
+ "Schema update options are not supported when using auto schema
update");
+ checkArgument(!getIgnoreUnknownValues());
Review Comment:
Merge `checkArgument` logic together with the same error message?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
Review Comment:
Can we add some unit tests for this class?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ConvertMessagesDoFn.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * DoFn that interacts with the StorageApiDynamicDestinations instance to
convert messages to
+ * StorageApiWritePaylod. Messages that fail to convert are routed to the
dead-letter PCollection.
+ * If schemaUpdateOptions are set, then messages that fail to convert due to
missing columns are
+ * routed to a buffering transform that holds them until the table's schema
has been updated.
+ */
+public class ConvertMessagesDoFn<DestinationT extends @NonNull Object,
ElementT>
+ extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT,
StorageApiWritePayload>> {
+ private final StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations;
+ private TwoLevelMessageConverterCache<DestinationT, ElementT>
messageConverters;
+ private final BigQueryServices bqServices;
+ private final TupleTag<BigQueryStorageApiInsertError> failedWritesTag;
+ private final TupleTag<KV<DestinationT, StorageApiWritePayload>>
successfulWritesTag;
+ private final TupleTag<KV<@NonNull DestinationT, TableSchema>>
patchTableSchemaTag;
+ private final TupleTag<KV<@NonNull DestinationT, ElementT>>
retryElementsWaitingForSchemaTag;
+ private final @Nullable SerializableFunction<ElementT,
RowMutationInformation> rowMutationFn;
+ private final BadRecordRouter badRecordRouter;
+ private final Coder<KV<DestinationT, ElementT>> elementCoder;
+ private final Map<DestinationT, BufferedCollectorInformation>
errorCollectors = Maps.newHashMap();
+ private final boolean hasSchemaUpdateOptions;
+ private transient BigQueryServices.@Nullable DatasetService
datasetServiceInternal = null;
+ private transient BigQueryServices.@Nullable WriteStreamService
writeStreamServiceInternal = null;
+
+ static final class BufferedCollectorInformation {
Review Comment:
Can we add a Javadoc?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
* with the Storage write API.
*/
public class TableRowToStorageApiProto {
+
+ public static class ErrorCollector {
+ private final List<SchemaConversionException> exceptions =
Lists.newArrayList();
+ private final Predicate<SchemaConversionException> shouldCollect;
+
+ public static final ErrorCollector DONT_COLLECT = new
ErrorCollector(Predicates.alwaysFalse());
+
+ public ErrorCollector(Predicate<SchemaConversionException> shouldCollect) {
+ this.shouldCollect = shouldCollect;
+ }
+
+ // Returns true if the exception was collected.
+ void collect(SchemaConversionException exception) throws
SchemaConversionException {
Review Comment:
`// Returns true`
outdated comment?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
+ TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema
oldSchema)
+ throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+ // This isn't the most efficient, especially if we have deeply-nested
schemas. However we don't
+ // expect to be
+ // upgrading schemas very regularly - if we do then we have other problems!
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields =
Maps.newHashMap();
+ Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+ for (TableRowToStorageApiProto.SchemaConversionException
schemaConversionException :
+ errorCollector.getExceptions()) {
+ if (schemaConversionException instanceof
TableRowToStorageApiProto.SchemaTooNarrowException) {
+ TableRowToStorageApiProto.SchemaTooNarrowException e =
+ (TableRowToStorageApiProto.SchemaTooNarrowException)
schemaConversionException;
+ List<String> components =
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ TableFieldSchema.Mode mode =
+ e.isRepeated() ? TableFieldSchema.Mode.REPEATED :
TableFieldSchema.Mode.NULLABLE;
+ TableFieldSchema.Type type =
+ e.isStruct() ? TableFieldSchema.Type.STRUCT :
TableFieldSchema.Type.STRING;
+ @Nullable
+ TableFieldSchema oldValue =
+ newFields
+ .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+ .put(
+ name,
+ TableFieldSchema.newBuilder()
+ .setName(name)
+ .setMode(mode)
+ .setType(type)
+ .build());
+ if (oldValue != null) {
+ // Duplicates are ok because we might run this over an entire
bundle. However we must
+ // ensure that they are compatible.
+ if (!oldValue.getType().equals(type)) {
+ throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+ "Inconsistent types seen for field: " + e.getMissingField());
Review Comment:
Can we also log what the conflicting types are?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -585,12 +699,25 @@ private SchemaInformation(
}
}
+ // Returns a SchemaInformation that descends from this one. Does not
modify the current
+ // SchemaInformation -
+ // the new SchemaInformation is traversable upwards only.
+ public SchemaInformation createDescendent(TableFieldSchema
tableFieldSchema) {
+ return new SchemaInformation(
+ tableFieldSchema, Iterables.concat(this.parentSchemas,
ImmutableList.of(this)));
+ }
+
public String getFullName() {
- String prefix =
- StreamSupport.stream(parentSchemas.spliterator(), false)
- .map(SchemaInformation::getName)
- .collect(Collectors.joining("."));
- return prefix.isEmpty() ? getName() : prefix + "." + getName();
+ if (!Iterables.isEmpty(parentSchemas)) {
+ String prefix =
+ StreamSupport.stream(parentSchemas.spliterator(), false)
+ .skip(1)
+ .map(SchemaInformation::getName)
+ .collect(Collectors.joining("."));
+ return prefix.isEmpty() ? getName() : prefix + "." + getName();
+ } else {
+ return "";
Review Comment:
Should we return `getName()`?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -46,6 +46,7 @@ public class BigQuerySinkMetrics {
public static final String METRICS_NAMESPACE = "BigQuerySink";
+ // Status codes
Review Comment:
nit: cleanup
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
Review Comment:
Can you add a small doc-string for this method
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
+ TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema
oldSchema)
+ throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+ // This isn't the most efficient, especially if we have deeply-nested
schemas. However we don't
+ // expect to be
+ // upgrading schemas very regularly - if we do then we have other problems!
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields =
Maps.newHashMap();
+ Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+ for (TableRowToStorageApiProto.SchemaConversionException
schemaConversionException :
+ errorCollector.getExceptions()) {
+ if (schemaConversionException instanceof
TableRowToStorageApiProto.SchemaTooNarrowException) {
+ TableRowToStorageApiProto.SchemaTooNarrowException e =
+ (TableRowToStorageApiProto.SchemaTooNarrowException)
schemaConversionException;
+ List<String> components =
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ TableFieldSchema.Mode mode =
+ e.isRepeated() ? TableFieldSchema.Mode.REPEATED :
TableFieldSchema.Mode.NULLABLE;
+ TableFieldSchema.Type type =
+ e.isStruct() ? TableFieldSchema.Type.STRUCT :
TableFieldSchema.Type.STRING;
Review Comment:
Maybe add a TODO that we are defaulting to STRING types for now, but would
like to add more robust handling?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
Review Comment:
nit: cleanup duplicate license header
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaUpdateHoldingFn.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * This is a stateful DoFn that buffers elements that triggered table schema
update. Once the table
+ * schema has been updated, this reprocesses the messages and allows them to
continue on through the
+ * sink.
+ */
+public class SchemaUpdateHoldingFn<DestinationT extends @NonNull Object,
ElementT>
+ extends DoFn<
+ KV<ShardedKey<DestinationT>, @Nullable ElementT>,
+ KV<DestinationT, StorageApiWritePayload>> {
+ private static final Duration POLL_DURATION = Duration.standardSeconds(1);
+
+ @StateId("bufferedElements")
+ private final StateSpec<BagState<TimestampedValue<ElementT>>> bufferedSpec;
+
+ @StateId("minBufferedTimestamp")
+ private final StateSpec<CombiningState<Long, long[], Long>>
minBufferedTsSpec;
+
+ @StateId("timerTimestamp")
+ private final StateSpec<ValueState<Long>> timerTsSpec;
+
+ @TimerId("pollTimer")
+ private final TimerSpec pollTimerSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ private final ConvertMessagesDoFn<DestinationT, ElementT>
convertMessagesDoFn;
+
+ public SchemaUpdateHoldingFn(
+ Coder<ElementT> elementCoder,
+ ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn) {
+ this.convertMessagesDoFn = convertMessagesDoFn;
+ this.bufferedSpec =
StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(elementCoder));
+ this.timerTsSpec = StateSpecs.value();
+
+ Combine.BinaryCombineLongFn minCombineFn =
+ new Combine.BinaryCombineLongFn() {
+ @Override
+ public long identity() {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+ }
+
+ @Override
+ public long apply(long left, long right) {
+ return Math.min(left, right);
+ }
+ };
+ this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
+ }
+
+ @StartBundle
+ public void startBundle() {
+ convertMessagesDoFn.startBundle();
+ ;
+ }
+
+ @Teardown
+ public void onTeardown() {
+ convertMessagesDoFn.onTeardown();
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<ShardedKey<DestinationT>, @Nullable ElementT> element,
+ @Timestamp Instant timestamp,
+ @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+ @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long>
minBufferedTimestamp,
+ @StateId("timerTimestamp") ValueState<Long> timerTs,
+ @TimerId("pollTimer") Timer pollTimer,
+ ProcessContext context,
+ BoundedWindow window,
+ MultiOutputReceiver o)
+ throws Exception {
+
convertMessagesDoFn.getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
Review Comment:
This is currently a no-op, right?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -847,11 +896,40 @@ long flush(
+ failedContext.offset);
}
+ // Schema mismatched exceptions can happen if the table was
recently updated. Since
+ // vortex caches schemas
+ // we might see the new schema before vortex does. In this case,
we simply need to
+ // retry.
+ Exceptions.@Nullable StorageException storageException =
+ (error == null) ? null :
Exceptions.toStorageException(error);
+ boolean schemaMismatchError =
+ (storageException instanceof
Exceptions.SchemaMismatchedException);
+ if (!schemaMismatchError && error != null) {
+ // There's no special error code for missing required fields,
and that can also
+ // happen due to vortex
+ // being delayed at seeing a new schema. We're forced to parse
the description to
+ // determine that this
+ // has happened.
+ Status status = Status.fromThrowable(error);
+ if (status.getCode() == Status.Code.INVALID_ARGUMENT) {
+ String description = status.getDescription();
+ schemaMismatchError =
+ description != null &&
description.contains("incompatible fields");
+ }
+ }
+ if (schemaMismatchError) {
+ LOG.info(
+ "Vortex failed stream open due to incompatible fields.
This is likely because the BigTable "
+ + "schema was recently updated and Vortex hasn't
noticed yet, so retrying. error {}",
+ Preconditions.checkStateNotNull(error).toString());
+ }
+
Review Comment:
nit: a large part of this is duplicated between this class and
`StorageApiWritesShardedRecords`.
Side note: historically there have been many overlaps between the two classes.
It would make things a lot easier to follow if there were a shared utility class.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java:
##########
@@ -32,17 +34,26 @@ public interface MessageConverter<T> {
DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
throws Exception;
StorageApiWritePayload toMessage(
- T element, @Nullable RowMutationInformation rowMutationInformation)
throws Exception;
+ T element,
+ @Nullable RowMutationInformation rowMutationInformation,
+ TableRowToStorageApiProto.ErrorCollector collectedExceptions)
+ throws Exception;
TableRow toFailsafeTableRow(T element);
+
+ void updateSchemaFromTable() throws IOException, InterruptedException;
Review Comment:
Can we have subclasses that don't implement this throw an
UnsupportedOperationException?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -884,72 +1003,107 @@ public void process(
}
}
};
- Instant now = Instant.now();
- List<AppendRowsContext> contexts = Lists.newArrayList();
- RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
- new RetryManager<>(
- Duration.standardSeconds(1),
- Duration.standardSeconds(20),
- maxRetries,
-
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
- int numAppends = 0;
- for (SplittingIterable.Value splitValue : messages) {
- // Handle the case of a row that is too large.
- if (splitValue.getProtoRows().getSerializedSize() >= maxRequestSize) {
- if (splitValue.getProtoRows().getSerializedRowsCount() > 1) {
- // TODO(reuvenlax): Is it worth trying to handle this case by
splitting the protoRows?
- // Given that we split
- // the ProtoRows iterable at 2MB and the max request size is 10MB,
this scenario seems
- // nearly impossible.
- LOG.error(
- "A request containing more than one row is over the request
size limit of {}. This is unexpected. All rows in the request will be sent to
the failed-rows PCollection.",
- maxRequestSize);
- }
- for (int i = 0; i <
splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
- org.joda.time.Instant timestamp =
splitValue.getTimestamps().get(i);
- TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
- if (failedRow == null) {
- ByteString rowBytes =
splitValue.getProtoRows().getSerializedRows(i);
- failedRow = appendClientInfo.get().toTableRow(rowBytes,
Predicates.alwaysTrue());
- }
- o.get(failedRowsTag)
- .outputWithTimestamp(
- new BigQueryStorageApiInsertError(
- failedRow,
- "Row payload too large. Maximum size " +
maxRequestSize,
- tableReference),
- timestamp);
- }
- int numRowsFailed =
splitValue.getProtoRows().getSerializedRowsCount();
- rowsSentToFailedRowsCollection.inc(numRowsFailed);
- BigQuerySinkMetrics.appendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.FAILED,
- BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
- shortTableId)
- .inc(numRowsFailed);
- } else {
- ++numAppends;
- // RetryManager
- AppendRowsContext context =
- new AppendRowsContext(
- element.getKey(),
- splitValue.getProtoRows(),
- splitValue.getTimestamps(),
- splitValue.getFailsafeTableRows());
- contexts.add(context);
- retryManager.addOperation(runOperation, onError, onSuccess, context);
-
recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount());
-
appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
+
+ BackOff backoff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.standardSeconds(1))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxRetries(500)
+ .withThrottledTimeCounter(
+ BigQuerySinkMetrics.throttledTimeCounter(
+ BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM))
+ .backoff();
+ CreateRetryManagerResult<DestinationT> createRetryManagerResult;
+ do {
+ // Each ProtoRows object contains at most 1MB of rows.
+ // TODO: Push messageFromTableRow up to top level. That we we cans
skip TableRow entirely if
+ // already proto or already schema.
+ Iterable<SplittingIterable.Value> messages =
+ new SplittingIterable(
+ element.getValue(),
+ splitSize,
+ // Unknown field merger
+ (bytes, tableRow) ->
+ appendClientInfo.get().mergeNewFields(bytes, tableRow,
ignoreUnknownValues),
+ // Convert back to TableRow
+ bytes -> appendClientInfo.get().toTableRow(bytes,
Predicates.alwaysTrue()),
+ // Failed rows consumer
+ (failedRow, errorMessage) -> {
+ o.get(failedRowsTag)
+ .outputWithTimestamp(
+ new BigQueryStorageApiInsertError(
+ failedRow.getValue(), errorMessage,
tableReference),
+ failedRow.getTimestamp());
+ rowsSentToFailedRowsCollection.inc();
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.FAILED,
+ BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+ shortTableId)
+ .inc(1);
+ },
+ // Get the currently-known TableSchema hash
+ () -> appendClientInfo.get().getTableSchemaHash(),
+ () ->
+ TableRowToStorageApiProto.wrapDescriptorProto(
+ messageConverter.getDescriptor(false)),
+ autoUpdateSchema,
+ elementTs);
+
+ createRetryManagerResult =
Review Comment:
readability nit: declare it inline with initialization?
```suggestion
CreateRetryManagerResult<DestinationT> createRetryManagerResult =
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
+ TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema
oldSchema)
+ throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+ // This isn't the most efficient, especially if we have deeply-nested
schemas. However we don't
+ // expect to be
+ // upgrading schemas very regularly - if we do then we have other problems!
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields =
Maps.newHashMap();
+ Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+ for (TableRowToStorageApiProto.SchemaConversionException
schemaConversionException :
+ errorCollector.getExceptions()) {
+ if (schemaConversionException instanceof
TableRowToStorageApiProto.SchemaTooNarrowException) {
+ TableRowToStorageApiProto.SchemaTooNarrowException e =
+ (TableRowToStorageApiProto.SchemaTooNarrowException)
schemaConversionException;
+ List<String> components =
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ TableFieldSchema.Mode mode =
+ e.isRepeated() ? TableFieldSchema.Mode.REPEATED :
TableFieldSchema.Mode.NULLABLE;
+ TableFieldSchema.Type type =
+ e.isStruct() ? TableFieldSchema.Type.STRUCT :
TableFieldSchema.Type.STRING;
+ @Nullable
+ TableFieldSchema oldValue =
+ newFields
+ .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+ .put(
+ name,
+ TableFieldSchema.newBuilder()
+ .setName(name)
+ .setMode(mode)
+ .setType(type)
+ .build());
Review Comment:
Should we compare the current and new value, and keep the more "relaxed"
version of the two?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
* with the Storage write API.
*/
public class TableRowToStorageApiProto {
+
+ public static class ErrorCollector {
Review Comment:
Can we add a java-doc here too?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +519,38 @@ public interface ThrowingBiFunction<FirstInputT,
SecondInputT, OutputT> {
})
.put(
TableFieldSchema.Type.STRING,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.put(
TableFieldSchema.Type.JSON,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.put(
TableFieldSchema.Type.GEOGRAPHY,
- (schemaInformation, value) ->
- Preconditions.checkArgumentNotNull(value).toString())
+ (fullName, value) ->
Preconditions.checkArgumentNotNull(value).toString())
.build();
+ static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+ public static byte[] tableSchemaHash(TableSchema tableSchema) {
+ return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+ }
+
+ public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema>
fields) {
Review Comment:
And can we add some unit testing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]