This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 50fcf55 Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas 50fcf55 is described below commit 50fcf55d60e8fa7a8399d63e030c930a2d45402a Author: reuvenlax <re...@google.com> AuthorDate: Wed Apr 28 21:13:57 2021 -0700 Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas --- model/pipeline/src/main/proto/schema.proto | 4 ++ .../util/RowCoderCloudObjectTranslator.java | 6 +++ .../util/SchemaCoderCloudObjectTranslator.java | 6 +++ .../java/org/apache/beam/sdk/coders/RowCoder.java | 7 +++ .../apache/beam/sdk/coders/RowCoderGenerator.java | 9 +++- .../java/org/apache/beam/sdk/schemas/Schema.java | 8 ++++ .../org/apache/beam/sdk/schemas/SchemaCoder.java | 6 +++ .../apache/beam/sdk/schemas/SchemaTranslation.java | 2 +- .../main/java/org/apache/beam/sdk/values/Row.java | 56 +++++++++++++++++++++- .../impl/transform/BeamSqlOutputToConsoleFn.java | 2 +- .../apache/beam/sdk/extensions/sql/TestUtils.java | 6 +-- 11 files changed, 104 insertions(+), 8 deletions(-) diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto index a40087c..837689f 100644 --- a/model/pipeline/src/main/proto/schema.proto +++ b/model/pipeline/src/main/proto/schema.proto @@ -37,6 +37,8 @@ message Schema { // REQUIRED. An RFC 4122 UUID. string id = 2; repeated Option options = 3; + // Indicates that encoding positions have been overridden. + bool encoding_positions_set = 4; } message Field { @@ -52,6 +54,8 @@ message Field { // or all of them are. Used to support backwards compatibility with schema // changes. // If no fields have encoding position populated the order of encoding is the same as the order in the Schema. + // If this Field is part of a Schema where encoding_positions_set is True then encoding_position must be + // defined, otherwise this field is ignored. int32 encoding_position = 5; repeated Option options = 6; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java index 6de8321..e84d89b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.dataflow.util; import java.io.IOException; +import java.util.UUID; +import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.annotations.Experimental; @@ -59,6 +61,10 @@ public class RowCoderCloudObjectTranslator implements CloudObjectTranslator<RowC SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder(); JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder); Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build()); + @Nullable UUID uuid = schema.getUUID(); + if (schema.isEncodingPositionsOverridden() && uuid != null) { + RowCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions()); + } return RowCoder.of(schema); } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java index da7a7d8..efd6e3c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.dataflow.util; import java.io.IOException; +import java.util.UUID; +import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.sdk.annotations.Experimental; @@ -99,6 +101,10 @@ public class SchemaCoderCloudObjectTranslator implements CloudObjectTranslator<S SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder(); JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder); Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build()); + @Nullable UUID uuid = schema.getUUID(); + if (schema.isEncodingPositionsOverridden() && uuid != null) { + SchemaCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions()); + } return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction); } catch (IOException e) { throw new RuntimeException(e); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index 9ef658d..2d51411 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.coders; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.Schema; @@ -34,6 +36,11 @@ public class RowCoder extends SchemaCoder<Row> { return new RowCoder(schema); } + /** Override encoding positions for the given schema. */ + public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) { + SchemaCoder.overrideEncodingPositions(uuid, encodingPositions); + } + private RowCoder(Schema schema) { super( schema, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java index 46c7758..a8d45b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java @@ -112,9 +112,15 @@ public abstract class RowCoderGenerator { // Cache for Coder class that are already generated. private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap(); + private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES = + Maps.newConcurrentMap(); private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class); + public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) { + ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions); + } + @SuppressWarnings("unchecked") public static Coder<Row> generate(Schema schema) { // Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of nested @@ -128,7 +134,8 @@ public abstract class RowCoderGenerator { builder = implementMethods(schema, builder); int[] encodingPosToRowIndex = new int[schema.getFieldCount()]; - Map<String, Integer> encodingPositions = schema.getEncodingPositions(); + Map<String, Integer> encodingPositions = + ENCODING_POSITION_OVERRIDES.getOrDefault(schema.getUUID(), schema.getEncodingPositions()); for (int recordIndex = 0; recordIndex < schema.getFieldCount(); ++recordIndex) { String name = schema.getField(recordIndex).getName(); int encodingPosition = encodingPositions.get(name); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index 4acf367..2d35ab0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -96,6 +96,7 @@ public class Schema implements Serializable { // A mapping between field names an indices. private final BiMap<String, Integer> fieldIndices = HashBiMap.create(); private Map<String, Integer> encodingPositions = Maps.newHashMap(); + private boolean encodingPositionsOverridden = false; private final List<Field> fields; // Cache the hashCode, so it doesn't have to be recomputed. Schema objects are immutable, so this @@ -287,9 +288,15 @@ public class Schema implements Serializable { return encodingPositions; } + /** Returns whether encoding positions have been explicitly overridden. */ + public boolean isEncodingPositionsOverridden() { + return encodingPositionsOverridden; + } + /** Sets the encoding positions for this schema. */ public void setEncodingPositions(Map<String, Integer> encodingPositions) { this.encodingPositions = encodingPositions; + this.encodingPositionsOverridden = true; } /** Get this schema's UUID. */ @@ -398,6 +405,7 @@ public class Schema implements Serializable { builder.append(System.lineSeparator()); builder.append("Options:"); builder.append(options); + builder.append("UUID: " + uuid); return builder.toString(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index 015ff73..fe097d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import java.util.Objects; import java.util.UUID; import org.apache.beam.sdk.annotations.Experimental; @@ -89,6 +90,11 @@ public class SchemaCoder<T> extends CustomCoder<T> { return RowCoder.of(schema); } + /** Override encoding positions for the given schema. */ + public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) { + RowCoderGenerator.overrideEncodingPositions(uuid, encodingPositions); + } + /** Returns the schema associated with this type. */ public Schema getSchema() { return schema; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 1559732..7fe0f5c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -225,7 +225,7 @@ public class SchemaTranslation { // but if it does happen, we expect none to be specified - in which case the should all be // zero. Preconditions.checkState(dinstictEncodingPositions == 1); - } else { + } else if (protoSchema.getEncodingPositionsSet()) { schema.setEncodingPositions(encodingLocationMap); } if (!protoSchema.getId().isEmpty()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index 93544f5..a19d9b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.values.RowUtils.FieldOverride; import org.apache.beam.sdk.values.RowUtils.FieldOverrides; import org.apache.beam.sdk.values.RowUtils.RowFieldMatcher; import org.apache.beam.sdk.values.RowUtils.RowPosition; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; @@ -579,7 +578,60 @@ public abstract class Row implements Serializable { @Override public String toString() { - return "Row:" + Arrays.deepToString(Iterables.toArray(getValues(), Object.class)); + return toString(true); + } + + /** Convert Row to String. */ + public String toString(boolean includeFieldNames) { + StringBuilder builder = new StringBuilder(); + builder.append("Row: "); + builder.append(System.lineSeparator()); + for (int i = 0; i < getSchema().getFieldCount(); ++i) { + Schema.Field field = getSchema().getField(i); + if (includeFieldNames) { + builder.append(field.getName() + ":"); + } + builder.append(toString(field.getType(), getValue(i), includeFieldNames)); + builder.append(System.lineSeparator()); + } + return builder.toString(); + } + + private String toString(Schema.FieldType fieldType, Object value, boolean includeFieldNames) { + StringBuilder builder = new StringBuilder(); + switch (fieldType.getTypeName()) { + case ARRAY: + case ITERABLE: + builder.append("["); + for (Object element : (Iterable<?>) value) { + builder.append( + toString(fieldType.getCollectionElementType(), element, includeFieldNames)); + builder.append(", "); + } + builder.append("]"); + break; + case MAP: + builder.append("{"); + for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) { + builder.append("("); + builder.append(toString(fieldType.getMapKeyType(), entry.getKey(), includeFieldNames)); + builder.append(", "); + builder.append( + toString(fieldType.getMapValueType(), entry.getValue(), includeFieldNames)); + builder.append("), "); + } + builder.append("}"); + break; + case BYTES: + builder.append(Arrays.toString((byte[]) value)); + break; + case ROW: + builder.append(((Row) value).toString(includeFieldNames)); + break; + default: + builder.append(value); + } + return builder.toString(); } /** diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java index 7b1b2af..d3e44db 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java @@ -32,6 +32,6 @@ public class BeamSqlOutputToConsoleFn extends DoFn<Row, Void> { @ProcessElement public void processElement(ProcessContext c) { - System.out.println("Output: " + c.element().getValues()); + System.out.println("Output: " + c.element().toString()); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index 3ab0ddd..6aa103b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -39,8 +39,8 @@ public class TestUtils { /** A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. */ public static class BeamSqlRow2StringDoFn extends DoFn<Row, String> { @ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().toString()); + public void processElement(@Element Row row, OutputReceiver<String> o) { + o.output(row.toString(false)); } } @@ -48,7 +48,7 @@ public class TestUtils { public static List<String> beamSqlRows2Strings(List<Row> rows) { List<String> strs = new ArrayList<>(); for (Row row : rows) { - strs.add(row.toString()); + strs.add(row.toString(false)); } return strs;