Repository: flink
Updated Branches:
  refs/heads/master 65d08058a -> c658763d4


[FLINK-3304] Making the Avro Schema serializable.

This closes #1635


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c658763d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c658763d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c658763d

Branch: refs/heads/master
Commit: c658763d42d072073cc846f1814aa3cc5c42e05c
Parents: 65d0805
Author: Kostas Kloudas <kklou...@gmail.com>
Authored: Thu Feb 11 18:24:29 2016 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Tue Feb 16 14:47:43 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     | 34 +++++++++++++++++---
 .../flink/api/avro/AvroOutputFormatITCase.java  |  4 ++-
 2 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c658763d/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 9a3a025..e53874f 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -27,15 +27,16 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
+import java.io.Serializable;
 
-public class AvroOutputFormat<E> extends FileOutputFormat<E> {
+public class AvroOutputFormat<E> extends FileOutputFormat<E> implements 
Serializable {
 
        private static final long serialVersionUID = 1L;
 
        private final Class<E> avroValueType;
 
-       private Schema userDefinedSchema = null;
-
+       private transient Schema userDefinedSchema = null;
+       
        private transient DataFileWriter<E> dataFileWriter;
 
        public AvroOutputFormat(Path filePath, Class<E> type) {
@@ -66,7 +67,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
                super.open(taskNumber, numTasks);
 
                DatumWriter<E> datumWriter;
-               Schema schema = null;
+               Schema schema;
                if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {
                        datumWriter = new SpecificDatumWriter<E>(avroValueType);
                        try {
@@ -88,6 +89,31 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> 
{
                }
        }
 
+       private void writeObject(java.io.ObjectOutputStream out) throws IOException { // custom serialization hook: the schema is written as length-prefixed JSON text
+               out.defaultWriteObject();
+
+               if(userDefinedSchema != null) {
+                       byte[] json = userDefinedSchema.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); // fixed charset: the reading JVM may have a different platform default
+                       out.writeInt(json.length);
+                       out.write(json);
+               } else {
+                       out.writeInt(0); // length 0 marks "no user-defined schema"
+               }
+       }
+
+       private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { // mirror of writeObject: restores the schema from the length-prefixed JSON
+               in.defaultReadObject();
+
+               int length = in.readInt();
+               if(length != 0) {
+                       byte[] json = new byte[length];
+                       in.readFully(json); // read(byte[]) may return before the buffer is full; readFully blocks until all bytes arrive or throws EOFException
+
+                       Schema schema = new Schema.Parser().parse(new String(json, java.nio.charset.StandardCharsets.UTF_8)); // must match the charset used in writeObject
+                       setSchema(schema);
+               }
+       }
+
        @Override
        public void close() throws IOException {
                dataFileWriter.flush();

http://git-wip-us.apache.org/repos/asf/flink/blob/c658763d/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index d40fec5..adbe5dd 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -68,7 +68,9 @@ public class AvroOutputFormatITCase extends 
JavaProgramTestBase {
 
                //output the data with AvroOutputFormat for specific user type
                DataSet<User> specificUser = input.map(new ConvertToUser());
-               specificUser.write(new AvroOutputFormat<User>(User.class), 
outputPath1);
+               AvroOutputFormat<User> avroOutputFormat = new 
AvroOutputFormat<User>(User.class);
+               avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure 
the OF is properly serializing the schema
+               specificUser.write(avroOutputFormat, outputPath1);
 
                //output the data with AvroOutputFormat for reflect user type
                DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new 
ConvertToReflective());

Reply via email to