This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3b18c  Ensure that all nested schemas are given ids, and fix bug where nullable was not propagated to the proto.
     new 2738c16  Merge pull request #8422: [BEAM-7002] Fix failures in SchemaCoder
0b3b18c is described below

commit 0b3b18c60db373917c9626e6b4b57b83d4e0ee58
Author: Reuven Lax <re...@relax-macbookpro3.roam.corp.google.com>
AuthorDate: Sat Apr 27 05:33:03 2019 -0700

    Ensure that all nested schemas are given ids, and fix bug where nullable
    was not propagated to the proto.
---
 .../core/construction/SchemaTranslation.java       |  1 +
 .../java/org/apache/beam/sdk/coders/RowCoder.java  | 43 +++++++++++++++++++++-
 .../apache/beam/sdk/coders/RowCoderGenerator.java  | 17 ++++++---
 .../org/apache/beam/sdk/schemas/SchemaCoder.java   |  5 +++
 4 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java
index 86de2b8..90af770 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java
@@ -112,6 +112,7 @@ public class SchemaTranslation {
       default:
         break;
     }
+    builder.setNullable(fieldType.getNullable());
     return builder.build();
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 90a6def..dd507d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -90,19 +90,52 @@ public class RowCoder extends CustomCoder<Row> {
           schema.getUUID(),
           id);
     } else {
+      // Clone the schema before modifying the Java object.
       schema = SerializableUtils.clone(schema);
-      schema.setUUID(id);
+      setSchemaIds(schema, id);
     }
     this.schema = schema;
     this.id = id;
   }
 
+  // Sets the schema id, and then recursively ensures that all schemas have ids set.
+  private void setSchemaIds(Schema schema, UUID id) {
+    if (schema.getUUID() == null) {
+      schema.setUUID(id);
+    }
+    for (Field field : schema.getFields()) {
+      setSchemaIds(field.getType());
+    }
+  }
+
+  private void setSchemaIds(FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case ROW:
+        setSchemaIds(fieldType.getRowSchema(), UUID.randomUUID());
+        return;
+      case MAP:
+        setSchemaIds(fieldType.getMapKeyType());
+        setSchemaIds(fieldType.getMapValueType());
+        return;
+      case LOGICAL_TYPE:
+        setSchemaIds(fieldType.getLogicalType().getBaseType());
+        return;
+
+      case ARRAY:
+        setSchemaIds(fieldType.getCollectionElementType());
+        return;
+
+      default:
+        return;
+    }
+  }
+
   // Return the generated coder class for this schema.
   private Coder<Row> getDelegateCoder() {
     if (delegateCoder == null) {
       // RowCoderGenerator caches based on id, so if a new instance of this RowCoder is
       // deserialized, we don't need to run ByteBuddy again to construct the class.
-      delegateCoder = RowCoderGenerator.generate(schema, id);
+      delegateCoder = RowCoderGenerator.generate(schema);
     }
     return delegateCoder;
   }
@@ -208,4 +241,10 @@ public class RowCoder extends CustomCoder<Row> {
         return ESTIMATED_FIELD_SIZES.get(typeDescriptor.getTypeName());
     }
   }
+
+  @Override
+  public String toString() {
+    String string = "Schema: " + schema + "  UUID: " + id + " delegateCoder: " + getDelegateCoder();
+    return string;
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 994d695..351b04e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -59,7 +61,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
  * A utility for automatically generating a {@link Coder} for {@link Row} objects corresponding to a
  * specific schema. The resulting coder is loaded into the default ClassLoader and returned.
  *
- * <p>When {@link RowCoderGenerator#generate(Schema, UUID)} is called, a new subclass of {@literal
+ * <p>When {@link RowCoderGenerator#generate(Schema)} is called, a new subclass of {@literal
  * Coder<Row>} is generated for the specified schema. This class is generated using low-level
  * bytecode generation, and hardcodes encodings for all fields of the Schema. Empirically, this is
  * 30-40% faster than a coder that introspects the schema.
@@ -124,10 +126,10 @@ public abstract class RowCoderGenerator {
   }
 
   @SuppressWarnings("unchecked")
-  public static Coder<Row> generate(Schema schema, UUID coderId) {
+  public static Coder<Row> generate(Schema schema) {
     // Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of nested
     // coders. Using HashMap::computeIfAbsent generates ConcurrentModificationExceptions in Java 11.
-    Coder<Row> rowCoder = generatedCoders.get(coderId);
+    Coder<Row> rowCoder = generatedCoders.get(schema.getUUID());
     if (rowCoder == null) {
       TypeDescription.Generic coderType =
           TypeDescription.Generic.Builder.parameterizedType(Coder.class, Row.class).build();
@@ -149,7 +151,7 @@ public abstract class RowCoderGenerator {
           | InvocationTargetException e) {
         throw new RuntimeException("Unable to generate coder for schema " + schema);
       }
-      generatedCoders.put(coderId, rowCoder);
+      generatedCoders.put(schema.getUUID(), rowCoder);
     }
     return rowCoder;
   }
@@ -220,6 +222,8 @@ public abstract class RowCoderGenerator {
     static void encodeDelegate(
         Coder[] coders, Row value, OutputStream outputStream, boolean hasNullableFields)
         throws IOException {
+      checkState(value.getFieldCount() == value.getSchema().getFieldCount());
+
       // Encode the field count. This allows us to handle compatible schema changes.
       VAR_INT_CODER.encode(value.getFieldCount(), outputStream);
       // Encode a bitmap for the null fields to save having to encode a bunch of nulls.
@@ -294,6 +298,7 @@ public abstract class RowCoderGenerator {
     static Row decodeDelegate(Schema schema, Coder[] coders, InputStream inputStream)
         throws IOException {
       int fieldCount = VAR_INT_CODER.decode(inputStream);
+
       BitSet nullFields = NULL_LIST_CODER.decode(inputStream);
       List<Object> fieldValues = Lists.newArrayListWithCapacity(coders.length);
       for (int i = 0; i < fieldCount; ++i) {
@@ -367,8 +372,8 @@ public abstract class RowCoderGenerator {
     } else if (TypeName.MAP.equals(fieldType.getTypeName())) {
       return mapCoder(fieldType.getMapKeyType(), fieldType.getMapValueType());
     } else if (TypeName.ROW.equals(fieldType.getTypeName())) {
-      Coder<Row> nestedCoder = generate(fieldType.getRowSchema(), UUID.randomUUID());
-      RowCoder.of(fieldType.getRowSchema());
+      checkState(fieldType.getRowSchema().getUUID() != null);
+      Coder<Row> nestedCoder = generate(fieldType.getRowSchema());
       return rowCoder(nestedCoder.getClass());
     } else {
       StackManipulation primitiveCoder = coderForPrimitiveType(fieldType.getTypeName());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index ff603d2..0199534 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -95,4 +95,9 @@ public class SchemaCoder<T> extends CustomCoder<T> {
   public boolean consistentWithEquals() {
     return rowCoder.consistentWithEquals();
   }
+
+  @Override
+  public String toString() {
+    return "SchemaCoder: " + rowCoder.toString();
+  }
 }

Reply via email to