reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3121491394


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;
+        @Nullable
+        TableFieldSchema oldValue =
+            newFields
+                .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+                .put(
+                    name,
+                    TableFieldSchema.newBuilder()
+                        .setName(name)
+                        .setMode(mode)
+                        .setType(type)
+                        .build());
+        if (oldValue != null) {
+          // Duplicates are ok because we might run this over an entire 
bundle. However we must
+          // ensure that they are compatible.
+          if (!oldValue.getType().equals(type)) {
+            throw new TableRowToStorageApiProto.SchemaDoesntMatchException(
+                "Inconsistent types seen for field: " + e.getMissingField());
+          }
+        }
+      } else if (schemaConversionException
+          instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException) {
+        ((TableRowToStorageApiProto.SchemaMissingRequiredFieldException) 
schemaConversionException)
+            .getMissingFields()
+            .forEach(
+                f -> {
+                  List<String> components = 
Arrays.asList(f.toLowerCase().split("\\."));
+                  String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+                  String name = components.get(components.size() - 1);
+                  relaxedFields.computeIfAbsent(prefix, p -> 
Sets.newHashSet()).add(name);
+                });
+      } else {
+        throw new RuntimeException(
+            "Unexpected error " + schemaConversionException, 
schemaConversionException);
+      }
+    }
+    return TableSchema.newBuilder()
+        .addAllFields(
+            getIncrementalSchemaHelper(newFields, relaxedFields, 
oldSchema.getFieldsList(), ""))
+        .build();
+  }
+
+  private static List<TableFieldSchema> getIncrementalSchemaHelper(
+      Map<String, LinkedHashMap<String, TableFieldSchema>> newFields,
+      Map<String, Set<String>> relaxedFields,
+      List<TableFieldSchema> tableFields,
+      String prefix) {
+    List<TableFieldSchema> fields = Lists.newArrayList();
+
+    Set<String> fieldsToRelax = relaxedFields.getOrDefault(prefix, 
Collections.emptySet());
+    // Add existing fields in the same order.
+    for (TableFieldSchema fieldSchema : tableFields) {
+      String fieldName = fieldSchema.getName().toLowerCase();
+      TableFieldSchema.Builder clonedField = null;
+      if (fieldsToRelax.contains(fieldName)) {
+        // Since we're only generating the incremental schema, existing fields 
are only examined if
+        // they change -
+        // i.e. they are relaxed.
+        clonedField = fieldSchema.toBuilder();
+        clonedField.setMode(TableFieldSchema.Mode.NULLABLE);
+      }
+      if (fieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) {
+        // Recursively walk the schema, looking for more field relaxations.
+        String newPrefix = prefix.isEmpty() ? fieldName : String.join(".", 
prefix, fieldName);
+        List<TableFieldSchema> newSubfields =
+            getIncrementalSchemaHelper(
+                newFields, relaxedFields, fieldSchema.getFieldsList(), 
newPrefix);
+        if (!newSubfields.isEmpty()) {
+          if (clonedField == null) {
+            clonedField = fieldSchema.toBuilder();
+          }
+          clonedField.clearFields();
+          clonedField.addAllFields(newSubfields);
+        }
+      }
+      if (clonedField != null) {
+        fields.add(clonedField.build());
+      }
+    }
+
+    LinkedHashMap<String, TableFieldSchema> fieldsToAdd =
+        newFields.getOrDefault(prefix, new LinkedHashMap<>());
+    for (Map.Entry<String, TableFieldSchema> entry : fieldsToAdd.entrySet()) {
+      TableFieldSchema.Builder field = entry.getValue().toBuilder();
+      // We rely on the exception telling us intermediate struct fields.
+      if (field.getType().equals(TableFieldSchema.Type.STRUCT)) {
+        String fieldName = field.getName().toLowerCase();
+        String newPrefix = prefix.isEmpty() ? fieldName : String.join(".", 
prefix, fieldName);
+        field.addAllFields(
+            getIncrementalSchemaHelper(
+                newFields, relaxedFields, Collections.emptyList(), newPrefix));
+      }
+      fields.add(field.build());
+    }
+
+    return fields;
+  }
+
+  // Merge two schemas. schema1 is considered the primary schema, and will 
control what order
+  // overlapping fields
+  // are created in the final schema.
+  public static TableSchema mergeSchemas(TableSchema schema1, TableSchema 
schema2) {
+    List<TableFieldSchema> mergedFields =
+        mergeFields(schema1.getFieldsList(), schema2.getFieldsList());
+    return TableSchema.newBuilder().addAllFields(mergedFields).build();
+  }
+
+  private static List<TableFieldSchema> mergeFields(
+      List<TableFieldSchema> fields1, List<TableFieldSchema> fields2) {
+    // Use LinkedHashMap to preserve the order of fields
+    Map<String, TableFieldSchema> mergedFieldsMap = Maps.newLinkedHashMap();
+
+    // Add all fields from schema 1.
+    fields1.forEach(f -> mergedFieldsMap.put(f.getName().toLowerCase(), f));
+
+    // Merge or append fields from schema 2
+    for (TableFieldSchema f2 : fields2) {
+      String lowerName = f2.getName().toLowerCase();
+      mergedFieldsMap.compute(lowerName, (k, v) -> v == null ? f2 : 
mergeField(v, f2));
+    }
+    return Lists.newArrayList(mergedFieldsMap.values());
+  }
+
+  private static TableFieldSchema mergeField(TableFieldSchema f1, 
TableFieldSchema f2) {
+    if (!f1.getType().equals(f2.getType())) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Conflicting field types for field '%s': %s vs %s",
+              f1.getName(), f1.getType(), f2.getType()));
+    }
+
+    TableFieldSchema.Builder builder = f1.toBuilder().mergeFrom(f2);
+    builder.clearFields();
+
+    // Handle mode weakening (REPEATED > NULLABLE > REQUIRED)
+    TableFieldSchema.Mode mode1 =
+        f1.getMode() == TableFieldSchema.Mode.MODE_UNSPECIFIED
+            ? TableFieldSchema.Mode.NULLABLE
+            : f1.getMode();
+    TableFieldSchema.Mode mode2 =
+        f2.getMode() == TableFieldSchema.Mode.MODE_UNSPECIFIED
+            ? TableFieldSchema.Mode.NULLABLE
+            : f2.getMode();
+
+    if (mode1 == TableFieldSchema.Mode.REPEATED || mode2 == 
TableFieldSchema.Mode.REPEATED) {
+      // Any field can be relaxed into a repeated field.
+      builder.setMode(TableFieldSchema.Mode.REPEATED);

Review Comment:
   I just had this conversation with Yiru. I was pretty sure that you could, 
but indeed the documentation doesn't support that. For now I will disable this. 
The subsequent case is also wrong (you can't turn a REPEATED field into a 
NULLABLE field).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -89,6 +93,45 @@
  * with the Storage write API.
  */
 public class TableRowToStorageApiProto {
+
+  public static class ErrorCollector {

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.UnknownFieldSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.util.ThrowingSupplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class UpgradeTableSchema {
+  public static TableRowToStorageApiProto.ErrorCollector newErrorCollector() {
+    return new TableRowToStorageApiProto.ErrorCollector(
+        e ->
+            (e instanceof TableRowToStorageApiProto.SchemaTooNarrowException
+                    && !((TableRowToStorageApiProto.SchemaTooNarrowException) 
e)
+                        .getMissingField()
+                        .isEmpty())
+                || e instanceof 
TableRowToStorageApiProto.SchemaMissingRequiredFieldException);
+  }
+
+  public static TableSchema getIncrementalSchema(
+      TableRowToStorageApiProto.ErrorCollector errorCollector, TableSchema 
oldSchema)
+      throws TableRowToStorageApiProto.SchemaDoesntMatchException {
+    // This isn't the most efficient, especially if we have deeply-nested 
schemas. However we don't
+    // expect to be
+    // upgrading schemas very regularly - if we do then we have other problems!
+    Map<String, LinkedHashMap<String, TableFieldSchema>> newFields = 
Maps.newHashMap();
+    Map<String, Set<String>> relaxedFields = Maps.newHashMap();
+
+    for (TableRowToStorageApiProto.SchemaConversionException 
schemaConversionException :
+        errorCollector.getExceptions()) {
+      if (schemaConversionException instanceof 
TableRowToStorageApiProto.SchemaTooNarrowException) {
+        TableRowToStorageApiProto.SchemaTooNarrowException e =
+            (TableRowToStorageApiProto.SchemaTooNarrowException) 
schemaConversionException;
+        List<String> components = 
Arrays.asList(e.getMissingField().toLowerCase().split("\\."));
+        String prefix = String.join(".", components.subList(0, 
components.size() - 1));
+        String name = components.get(components.size() - 1);
+        TableFieldSchema.Mode mode =
+            e.isRepeated() ? TableFieldSchema.Mode.REPEATED : 
TableFieldSchema.Mode.NULLABLE;
+        TableFieldSchema.Type type =
+            e.isStruct() ? TableFieldSchema.Type.STRUCT : 
TableFieldSchema.Type.STRING;
+        @Nullable
+        TableFieldSchema oldValue =
+            newFields
+                .computeIfAbsent(prefix, p -> Maps.newLinkedHashMap())
+                .put(
+                    name,
+                    TableFieldSchema.newBuilder()
+                        .setName(name)
+                        .setMode(mode)
+                        .setType(type)
+                        .build());

Review Comment:
   Yes - that's what getIncrementalSchemaHelper does



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to