This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0b3b18c Ensure that all nested schemas are given ids, and fix bug where nullable was not propagated to the proto. new 2738c16 Merge pull request #8422: [BEAM-7002] Fix failures in SchemaCoder 0b3b18c is described below commit 0b3b18c60db373917c9626e6b4b57b83d4e0ee58 Author: Reuven Lax <re...@relax-macbookpro3.roam.corp.google.com> AuthorDate: Sat Apr 27 05:33:03 2019 -0700 Ensure that all nested schemas are given ids, and fix bug where nullable was not propagated to the proto. --- .../core/construction/SchemaTranslation.java | 1 + .../java/org/apache/beam/sdk/coders/RowCoder.java | 43 +++++++++++++++++++++- .../apache/beam/sdk/coders/RowCoderGenerator.java | 17 ++++++--- .../org/apache/beam/sdk/schemas/SchemaCoder.java | 5 +++ 4 files changed, 58 insertions(+), 8 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java index 86de2b8..90af770 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java @@ -112,6 +112,7 @@ public class SchemaTranslation { default: break; } + builder.setNullable(fieldType.getNullable()); return builder.build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index 90a6def..dd507d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -90,19 +90,52 @@ public class RowCoder extends CustomCoder<Row> { schema.getUUID(), id); } else { + // Clone the schema before modifying the Java object. schema = SerializableUtils.clone(schema); - schema.setUUID(id); + setSchemaIds(schema, id); } this.schema = schema; this.id = id; } + // Sets the schema id, and then recursively ensures that all schemas have ids set. + private void setSchemaIds(Schema schema, UUID id) { + if (schema.getUUID() == null) { + schema.setUUID(id); + } + for (Field field : schema.getFields()) { + setSchemaIds(field.getType()); + } + } + + private void setSchemaIds(FieldType fieldType) { + switch (fieldType.getTypeName()) { + case ROW: + setSchemaIds(fieldType.getRowSchema(), UUID.randomUUID()); + return; + case MAP: + setSchemaIds(fieldType.getMapKeyType()); + setSchemaIds(fieldType.getMapValueType()); + return; + case LOGICAL_TYPE: + setSchemaIds(fieldType.getLogicalType().getBaseType()); + return; + + case ARRAY: + setSchemaIds(fieldType.getCollectionElementType()); + return; + + default: + return; + } + } + // Return the generated coder class for this schema. private Coder<Row> getDelegateCoder() { if (delegateCoder == null) { // RowCoderGenerator caches based on id, so if a new instance of this RowCoder is // deserialized, we don't need to run ByteBuddy again to construct the class. - delegateCoder = RowCoderGenerator.generate(schema, id); + delegateCoder = RowCoderGenerator.generate(schema); } return delegateCoder; } @@ -208,4 +241,10 @@ public class RowCoder extends CustomCoder<Row> { return ESTIMATED_FIELD_SIZES.get(typeDescriptor.getTypeName()); } } + + @Override + public String toString() { + String string = "Schema: " + schema + " UUID: " + id + " delegateCoder: " + getDelegateCoder(); + return string; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java index 994d695..351b04e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.coders; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -59,7 +61,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps; * A utility for automatically generating a {@link Coder} for {@link Row} objects corresponding to a * specific schema. The resulting coder is loaded into the default ClassLoader and returned. * - * <p>When {@link RowCoderGenerator#generate(Schema, UUID)} is called, a new subclass of {@literal + * <p>When {@link RowCoderGenerator#generate(Schema)} is called, a new subclass of {@literal * Coder<Row>} is generated for the specified schema. This class is generated using low-level * bytecode generation, and hardcodes encodings for all fields of the Schema. Empirically, this is * 30-40% faster than a coder that introspects the schema. @@ -124,10 +126,10 @@ public abstract class RowCoderGenerator { } @SuppressWarnings("unchecked") - public static Coder<Row> generate(Schema schema, UUID coderId) { + public static Coder<Row> generate(Schema schema) { // Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of nested // coders. Using HashMap::computeIfAbsent generates ConcurrentModificationExceptions in Java 11. - Coder<Row> rowCoder = generatedCoders.get(coderId); + Coder<Row> rowCoder = generatedCoders.get(schema.getUUID()); if (rowCoder == null) { TypeDescription.Generic coderType = TypeDescription.Generic.Builder.parameterizedType(Coder.class, Row.class).build(); @@ -149,7 +151,7 @@ public abstract class RowCoderGenerator { | InvocationTargetException e) { throw new RuntimeException("Unable to generate coder for schema " + schema); } - generatedCoders.put(coderId, rowCoder); + generatedCoders.put(schema.getUUID(), rowCoder); } return rowCoder; } @@ -220,6 +222,8 @@ public abstract class RowCoderGenerator { static void encodeDelegate( Coder[] coders, Row value, OutputStream outputStream, boolean hasNullableFields) throws IOException { + checkState(value.getFieldCount() == value.getSchema().getFieldCount()); + // Encode the field count. This allows us to handle compatible schema changes. VAR_INT_CODER.encode(value.getFieldCount(), outputStream); // Encode a bitmap for the null fields to save having to encode a bunch of nulls. @@ -294,6 +298,7 @@ public abstract class RowCoderGenerator { static Row decodeDelegate(Schema schema, Coder[] coders, InputStream inputStream) throws IOException { int fieldCount = VAR_INT_CODER.decode(inputStream); + BitSet nullFields = NULL_LIST_CODER.decode(inputStream); List<Object> fieldValues = Lists.newArrayListWithCapacity(coders.length); for (int i = 0; i < fieldCount; ++i) { @@ -367,8 +372,8 @@ public abstract class RowCoderGenerator { } else if (TypeName.MAP.equals(fieldType.getTypeName())) { return mapCoder(fieldType.getMapKeyType(), fieldType.getMapValueType()); } else if (TypeName.ROW.equals(fieldType.getTypeName())) { - Coder<Row> nestedCoder = generate(fieldType.getRowSchema(), UUID.randomUUID()); - RowCoder.of(fieldType.getRowSchema()); + checkState(fieldType.getRowSchema().getUUID() != null); + Coder<Row> nestedCoder = generate(fieldType.getRowSchema()); return rowCoder(nestedCoder.getClass()); } else { StackManipulation primitiveCoder = coderForPrimitiveType(fieldType.getTypeName()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index ff603d2..0199534 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -95,4 +95,9 @@ public class SchemaCoder<T> extends CustomCoder<T> { public boolean consistentWithEquals() { return rowCoder.consistentWithEquals(); } + + @Override + public String toString() { + return "SchemaCoder: " + rowCoder.toString(); + } }