reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3121490687
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -46,6 +46,7 @@ public class BigQuerySinkMetrics {
public static final String METRICS_NAMESPACE = "BigQuerySink";
+ // Status codes
Review Comment:
fixed
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -585,12 +699,25 @@ private SchemaInformation(
}
}
+ // Returns a SchemaInformation that descends from this one. Does not
modify the current
+ // SchemaInformation -
+ // the new SchemaInformation is traversable upwards only.
+ public SchemaInformation createDescendent(TableFieldSchema
tableFieldSchema) {
+ return new SchemaInformation(
+ tableFieldSchema, Iterables.concat(this.parentSchemas,
ImmutableList.of(this)));
+ }
+
public String getFullName() {
- String prefix =
- StreamSupport.stream(parentSchemas.spliterator(), false)
- .map(SchemaInformation::getName)
- .collect(Collectors.joining("."));
- return prefix.isEmpty() ? getName() : prefix + "." + getName();
+ if (!Iterables.isEmpty(parentSchemas)) {
+ String prefix =
+ StreamSupport.stream(parentSchemas.spliterator(), false)
+ .skip(1)
+ .map(SchemaInformation::getName)
+ .collect(Collectors.joining("."));
+ return prefix.isEmpty() ? getName() : prefix + "." + getName();
+ } else {
+ return "";
Review Comment:
Added comment explaining why not
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
Review Comment:
done. Also added a unit test for this file.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
+ TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema
oldSchema)
+ throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+ // This isn't the most efficient, especially if we have deeply-nested
schemas. However we don't
+ // expect to be
+ // upgrading schemas very regularly - if we do then we have other problems!
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields =
Maps.newHashMap();
+ Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+ for (TableRowToStorageApiProto.SchemaConversionException
schemaConversionException :
+ errorCollector.getExceptions()) {
+ if (schemaConversionException instanceof
TableRowToStorageApiProto.SchemaTooNarrowException) {
+ TableRowToStorageApiProto.SchemaTooNarrowException e =
+ (TableRowToStorageApiProto.SchemaTooNarrowException)
schemaConversionException;
+ List<String> components =
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ TableFieldSchema.Mode mode =
+ e.isRepeated() ? TableFieldSchema.Mode.REPEATED :
TableFieldSchema.Mode.NULLABLE;
+ TableFieldSchema.Type type =
+ e.isStruct() ? TableFieldSchema.Type.STRUCT :
TableFieldSchema.Type.STRING;
+ @Nullable
+ TableFieldSchema oldValue =
+ newFields
+ .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+ .put(
+ name,
+ TableFieldSchema.newBuilder()
+ .setName(name)
+ .setMode(mode)
+ .setType(type)
+ .build());
+ if (oldValue != null) {
+ // Duplicates are ok because we might run this over an entire
bundle. However we must
+ // ensure that they are compatible.
+ if (!oldValue.getType().equals(type)) {
+ throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+ "Inconsistent types seen for field: " + e.getMissingField());
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ConvertMessagesDoFn.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * DoFn that interacts with the StorageApiDynamicDestinations instance to
convert messages to
+ * StorageApiWritePaylod. Messages that fail to convert are routed to the
dead-letter PCollection.
+ * If schemaUpdateOptions are set, then messages that fail to convert due to
missing columns are
+ * routed to a buffering transform that holds them until the table's schema
has been updated.
+ */
+public class ConvertMessagesDoFn<DestinationT extends @NonNull Object,
ElementT>
+ extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT,
StorageApiWritePayload>> {
+ private final StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations;
+ private TwoLevelMessageConverterCache<DestinationT, ElementT>
messageConverters;
+ private final BigQueryServices bqServices;
+ private final TupleTag<BigQueryStorageApiInsertError> failedWritesTag;
+ private final TupleTag<KV<DestinationT, StorageApiWritePayload>>
successfulWritesTag;
+ private final TupleTag<KV<@NonNull DestinationT, TableSchema>>
patchTableSchemaTag;
+ private final TupleTag<KV<@NonNull DestinationT, ElementT>>
retryElementsWaitingForSchemaTag;
+ private final @Nullable SerializableFunction<ElementT,
RowMutationInformation> rowMutationFn;
+ private final BadRecordRouter badRecordRouter;
+ private final Coder<KV<DestinationT, ElementT>> elementCoder;
+ private final Map<DestinationT, BufferedCollectorInformation>
errorCollectors = Maps.newHashMap();
+ private final boolean hasSchemaUpdateOptions;
+ private transient BigQueryServices.@Nullable DatasetService
datasetServiceInternal = null;
+ private transient BigQueryServices.@Nullable WriteStreamService
writeStreamServiceInternal = null;
+
+ static final class BufferedCollectorInformation {
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+ public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+ return new TableRowToStorageApiProto.ErrorCollector(
+ e ->
+ (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+ && !((TableRowToStorageApiProto.SchemaTooNarrowException)
e)
+ .getMissingField()
+ .isEmpty())
+ || e instanceof
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+ }
+
+ public static TableSchema getIncrementalSchema(
+ TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema
oldSchema)
+ throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+ // This isn't the most efficient, especially if we have deeply-nested
schemas. However we don't
+ // expect to be
+ // upgrading schemas very regularly - if we do then we have other problems!
+ Map<String, LinkedHashMap<String, TableFieldSchema>> newFields =
Maps.newHashMap();
+ Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+ for (TableRowToStorageApiProto.SchemaConversionException
schemaConversionException :
+ errorCollector.getExceptions()) {
+ if (schemaConversionException instanceof
TableRowToStorageApiProto.SchemaTooNarrowException) {
+ TableRowToStorageApiProto.SchemaTooNarrowException e =
+ (TableRowToStorageApiProto.SchemaTooNarrowException)
schemaConversionException;
+ List<String> components =
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+ String prefix = String.join(".", components.subList(0,
components.size() - 1));
+ String name = components.get(components.size() - 1);
+ TableFieldSchema.Mode mode =
+ e.isRepeated() ? TableFieldSchema.Mode.REPEATED :
TableFieldSchema.Mode.NULLABLE;
+ TableFieldSchema.Type type =
+ e.isStruct() ? TableFieldSchema.Type.STRUCT :
TableFieldSchema.Type.STRING;
Review Comment:
Done (this will be a follow-on PR)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]