reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3121490687


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -46,6 +46,7 @@ public class BigQuerySinkMetrics {
 
   public static final String METRICS_NAMESPACE = "BigQuerySink";
 
+  // Status codes

Review Comment:
   fixed



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -585,12 +699,25 @@ private SchemaInformation(
       }
     }
 
+    // Returns a SchemaInformation that descends from this one. Does not 
modify the current
+    // SchemaInformation -
+    // the new SchemaInformation is traversable upwards only.
+    public SchemaInformation createDescendent(TableFieldSchema 
tableFieldSchema) {
+      return new SchemaInformation(
+          tableFieldSchema, Iterables.concat(this.parentSchemas, 
ImmutableList.of(this)));
+    }
+
     public String getFullName() {
-      String prefix =
-          StreamSupport.stream(parentSchemas.spliterator(), false)
-              .map(SchemaInformation::getName)
-              .collect(Collectors.joining("."));
-      return prefix.isEmpty() ? getName() : prefix + "." + getName();
+      if (!Iterables.isEmpty(parentSchemas)) {
+        String prefix =
+            StreamSupport.stream(parentSchemas.spliterator(), false)
+                .skip(1)
+                .map(SchemaInformation::getName)
+                .collect(Collectors.joining("."));
+        return prefix.isEmpty() ? getName() : prefix + "." + getName();
+      } else {
+        return "";

Review Comment:
   Added comment explaining why not



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(

Review Comment:
   Done. Also added a unit test for this file.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;
+        @Nullable
+        TableFieldSchema oldValue =
+            newFields
+                .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+                .put(
+                    name,
+                    TableFieldSchema.newBuilder()
+                        .setName(name)
+                        .setMode(mode)
+                        .setType(type)
+                        .build());
+        if (oldValue != null) {
+          // Duplicates are ok because we might run this over an entire 
bundle. However we must
+          // ensure that they are compatible.
+          if (!oldValue.getType().equals(type)) {
+            throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+                "Inconsistent types seen for field: " + e.getMissingField());

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ConvertMessagesDoFn.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * DoFn that interacts with the StorageApiDynamicDestinations instance to 
convert messages to
+ * StorageApiWritePayload. Messages that fail to convert are routed to the 
dead-letter PCollection.
+ * If schemaUpdateOptions are set, then messages that fail to convert due to 
missing columns are
+ * routed to a buffering transform that holds them until the table's schema 
has been updated.
+ */
+public class ConvertMessagesDoFn<DestinationT extends @NonNull Object, 
ElementT>
+    extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, 
StorageApiWritePayload>> {
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
+  private TwoLevelMessageConverterCache<DestinationT, ElementT> 
messageConverters;
+  private final BigQueryServices bqServices;
+  private final TupleTag<BigQueryStorageApiInsertError> failedWritesTag;
+  private final TupleTag<KV<DestinationT, StorageApiWritePayload>> 
successfulWritesTag;
+  private final TupleTag<KV<@NonNull DestinationT, TableSchema>> 
patchTableSchemaTag;
+  private final TupleTag<KV<@NonNull DestinationT, ElementT>> 
retryElementsWaitingForSchemaTag;
+  private final @Nullable SerializableFunction<ElementT, 
RowMutationInformation> rowMutationFn;
+  private final BadRecordRouter badRecordRouter;
+  private final Coder<KV<DestinationT, ElementT>> elementCoder;
+  private final Map<DestinationT, BufferedCollectorInformation> 
errorCollectors = Maps.newHashMap();
+  private final boolean hasSchemaUpdateOptions;
+  private transient BigQueryServices.@Nullable DatasetService 
datasetServiceInternal = null;
+  private transient BigQueryServices.@Nullable WriteStreamService 
writeStreamServiceInternal = null;
+
+  static final class BufferedCollectorInformation {

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;

Review Comment:
   Done (this will be a follow-on PR)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to