Repository: flink
Updated Branches:
  refs/heads/release-1.3 6714f4a39 -> 4c6b6c29d


[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c6b6c29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c6b6c29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c6b6c29

Branch: refs/heads/release-1.3
Commit: 4c6b6c29d064a021db25feb1db62a122b913d06d
Parents: 6714f4a
Author: Fabian Hueske <[email protected]>
Authored: Thu Nov 2 21:40:26 2017 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Nov 3 09:51:09 2017 +0100

----------------------------------------------------------------------
 .../AvroRowDeserializationSchema.java           | 42 +++++++++++++++-----
 .../AvroRowSerializationSchema.java             | 41 ++++++++++++++-----
 .../kafka/AvroRowDeSerializationSchemaTest.java | 34 ++++++++++++++--
 3 files changed, 96 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 37241f5..0cfca5e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -16,9 +16,9 @@
  */
 package org.apache.flink.streaming.util.serialization;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
@@ -28,8 +28,12 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
 
 /**
  * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
@@ -41,24 +45,29 @@ import org.apache.flink.util.Preconditions;
 public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
 
        /**
+        * Avro record class.
+        */
+       private Class<? extends SpecificRecord> recordClazz;
+
+       /**
         * Schema for deterministic field order.
         */
-       private final Schema schema;
+       private transient Schema schema;
 
        /**
         * Reader that deserializes byte array into a record.
         */
-       private final DatumReader<SpecificRecord> datumReader;
+       private transient DatumReader<SpecificRecord> datumReader;
 
        /**
         * Input stream to read message from.
         */
-       private final MutableByteArrayInputStream inputStream;
+       private transient MutableByteArrayInputStream inputStream;
 
        /**
         * Avro decoder that decodes binary data
         */
-       private final Decoder decoder;
+       private transient Decoder decoder;
 
        /**
         * Record to deserialize byte array to.
@@ -72,6 +81,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
         */
        public AvroRowDeserializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
                Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+               this.recordClazz = recordClazz;
                this.schema = SpecificData.get().getSchema(recordClazz);
                this.datumReader = new SpecificDatumReader<>(schema);
                this.record = (SpecificRecord) 
SpecificData.newInstance(recordClazz, schema);
@@ -94,6 +104,20 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
                return (Row) row;
        }
 
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(recordClazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumReader = new SpecificDatumReader<>(schema);
+               this.record = (SpecificRecord) 
SpecificData.newInstance(recordClazz, schema);
+               this.inputStream = new MutableByteArrayInputStream();
+               this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+       }
+
        /**
         * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
         * Avro's {@link Utf8} fields are converted into regular Java strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 8388ab5..1cda529 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -17,9 +17,9 @@
  */
 package org.apache.flink.streaming.util.serialization;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -30,8 +30,12 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
 
 /**
  * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
@@ -39,24 +43,29 @@ import org.apache.flink.util.Preconditions;
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
        /**
+        * Avro record class.
+        */
+       private Class<? extends SpecificRecord> recordClazz;
+
+       /**
         * Avro serialization schema.
         */
-       private final Schema schema;
+       private transient Schema schema;
 
        /**
         * Writer to serialize Avro record into a byte array.
         */
-       private final DatumWriter<GenericRecord> datumWriter;
+       private transient DatumWriter<GenericRecord> datumWriter;
 
        /**
         * Output stream to serialize records into byte array.
         */
-       private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+       private transient ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
 
        /**
         * Low-level class for serialization of Avro values.
         */
-       private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+       private transient Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 
        /**
         * Creates a Avro serialization schema for the given schema.
@@ -65,6 +74,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
         */
        public AvroRowSerializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
                Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+               this.recordClazz = recordClazz;
                this.schema = SpecificData.get().getSchema(recordClazz);
                this.datumWriter = new SpecificDatumWriter<>(schema);
        }
@@ -86,6 +96,19 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
                }
        }
 
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(recordClazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumWriter = new SpecificDatumWriter<>(schema);
+               this.arrayOutputStream = new ByteArrayOutputStream();
+               this.encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+       }
+
        /**
         * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
         * Strings are converted into Avro's {@link Utf8} fields.

http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index e13968e..b844e43 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -17,16 +17,21 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.io.IOException;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
 import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
 import org.apache.flink.types.Row;
-import static org.junit.Assert.assertEquals;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Test for the Avro serialization and deserialization schema.
  */
@@ -117,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest {
 
                assertEquals(testData.f2, actual);
        }
+
+       @Test
+       public void testSerializability() throws IOException, 
ClassNotFoundException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serOrig = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserOrig = new 
AvroRowDeserializationSchema(testData.f0);
+
+               byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
+               byte[] deserBytes = 
InstantiationUtil.serializeObject(deserOrig);
+
+               AvroRowSerializationSchema serCopy =
+                       InstantiationUtil.deserializeObject(serBytes, 
Thread.currentThread().getContextClassLoader());
+               AvroRowDeserializationSchema deserCopy =
+                       InstantiationUtil.deserializeObject(deserBytes, 
Thread.currentThread().getContextClassLoader());
+
+               final byte[] bytes = serCopy.serialize(testData.f2);
+               deserCopy.deserialize(bytes);
+               deserCopy.deserialize(bytes);
+               final Row actual = deserCopy.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
 }

Reply via email to