This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch release-2.17.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push: new 01925d2 [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder new 014c740 Merge pull request #10241 from mxm/release-2.17.0 01925d2 is described below commit 01925d238144b3c16212fd11c48dcca018eb7309 Author: Piotr Szczepanik <piotr.szczepa...@allegro.pl> AuthorDate: Thu Nov 28 14:34:49 2019 +0100 [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder Backport of #10218 --- ...gistrar.java => AvroGenericCoderRegistrar.java} | 12 +++++----- ...slator.java => AvroGenericCoderTranslator.java} | 14 +++++------ .../core/construction/CoderTranslationTest.java | 5 ++++ .../java/org/apache/beam/sdk/coders/AvroCoder.java | 7 +++--- .../apache/beam/sdk/coders/AvroGenericCoder.java | 28 +++++++--------------- sdks/python/apache_beam/coders/avro_record.py | 2 +- sdks/python/apache_beam/coders/coders.py | 12 +++++----- sdks/python/apache_beam/coders/coders_test.py | 2 +- .../apache_beam/coders/coders_test_common.py | 2 +- .../io/external/xlang_parquetio_test.py | 2 +- 10 files changed, 40 insertions(+), 46 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java similarity index 76% rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java index 565bdbf..0cfda94 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java @@ -19,22 +19,22 @@ package org.apache.beam.runners.core.construction; import com.google.auto.service.AutoService; import java.util.Map; -import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.AvroGenericCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -/** Coder registrar for AvroCoder. */ +/** Coder registrar for AvroGenericCoder. */ @AutoService(CoderTranslatorRegistrar.class) -public class AvroCoderRegistrar implements CoderTranslatorRegistrar { - public static final String AVRO_CODER_URN = "beam:coder:avro:v1"; +public class AvroGenericCoderRegistrar implements CoderTranslatorRegistrar { + public static final String AVRO_CODER_URN = "beam:coder:avro:generic:v1"; @Override public Map<Class<? extends Coder>, String> getCoderURNs() { - return ImmutableMap.of(AvroCoder.class, AVRO_CODER_URN); + return ImmutableMap.of(AvroGenericCoder.class, AVRO_CODER_URN); } @Override public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() { - return ImmutableMap.of(AvroCoder.class, new AvroCoderTranslator()); + return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator()); } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java similarity index 74% copy from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java copy to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java index 93fca3d..564a8d3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java @@ -20,25 +20,25 @@ package org.apache.beam.runners.core.construction; import java.util.Collections; import java.util.List; import org.apache.avro.Schema; -import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.AvroGenericCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; -/** Coder translator for AvroCoder. */ -public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> { +/** Coder translator for AvroGenericCoder. */ +public class AvroGenericCoderTranslator implements CoderTranslator<AvroGenericCoder> { @Override - public List<? extends Coder<?>> getComponents(AvroCoder from) { + public List<? extends Coder<?>> getComponents(AvroGenericCoder from) { return Collections.emptyList(); } @Override - public byte[] getPayload(AvroCoder from) { + public byte[] getPayload(AvroGenericCoder from) { return from.getSchema().toString().getBytes(Charsets.UTF_8); } @Override - public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) { + public AvroGenericCoder fromComponents(List<Coder<?>> components, byte[] payload) { Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8)); - return AvroCoder.of(schema); + return AvroGenericCoder.of(schema); } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java index dc28b79..20b276a 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java @@ -132,6 +132,11 @@ public class CoderTranslationTest { KvCoder.of( new RecordCoder(), AvroCoder.of(SchemaBuilder.record("record").fields().endRecord()))) + .add( + StringUtf8Coder.of(), + SerializableCoder.of(Record.class), + new RecordCoder(), + KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))) .build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index b044165..27863ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -122,10 +122,11 @@ public class AvroCoder<T> extends CustomCoder<T> { } /** - * Returns an {@code AvroCoder} instance for the Avro schema. The implicit type is GenericRecord. + * Returns an {@code AvroGenericCoder} instance for the Avro schema. The implicit type is + * GenericRecord. */ - public static AvroCoder<GenericRecord> of(Schema schema) { - return new AvroCoder<>(GenericRecord.class, schema); + public static AvroGenericCoder of(Schema schema) { + return AvroGenericCoder.of(schema); } /** diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java similarity index 50% rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java index 93fca3d..be726cc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java @@ -15,30 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction; +package org.apache.beam.sdk.coders; -import java.util.Collections; -import java.util.List; import org.apache.avro.Schema; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; +import org.apache.avro.generic.GenericRecord; -/** Coder translator for AvroCoder. */ -public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> { - @Override - public List<? extends Coder<?>> getComponents(AvroCoder from) { - return Collections.emptyList(); +/** AvroCoder specialisation for GenericRecord. */ +public class AvroGenericCoder extends AvroCoder<GenericRecord> { + AvroGenericCoder(Schema schema) { + super(GenericRecord.class, schema); } - @Override - public byte[] getPayload(AvroCoder from) { - return from.getSchema().toString().getBytes(Charsets.UTF_8); - } - - @Override - public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) { - Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8)); - return AvroCoder.of(schema); + public static AvroGenericCoder of(Schema schema) { + return new AvroGenericCoder(schema); } } diff --git a/sdks/python/apache_beam/coders/avro_record.py b/sdks/python/apache_beam/coders/avro_record.py index e65057b..a5b8b60 100644 --- a/sdks/python/apache_beam/coders/avro_record.py +++ b/sdks/python/apache_beam/coders/avro_record.py @@ -15,7 +15,7 @@ # limitations under the License. # -"""AvroRecord for AvroCoder.""" +"""AvroRecord for AvroGenericCoder.""" from __future__ import absolute_import diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 35020b6..449e959 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -60,7 +60,7 @@ except ImportError: __all__ = [ 'Coder', - 'AvroCoder', 'BooleanCoder', 'BytesCoder', 'DillCoder', + 'AvroGenericCoder', 'BooleanCoder', 'BytesCoder', 'DillCoder', 'FastPrimitivesCoder', 'FloatCoder', 'IterableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder', 'StrUtf8Coder', 'TimestampCoder', 'TupleCoder', 'TupleSequenceCoder', 'VarIntCoder', @@ -819,10 +819,10 @@ class DeterministicProtoCoder(ProtoCoder): return self -AVRO_CODER_URN = "beam:coder:avro:v1" +AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1" -class AvroCoder(FastCoder): +class AvroGenericCoder(FastCoder): """A coder used for AvroRecord values.""" def __init__(self, schema): @@ -846,11 +846,11 @@ class AvroCoder(FastCoder): return AvroRecord def to_runner_api_parameter(self, context): - return AVRO_CODER_URN, self.schema, () + return AVRO_GENERIC_CODER_URN, self.schema, () - @Coder.register_urn(AVRO_CODER_URN, bytes) + @Coder.register_urn(AVRO_GENERIC_CODER_URN, bytes) def from_runner_api_parameter(payload, unused_components, unused_context): - return AvroCoder(payload) + return AvroGenericCoder(payload) class TupleCoder(FastCoder): diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 74d1dce..9b39962 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -113,7 +113,7 @@ class DeterministicProtoCoderTest(unittest.TestCase): self.assertEqual(coder.encode(mm_forward), coder.encode(mm_reverse)) -class AvroTestCoder(coders.AvroCoder): +class AvroTestCoder(coders.AvroGenericCoder): SCHEMA = """ { "type": "record", "name": "testrecord", diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 1b40b64..d6cb27b 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -67,7 +67,7 @@ class CodersTest(unittest.TestCase): if isinstance(c, type) and issubclass(c, coders.Coder) and 'Base' not in c.__name__) standard -= set([coders.Coder, - coders.AvroCoder, + coders.AvroGenericCoder, coders.DeterministicProtoCoder, coders.FastCoder, coders.ProtoCoder, diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py index 0c948ca..434bb3b 100644 --- a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py +++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py @@ -69,7 +69,7 @@ class XlangParquetIOTest(unittest.TestCase): raise e -class AvroTestCoder(coders.AvroCoder): +class AvroTestCoder(coders.AvroGenericCoder): SCHEMA = """ { "type": "record", "name": "testrecord",