Repository: flink
Updated Branches:
  refs/heads/master 65d08058a -> c658763d4
[FLINK-3304] Making the Avro Schema serializable. This closes #1635 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c658763d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c658763d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c658763d Branch: refs/heads/master Commit: c658763d42d072073cc846f1814aa3cc5c42e05c Parents: 65d0805 Author: Kostas Kloudas <kklou...@gmail.com> Authored: Thu Feb 11 18:24:29 2016 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Tue Feb 16 14:47:43 2016 +0100 ---------------------------------------------------------------------- .../flink/api/java/io/AvroOutputFormat.java | 34 +++++++++++++++++--- .../flink/api/avro/AvroOutputFormatITCase.java | 4 ++- 2 files changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c658763d/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java index 9a3a025..e53874f 100644 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java +++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java @@ -27,15 +27,16 @@ import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.core.fs.Path; import java.io.IOException; +import java.io.Serializable; -public class AvroOutputFormat<E> extends FileOutputFormat<E> { +public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable { private static final long serialVersionUID = 1L; private final Class<E> 
avroValueType; - private Schema userDefinedSchema = null; - + private transient Schema userDefinedSchema = null; + private transient DataFileWriter<E> dataFileWriter; public AvroOutputFormat(Path filePath, Class<E> type) { @@ -66,7 +67,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> { super.open(taskNumber, numTasks); DatumWriter<E> datumWriter; - Schema schema = null; + Schema schema; if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { datumWriter = new SpecificDatumWriter<E>(avroValueType); try { @@ -88,6 +89,31 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> { } } + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + + if(userDefinedSchema != null) { + byte[] json = userDefinedSchema.toString().getBytes(); + out.writeInt(json.length); + out.write(json); + } else { + out.writeInt(0); + } + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + int length = in.readInt(); + if(length != 0) { + byte[] json = new byte[length]; + in.read(json); + + Schema schema = new Schema.Parser().parse(new String(json)); + setSchema(schema); + } + } + @Override public void close() throws IOException { dataFileWriter.flush(); http://git-wip-us.apache.org/repos/asf/flink/blob/c658763d/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java index d40fec5..adbe5dd 100644 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java +++ 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java @@ -68,7 +68,9 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase { //output the data with AvroOutputFormat for specific user type DataSet<User> specificUser = input.map(new ConvertToUser()); - specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1); + AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class); + avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema + specificUser.write(avroOutputFormat, outputPath1); //output the data with AvroOutputFormat for reflect user type DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());