[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78c8ea20 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78c8ea20 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78c8ea20 Branch: refs/heads/master Commit: 78c8ea2085439b0d4fe0d7187044b44b6f110aff Parents: 6e118d1 Author: Fabian Hueske <[email protected]> Authored: Thu Nov 2 21:40:26 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Nov 3 00:01:40 2017 +0100 ---------------------------------------------------------------------- .../AvroRowDeserializationSchema.java | 30 +++++++++++++++++--- .../AvroRowSerializationSchema.java | 29 ++++++++++++++++--- .../kafka/AvroRowDeSerializationSchemaTest.java | 24 ++++++++++++++++ 3 files changed, 75 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java index 0713738..c7f1d82 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java @@ -32,6 +32,8 @@ import org.apache.avro.util.Utf8; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.List; /** @@ -44,24 +46,29 @@ import java.util.List; public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> { /** + * Avro record class. + */ + private Class<? extends SpecificRecord> recordClazz; + + /** * Schema for deterministic field order. */ - private final Schema schema; + private transient Schema schema; /** * Reader that deserializes byte array into a record. */ - private final DatumReader<SpecificRecord> datumReader; + private transient DatumReader<SpecificRecord> datumReader; /** * Input stream to read message from. */ - private final MutableByteArrayInputStream inputStream; + private transient MutableByteArrayInputStream inputStream; /** * Avro decoder that decodes binary data. */ - private final Decoder decoder; + private transient Decoder decoder; /** * Record to deserialize byte array to. @@ -75,6 +82,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< */ public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) { Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; this.schema = SpecificData.get().getSchema(recordClazz); this.datumReader = new SpecificDatumReader<>(schema); this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); @@ -97,6 +105,20 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema< return (Row) row; } + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(recordClazz); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new SpecificDatumReader<>(schema); + this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + /** * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. * Avro's {@link Utf8} fields are converted into regular Java strings. http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java index 450c78f..09acc6a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java @@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.List; /** @@ -42,24 +44,29 @@ import java.util.List; public class AvroRowSerializationSchema implements SerializationSchema<Row> { /** + * Avro record class. + */ + private Class<? extends SpecificRecord> recordClazz; + + /** * Avro serialization schema. */ - private final Schema schema; + private transient Schema schema; /** * Writer to serialize Avro record into a byte array. */ - private final DatumWriter<GenericRecord> datumWriter; + private transient DatumWriter<GenericRecord> datumWriter; /** * Output stream to serialize records into byte array. */ - private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); /** * Low-level class for serialization of Avro values. */ - private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); /** * Creates a Avro serialization schema for the given schema. @@ -68,6 +75,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { */ public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) { Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; this.schema = SpecificData.get().getSchema(recordClazz); this.datumWriter = new SpecificDatumWriter<>(schema); } @@ -89,6 +97,19 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> { } } + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(recordClazz); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new SpecificDatumWriter<>(schema); + this.arrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + } + /** * Converts a (nested) Flink Row into Avro's {@link GenericRecord}. * Strings are converted into Avro's {@link Utf8} fields. http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java index d5be274..28f2ed3 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils; import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema; import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; import org.apache.avro.specific.SpecificRecord; import org.junit.Test; @@ -121,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest { assertEquals(testData.f2, actual); } + + @Test + public void testSerializability() throws IOException, ClassNotFoundException { + final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); + + final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0); + final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0); + + byte[] serBytes = InstantiationUtil.serializeObject(serOrig); + byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig); + + AvroRowSerializationSchema serCopy = + InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader()); + AvroRowDeserializationSchema deserCopy = + InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader()); + + final byte[] bytes = serCopy.serialize(testData.f2); + deserCopy.deserialize(bytes); + deserCopy.deserialize(bytes); + final Row actual = deserCopy.deserialize(bytes); + + assertEquals(testData.f2, actual); + } }
