Repository: flink Updated Branches: refs/heads/release-1.3 6714f4a39 -> 4c6b6c29d
[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c6b6c29 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c6b6c29 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c6b6c29 Branch: refs/heads/release-1.3 Commit: 4c6b6c29d064a021db25feb1db62a122b913d06d Parents: 6714f4a Author: Fabian Hueske <[email protected]> Authored: Thu Nov 2 21:40:26 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Nov 3 09:51:09 2017 +0100 ---------------------------------------------------------------------- .../AvroRowDeserializationSchema.java | 42 +++++++++++++++----- .../AvroRowSerializationSchema.java | 41 ++++++++++++++----- .../kafka/AvroRowDeSerializationSchemaTest.java | 34 ++++++++++++++-- 3 files changed, 96 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java index 37241f5..0cfca5e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java @@ -16,9 +16,9 @@ */ package org.apache.flink.streaming.util.serialization; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.List; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -28,8 +28,12 @@ import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; /** * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. @@ -41,24 +45,29 @@ import org.apache.flink.util.Preconditions; public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> { /** + * Avro record class. + */ + private Class<? extends SpecificRecord> recordClazz; + + /** * Schema for deterministic field order. */ - private final Schema schema; + private transient Schema schema; /** * Reader that deserializes byte array into a record. */ - private final DatumReader<SpecificRecord> datumReader; + private transient DatumReader<SpecificRecord> datumReader; /** * Input stream to read message from. */ - private final MutableByteArrayInputStream inputStream; + private transient MutableByteArrayInputStream inputStream; /** * Avro decoder that decodes binary data */ - private final Decoder decoder; + private transient Decoder decoder; /** * Record to deserialize byte array to. @@ -72,6 +81,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< */ public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) { Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; this.schema = SpecificData.get().getSchema(recordClazz); this.datumReader = new SpecificDatumReader<>(schema); this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); @@ -94,6 +104,20 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< return (Row) row; } + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(recordClazz); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new SpecificDatumReader<>(schema); + this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + /** * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. * Avro's {@link Utf8} fields are converted into regular Java strings. http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java index 8388ab5..1cda529 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java @@ -17,9 +17,9 @@ */ package org.apache.flink.streaming.util.serialization; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -30,8 +30,12 @@ import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; /** * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. @@ -39,24 +43,29 @@ import org.apache.flink.util.Preconditions; public class AvroRowSerializationSchema implements SerializationSchema<Row> { /** + * Avro record class. + */ + private Class<? extends SpecificRecord> recordClazz; + + /** * Avro serialization schema. */ - private final Schema schema; + private transient Schema schema; /** * Writer to serialize Avro record into a byte array. */ - private final DatumWriter<GenericRecord> datumWriter; + private transient DatumWriter<GenericRecord> datumWriter; /** * Output stream to serialize records into byte array. */ - private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); /** * Low-level class for serialization of Avro values. */ - private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); /** * Creates a Avro serialization schema for the given schema. @@ -65,6 +74,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { */ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) { Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; this.schema = SpecificData.get().getSchema(recordClazz); this.datumWriter = new SpecificDatumWriter<>(schema); } @@ -86,6 +96,19 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { } } + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(recordClazz); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new SpecificDatumWriter<>(schema); + this.arrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + } + /** * Converts a (nested) Flink Row into Avro's {@link GenericRecord}. * Strings are converted into Avro's {@link Utf8} fields. http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java index e13968e..b844e43 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java @@ -17,16 +17,21 @@ */ package org.apache.flink.streaming.connectors.kafka; -import java.io.IOException; -import org.apache.avro.specific.SpecificRecord; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils; import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema; import org.apache.flink.types.Row; -import static org.junit.Assert.assertEquals; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.avro.specific.SpecificRecord; + +import java.io.IOException; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * Test for the Avro serialization and deserialization schema. */ @@ -117,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest { assertEquals(testData.f2, actual); } + + @Test + public void testSerializability() throws IOException, ClassNotFoundException { + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); + + final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0); + + byte[] serBytes = InstantiationUtil.serializeObject(serOrig); + byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig); + + AvroRowSerializationSchema serCopy = + InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader()); + AvroRowDeserializationSchema deserCopy = + InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader()); + + final byte[] bytes = serCopy.serialize(testData.f2); + deserCopy.deserialize(bytes); + deserCopy.deserialize(bytes); + final Row actual = deserCopy.deserialize(bytes); + + assertEquals(testData.f2, actual); + } }
