[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema and
AvroRowDeserializationSchema.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78c8ea20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78c8ea20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78c8ea20

Branch: refs/heads/master
Commit: 78c8ea2085439b0d4fe0d7187044b44b6f110aff
Parents: 6e118d1
Author: Fabian Hueske <[email protected]>
Authored: Thu Nov 2 21:40:26 2017 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Nov 3 00:01:40 2017 +0100

----------------------------------------------------------------------
 .../AvroRowDeserializationSchema.java           | 30 +++++++++++++++++---
 .../AvroRowSerializationSchema.java             | 29 ++++++++++++++++---
 .../kafka/AvroRowDeSerializationSchemaTest.java | 24 ++++++++++++++++
 3 files changed, 75 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 0713738..c7f1d82 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -32,6 +32,8 @@ import org.apache.avro.util.Utf8;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.List;
 
 /**
@@ -44,24 +46,29 @@ import java.util.List;
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
        /**
+        * Avro record class.
+        */
+       private Class<? extends SpecificRecord> recordClazz;
+
+       /**
         * Schema for deterministic field order.
         */
-       private final Schema schema;
+       private transient Schema schema;
 
        /**
         * Reader that deserializes byte array into a record.
         */
-       private final DatumReader<SpecificRecord> datumReader;
+       private transient DatumReader<SpecificRecord> datumReader;
 
        /**
         * Input stream to read message from.
         */
-       private final MutableByteArrayInputStream inputStream;
+       private transient MutableByteArrayInputStream inputStream;
 
        /**
         * Avro decoder that decodes binary data.
         */
-       private final Decoder decoder;
+       private transient Decoder decoder;
 
        /**
         * Record to deserialize byte array to.
@@ -75,6 +82,7 @@ public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<
         */
        public AvroRowDeserializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
                Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+               this.recordClazz = recordClazz;
                this.schema = SpecificData.get().getSchema(recordClazz);
                this.datumReader = new SpecificDatumReader<>(schema);
                this.record = (SpecificRecord) 
SpecificData.newInstance(recordClazz, schema);
@@ -97,6 +105,20 @@ public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<
                return (Row) row;
        }
 
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(recordClazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumReader = new SpecificDatumReader<>(schema);
+               this.record = (SpecificRecord) 
SpecificData.newInstance(recordClazz, schema);
+               this.inputStream = new MutableByteArrayInputStream();
+               this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+       }
+
        /**
         * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
         * Avro's {@link Utf8} fields are converted into regular Java strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 450c78f..09acc6a 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.List;
 
 /**
@@ -42,24 +44,29 @@ import java.util.List;
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
        /**
+        * Avro record class.
+        */
+       private Class<? extends SpecificRecord> recordClazz;
+
+       /**
         * Avro serialization schema.
         */
-       private final Schema schema;
+       private transient Schema schema;
 
        /**
         * Writer to serialize Avro record into a byte array.
         */
-       private final DatumWriter<GenericRecord> datumWriter;
+       private transient DatumWriter<GenericRecord> datumWriter;
 
        /**
         * Output stream to serialize records into byte array.
         */
-       private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+       private transient ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
 
        /**
         * Low-level class for serialization of Avro values.
         */
-       private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+       private transient Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 
        /**
         * Creates a Avro serialization schema for the given schema.
@@ -68,6 +75,7 @@ public class AvroRowSerializationSchema implements 
SerializationSchema<Row> {
         */
        public AvroRowSerializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
                Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+               this.recordClazz = recordClazz;
                this.schema = SpecificData.get().getSchema(recordClazz);
                this.datumWriter = new SpecificDatumWriter<>(schema);
        }
@@ -89,6 +97,19 @@ public class AvroRowSerializationSchema implements 
SerializationSchema<Row> {
                }
        }
 
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(recordClazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumWriter = new SpecificDatumWriter<>(schema);
+               this.arrayOutputStream = new ByteArrayOutputStream();
+               this.encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+       }
+
        /**
         * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
         * Strings are converted into Avro's {@link Utf8} fields.

http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index d5be274..28f2ed3 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
 import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.junit.Test;
@@ -121,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest {
 
                assertEquals(testData.f2, actual);
        }
+
+       @Test
+       public void testSerializability() throws IOException, 
ClassNotFoundException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serOrig = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserOrig = new 
AvroRowDeserializationSchema(testData.f0);
+
+               byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
+               byte[] deserBytes = 
InstantiationUtil.serializeObject(deserOrig);
+
+               AvroRowSerializationSchema serCopy =
+                       InstantiationUtil.deserializeObject(serBytes, 
Thread.currentThread().getContextClassLoader());
+               AvroRowDeserializationSchema deserCopy =
+                       InstantiationUtil.deserializeObject(deserBytes, 
Thread.currentThread().getContextClassLoader());
+
+               final byte[] bytes = serCopy.serialize(testData.f2);
+               deserCopy.deserialize(bytes);
+               deserCopy.deserialize(bytes);
+               final Row actual = deserCopy.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
 }

Reply via email to