http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
new file mode 100644
index 0000000..caa6e0d
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * IT cases for the {@link AvroOutputFormat}.
+ */
+@SuppressWarnings("serial")
+public class AvroOutputFormatITCase extends JavaProgramTestBase {
+
+       public static String outputPath1;
+
+       public static String outputPath2;
+
+       public static String inputPath;
+
+       public static String userData = "alice|1|blue\n" +
+               "bob|2|red\n" +
+               "john|3|yellow\n" +
+               "walt|4|black\n";
+
+       /**
+        * Sets up the paths used by the test program: one input file holding {@link #userData}
+        * and two output paths, one per written user type.
+        */
+       @Override
+       protected void preSubmit() throws Exception {
+               final String csvInput = createTempFile("user", userData);
+               inputPath = csvInput;
+
+               final String specificOutput = getTempDirPath("avro_output1");
+               outputPath1 = specificOutput;
+
+               final String reflectOutput = getTempDirPath("avro_output2");
+               outputPath2 = reflectOutput;
+       }
+
+       /**
+        * Reads the pipe-delimited user file and writes it out twice with {@link AvroOutputFormat}:
+        * once as the generated {@link User} type (with codec and schema set explicitly) and once
+        * as the reflection-based {@link ReflectiveUser} type.
+        */
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment executionEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final DataSet<Tuple3<String, Integer, String>> csvRows = executionEnv
+                       .readCsvFile(inputPath)
+                       .fieldDelimiter("|")
+                       .types(String.class, Integer.class, String.class);
+
+               // output the data with AvroOutputFormat for specific user type
+               final DataSet<User> users = csvRows.map(new ConvertToUser());
+               final AvroOutputFormat<User> specificFormat = new AvroOutputFormat<>(User.class);
+               specificFormat.setCodec(Codec.SNAPPY); // FLINK-4771: use a codec
+               specificFormat.setSchema(User.SCHEMA$); // FLINK-3304: Ensure the OF is properly serializing the schema
+               users.write(specificFormat, outputPath1);
+
+               // output the data with AvroOutputFormat for reflect user type
+               final DataSet<ReflectiveUser> reflectiveUsers = users.map(new ConvertToReflective());
+               final AvroOutputFormat<ReflectiveUser> reflectFormat = new AvroOutputFormat<>(ReflectiveUser.class);
+               reflectiveUsers.write(reflectFormat, outputPath2);
+
+               executionEnv.execute();
+       }
+
+       /**
+        * Reads both job outputs back with plain Avro readers and verifies that every line of
+        * {@link #userData} was written, first for the specific and then for the reflect user type.
+        *
+        * <p>Every {@link DataFileReader} is closed after use so the test does not leak file handles.
+        */
+       @Override
+       protected void postSubmit() throws Exception {
+               //compare result for specific user type
+               List<String> result1 = new ArrayList<>();
+               DatumReader<User> userDatumReader1 = new SpecificDatumReader<>(User.class);
+               for (File avroOutput : listOutputFiles(outputPath1, true)) {
+                       try (DataFileReader<User> dataFileReader1 = new DataFileReader<>(avroOutput, userDatumReader1)) {
+                               while (dataFileReader1.hasNext()) {
+                                       User user = dataFileReader1.next();
+                                       result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+                               }
+                       }
+               }
+               assertContainsAllUsers(result1);
+
+               //compare result for reflect user type
+               List<String> result2 = new ArrayList<>();
+               DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<>(ReflectiveUser.class);
+               for (File avroOutput : listOutputFiles(outputPath2, false)) {
+                       try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<>(avroOutput, userDatumReader2)) {
+                               while (dataFileReader2.hasNext()) {
+                                       ReflectiveUser user = dataFileReader2.next();
+                                       result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+                               }
+                       }
+               }
+               assertContainsAllUsers(result2);
+       }
+
+       /**
+        * Resolves the files that make up one job output. If the path is a directory, its entries are
+        * returned; otherwise the path itself is returned as the single output file.
+        *
+        * @param outputPath path the job wrote to
+        * @param expectAvroExtension whether each directory entry must end with {@code .avro}
+        * @return the files to read back
+        */
+       private File[] listOutputFiles(String outputPath, boolean expectAvroExtension) {
+               File output = asFile(outputPath);
+               if (!output.isDirectory()) {
+                       return new File[] {output};
+               }
+
+               File[] entries = output.listFiles();
+               // listFiles() returns null on an I/O error; fail with a message instead of an NPE
+               Assert.assertNotNull("Could not list output directory " + output, entries);
+               if (expectAvroExtension) {
+                       // check for avro ext in dir.
+                       for (File avroOutput : entries) {
+                               Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+                       }
+               }
+               return entries;
+       }
+
+       /**
+        * Asserts that every line of {@link #userData} is contained in the given results.
+        *
+        * @param actual users read back, formatted as {@code name|number|color}
+        */
+       private static void assertContainsAllUsers(List<String> actual) {
+               for (String expectedResult : userData.split("\n")) {
+                       Assert.assertTrue("expected user " + expectedResult + " not found.", actual.contains(expectedResult));
+               }
+       }
+
+       /**
+        * Turns a CSV row of (name, favorite number, favorite color) into a generated {@link User}.
+        * The remaining fields set here receive fixed values.
+        */
+       private static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+
+               @Override
+               public User map(Tuple3<String, Integer, String> value) throws Exception {
+                       final User converted = new User();
+
+                       // values taken from the CSV row
+                       converted.setName(value.f0);
+                       converted.setFavoriteNumber(value.f1);
+                       converted.setFavoriteColor(value.f2);
+
+                       // fixed values for the other fields
+                       converted.setTypeBoolTest(true);
+                       converted.setTypeArrayString(Collections.emptyList());
+                       converted.setTypeArrayBoolean(Collections.emptyList());
+                       converted.setTypeEnum(Colors.BLUE);
+                       converted.setTypeMap(Collections.emptyMap());
+
+                       return converted;
+               }
+       }
+
+       /**
+        * Copies name, favorite number and favorite color of a {@link User} into a {@link ReflectiveUser}.
+        */
+       private static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+
+               @Override
+               public ReflectiveUser map(User value) throws Exception {
+                       final String name = value.getName().toString();
+                       final int favoriteNumber = value.getFavoriteNumber();
+                       final String favoriteColor = value.getFavoriteColor().toString();
+                       return new ReflectiveUser(name, favoriteNumber, favoriteColor);
+               }
+       }
+
+       /**
+        * Plain user class without generated Avro code; it is written and read back via Avro's
+        * reflect support in this test.
+        */
+       private static class ReflectiveUser {
+               private String name;
+               private int favoriteNumber;
+               private String favoriteColor;
+
+               /** No-argument constructor. */
+               public ReflectiveUser() {}
+
+               /** Creates a user with all three fields set. */
+               public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+                       this.name = name;
+                       this.favoriteNumber = favoriteNumber;
+                       this.favoriteColor = favoriteColor;
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               public int getFavoriteNumber() {
+                       return favoriteNumber;
+               }
+
+               public String getFavoriteColor() {
+                       return favoriteColor;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
new file mode 100644
index 0000000..b5ad564
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.User;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link AvroOutputFormat}.
+ */
+public class AvroOutputFormatTest {
+
+       /**
+        * Setting a valid codec must not throw.
+        *
+        * <p>Any exception is left to propagate so that JUnit reports its type and stack trace,
+        * rather than replacing it with a generic failure message.
+        */
+       @Test
+       public void testSetCodec() throws Exception {
+               // given
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+               // when / then: completes without an exception
+               outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);
+       }
+
+       /**
+        * Setting a {@code null} codec must be rejected with an exception.
+        */
+       @Test
+       public void testSetCodecError() throws Exception {
+               // given
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+               // when
+               try {
+                       outputFormat.setCodec(null);
+               } catch (Exception expected) {
+                       // then: the null codec was rejected
+                       return;
+               }
+
+               // then: no exception means the null codec was accepted
+               fail();
+       }
+
+       /**
+        * Runs the Java serialization round trip for every combination of "no codec or each codec"
+        * with "no schema or the {@link User} schema".
+        */
+       @Test
+       public void testSerialization() throws Exception {
+               final Schema[] schemas = {null, User.SCHEMA$};
+
+               // without a codec
+               for (final Schema schema : schemas) {
+                       serializeAndDeserialize(null, schema);
+               }
+
+               // with each available codec
+               for (final AvroOutputFormat.Codec codec : AvroOutputFormat.Codec.values()) {
+                       for (final Schema schema : schemas) {
+                               serializeAndDeserialize(codec, schema);
+                       }
+               }
+       }
+
+       /**
+        * Serializes an {@link AvroOutputFormat} with Java serialization, reads it back and checks
+        * that the {@code codec} and {@code userDefinedSchema} fields survived the round trip.
+        *
+        * @param codec codec to set before serializing, or {@code null} to leave it unset
+        * @param schema schema to set before serializing, or {@code null} to leave it unset
+        * @throws IOException if writing or reading the object stream fails
+        * @throws ClassNotFoundException if the serialized class cannot be resolved when reading
+        */
+       private void serializeAndDeserialize(final AvroOutputFormat.Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
+               // given
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+               if (codec != null) {
+                       outputFormat.setCodec(codec);
+               }
+               if (schema != null) {
+                       outputFormat.setSchema(schema);
+               }
+
+               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+               // when
+               try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                       oos.writeObject(outputFormat);
+               }
+               try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+                       // then
+                       Object o = ois.readObject();
+                       assertTrue(o instanceof AvroOutputFormat);
+                       @SuppressWarnings("unchecked") // type checked by the instanceof assertion above
+                       final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
+                       final AvroOutputFormat.Codec restoredCodec = (AvroOutputFormat.Codec) Whitebox.getInternalState(restored, "codec");
+                       final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
+
+                       // assertEquals handles null on both sides and reports expected vs. actual on failure
+                       assertEquals(codec, restoredCodec);
+                       assertEquals(schema, restoredSchema);
+               }
+       }
+
+       /**
+        * Writes the same records once without a codec and once with SNAPPY and checks that the
+        * compressed file is smaller.
+        *
+        * <p>The temporary files are deleted in a {@code finally} block so they are removed even
+        * when the test fails.
+        */
+       @Test
+       public void testCompression() throws Exception {
+               // given
+               final Path outputPath = new Path(File.createTempFile("avro-output-file", "avro").getAbsolutePath());
+               final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file", "compressed.avro").getAbsolutePath());
+
+               try {
+                       final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath, User.class);
+                       outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+                       final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, User.class);
+                       compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+                       compressedOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);
+
+                       // when
+                       output(outputFormat);
+                       output(compressedOutputFormat);
+
+                       // then
+                       assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
+               } finally {
+                       // cleanup
+                       FileSystem fs = FileSystem.getLocalFileSystem();
+                       fs.delete(outputPath, false);
+                       fs.delete(compressedOutputPath, false);
+               }
+       }
+
+       /**
+        * Returns the length reported by the file status of {@code path}, looked up on the file
+        * system the path itself resolves to.
+        */
+       private long fileSize(Path path) throws IOException {
+               final FileSystem fileSystem = path.getFileSystem();
+               return fileSystem.getFileStatus(path).getLen();
+       }
+
+       /**
+        * Configures and opens the given format, writes 100 identical {@link User} records and
+        * closes the format again.
+        */
+       private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
+               outputFormat.configure(new Configuration());
+               outputFormat.open(1, 1);
+
+               int remaining = 100;
+               while (remaining-- > 0) {
+                       final User testUser = new User();
+                       testUser.setName("testUser");
+                       testUser.setFavoriteNumber(1);
+                       testUser.setFavoriteColor("blue");
+                       testUser.setTypeBoolTest(true);
+                       testUser.setTypeArrayString(Collections.emptyList());
+                       testUser.setTypeArrayBoolean(Collections.emptyList());
+                       testUser.setTypeEnum(Colors.BLUE);
+                       testUser.setTypeMap(Collections.emptyMap());
+                       outputFormat.writeRecord(testUser);
+               }
+
+               outputFormat.close();
+       }
+
+       /**
+        * Writes {@link GenericRecord}s with a user-defined schema and reads them back with a
+        * plain Avro reader.
+        *
+        * <p>Checks the field values of every record and that all 100 written records are read,
+        * so an empty output file cannot pass. The reader is closed and the temporary file is
+        * deleted even when an assertion fails.
+        */
+       @Test
+       public void testGenericRecord() throws IOException {
+               final Path outputPath = new Path(File.createTempFile("avro-output-file", "generic.avro").getAbsolutePath());
+               try {
+                       final AvroOutputFormat<GenericRecord> outputFormat = new AvroOutputFormat<>(outputPath, GenericRecord.class);
+                       Schema schema = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"user\", \"fields\": [{\"name\":\"user_name\", \"type\":\"string\"}, {\"name\":\"favorite_number\", \"type\":\"int\"}, {\"name\":\"favorite_color\", \"type\":\"string\"}]}");
+                       outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+                       outputFormat.setSchema(schema);
+                       output(outputFormat, schema);
+
+                       GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+                       int recordCount = 0;
+                       try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(outputPath.getPath()), reader)) {
+                               while (dataFileReader.hasNext()) {
+                                       GenericRecord record = dataFileReader.next();
+                                       // JUnit convention: expected value first, actual value second
+                                       assertEquals("testUser", record.get("user_name").toString());
+                                       assertEquals(1, record.get("favorite_number"));
+                                       assertEquals("blue", record.get("favorite_color").toString());
+                                       recordCount++;
+                               }
+                       }
+                       // output(...) writes 100 records; all of them must come back
+                       assertEquals(100, recordCount);
+               } finally {
+                       //cleanup
+                       FileSystem fs = FileSystem.getLocalFileSystem();
+                       fs.delete(outputPath, false);
+               }
+       }
+
+       /**
+        * Configures and opens the given format, writes 100 identical generic records built from
+        * {@code schema} and closes the format again.
+        */
+       private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException {
+               outputFormat.configure(new Configuration());
+               outputFormat.open(1, 1);
+
+               int remaining = 100;
+               while (remaining-- > 0) {
+                       final GenericRecord genericUser = new GenericData.Record(schema);
+                       genericUser.put("user_name", "testUser");
+                       genericUser.put("favorite_number", 1);
+                       genericUser.put("favorite_color", "blue");
+                       outputFormat.writeRecord(genericUser);
+               }
+
+               outputFormat.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..92d2c31
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the avro input format.
+ * (The testcase is mostly the getting started tutorial of avro)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+
+       public File testFile;
+
+       static final String TEST_NAME = "Alyssa";
+
+       static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+       static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+       static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+       static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+       static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+       static final String TEST_MAP_KEY1 = "KEY 1";
+       static final long TEST_MAP_VALUE1 = 8546456L;
+       static final String TEST_MAP_KEY2 = "KEY 2";
+       static final long TEST_MAP_VALUE2 = 17554L;
+
+       static final int TEST_NUM = 239;
+       static final String TEST_STREET = "Baker Street";
+       static final String TEST_CITY = "London";
+       static final String TEST_STATE = "London";
+       static final String TEST_ZIP = "NW1 6XE";
+
+       private Schema userSchema = new User().getSchema();
+
+       /**
+        * Writes an Avro data file with two {@link User} records to {@code testFile}.
+        *
+        * <p>The first record is filled through setters using the {@code TEST_*} constants of this
+        * class; the second is created through the builder with several fields set to {@code null}.
+        * The writer is closed even if creating the file or appending a record fails.
+        *
+        * @param testFile file to write the records to
+        * @throws IOException if the file cannot be created or written
+        */
+       public static void writeTestFile(File testFile) throws IOException {
+               List<CharSequence> stringArray = new ArrayList<>();
+               stringArray.add(TEST_ARRAY_STRING_1);
+               stringArray.add(TEST_ARRAY_STRING_2);
+
+               List<Boolean> booleanArray = new ArrayList<>();
+               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+               Map<CharSequence, Long> longMap = new HashMap<>();
+               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+
+               Address addr = new Address();
+               addr.setNum(TEST_NUM);
+               addr.setStreet(TEST_STREET);
+               addr.setCity(TEST_CITY);
+               addr.setState(TEST_STATE);
+               addr.setZip(TEST_ZIP);
+
+               User user1 = new User();
+
+               user1.setName(TEST_NAME);
+               user1.setFavoriteNumber(256);
+               user1.setTypeDoubleTest(123.45d);
+               user1.setTypeBoolTest(true);
+               user1.setTypeArrayString(stringArray);
+               user1.setTypeArrayBoolean(booleanArray);
+               user1.setTypeEnum(TEST_ENUM_COLOR);
+               user1.setTypeMap(longMap);
+               user1.setTypeNested(addr);
+
+               // Construct via builder
+               User user2 = User.newBuilder()
+                               .setName("Charlie")
+                               .setFavoriteColor("blue")
+                               .setFavoriteNumber(null)
+                               .setTypeBoolTest(false)
+                               .setTypeDoubleTest(1.337d)
+                               .setTypeNullTest(null)
+                               .setTypeLongTest(1337L)
+                               .setTypeArrayString(new ArrayList<CharSequence>())
+                               .setTypeArrayBoolean(new ArrayList<Boolean>())
+                               .setTypeNullableArray(null)
+                               .setTypeEnum(Colors.RED)
+                               .setTypeMap(new HashMap<CharSequence, Long>())
+                               .setTypeFixed(null)
+                               .setTypeUnion(null)
+                               .setTypeNested(
+                                               Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
+                                                               .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
+                                                               .build())
+                               .build();
+
+               DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+               // try-with-resources closes the writer even when create/append throws
+               try (DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
+                       dataFileWriter.create(user1.getSchema(), testFile);
+                       dataFileWriter.append(user1);
+                       dataFileWriter.append(user2);
+               }
+       }
+
+       /**
+        * Creates a fresh temporary file before each test and fills it with the test records.
+        */
+       @Before
+       public void createFiles() throws IOException {
+               final File freshFile = File.createTempFile("AvroInputFormatTest", null);
+               testFile = freshFile;
+               writeTestFile(freshFile);
+       }
+
+       /**
+        * Test if the AvroInputFormat is able to properly read data from an avro file.
+        *
+        * @throws IOException if the test file cannot be read
+        */
+       @Test
+       public void testDeserialisation() throws IOException {
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+
+               assertUsersReadCorrectly(format);
+       }
+
+       /**
+        * Test if the AvroInputFormat is able to properly read data from an avro file
+        * when reuse of the Avro value is switched off.
+        *
+        * @throws IOException if the test file cannot be read
+        */
+       @Test
+       public void testDeserialisationReuseAvroRecordFalse() throws IOException {
+               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+               format.setReuseAvroValue(false);
+
+               assertUsersReadCorrectly(format);
+       }
+
+       /**
+        * Configures and opens the given format on its single input split and verifies the records
+        * in the test file: the first record is checked field by field, a second record must exist,
+        * and the format must then report the end of the input.
+        *
+        * <p>The format is closed in a {@code finally} block so a failing assertion does not leave
+        * it open.
+        *
+        * @param format input format over {@link #testFile}, not yet configured or opened
+        * @throws IOException if reading from the format fails
+        */
+       private void assertUsersReadCorrectly(AvroInputFormat<User> format) throws IOException {
+               format.configure(new Configuration());
+               FileInputSplit[] splits = format.createInputSplits(1);
+               // JUnit convention: expected value first, actual value second
+               assertEquals(1, splits.length);
+               format.open(splits[0]);
+
+               try {
+                       User u = format.nextRecord(null);
+                       assertNotNull(u);
+
+                       String name = u.getName().toString();
+                       assertNotNull("empty record", name);
+                       assertEquals("name not equal", TEST_NAME, name);
+
+                       // check arrays
+                       List<CharSequence> sl = u.getTypeArrayString();
+                       assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+                       assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+                       List<Boolean> bl = u.getTypeArrayBoolean();
+                       assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+                       assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+                       // check enums
+                       Colors enumValue = u.getTypeEnum();
+                       assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+
+                       // check maps
+                       Map<CharSequence, Long> lm = u.getTypeMap();
+                       assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+                       assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+                       assertFalse("expecting second element", format.reachedEnd());
+                       assertNotNull("expecting second element", format.nextRecord(u));
+
+                       assertNull(format.nextRecord(u));
+                       assertTrue(format.reachedEnd());
+               } finally {
+                       format.close();
+               }
+       }
+
+       /**
+        * Test if the Flink serialization is able to properly process GenericData.Record types.
+        *
+        * <p>Usually users of Avro generate classes (POJOs) from Avro schemas.
+        * However, if generated classes are not available, one can also use GenericData.Record.
+        * It is an untyped key-value record which is using a schema to validate the correctness
+        * of the data.
+        *
+        * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+        *
+        * @throws IOException propagated from reading the test file or from the serializer
+        */
+       @Test
+       public void testDeserializeToGenericType() throws IOException {
+               DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
+
+               try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+                       // initialize Record by reading it from disk (that's easier than creating it by hand)
+                       GenericData.Record rec = new GenericData.Record(userSchema);
+                       dataFileReader.next(rec);
+
+                       // check if record has been read correctly
+                       assertNotNull(rec);
+                       assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+                       assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+                       assertNull(rec.get("type_long_test")); // it is null for the first record.
+
+                       // now serialize it with our framework:
+                       TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
+
+                       ExecutionConfig ec = new ExecutionConfig();
+                       assertEquals(GenericTypeInfo.class, te.getClass());
+
+                       Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+
+                       TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
+
+                       // exactly one default Kryo serializer must have been registered: the one for Avro's Schema.
+                       // Comparing the looked-up class directly reports the actual value on failure
+                       // (a missing key yields null and fails the comparison as well).
+                       assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
+                       assertEquals(
+                                       AvroKryoSerializerUtils.AvroSchemaSerializer.class,
+                                       ec.getDefaultKryoSerializerClasses().get(Schema.class));
+
+                       ByteArrayOutputStream out = new ByteArrayOutputStream();
+                       try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
+                               tser.serialize(rec, outView);
+                       }
+
+                       GenericData.Record newRec;
+                       try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
+                                       new ByteArrayInputStream(out.toByteArray()))) {
+                               newRec = tser.deserialize(inView);
+                       }
+
+                       // check if it is still the same
+                       assertNotNull(newRec);
+                       assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+                       assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
+                       assertNull(newRec.get("type_long_test"));
+               }
+       }
+
+       /**
+        * Checks that a specific (generated POJO) record read from the test file survives
+        * a round trip through the serializer created from its extracted type information.
+        */
+       @Test
+       public void testDeserializeToSpecificType() throws IOException {
+               final DatumReader<User> userReader = new SpecificDatumReader<>(userSchema);
+
+               try (FileReader<User> avroFile = DataFileReader.openReader(testFile, userReader)) {
+                       final User original = avroFile.next();
+
+                       // the record coming from disk must carry the expected values
+                       assertNotNull(original);
+                       assertEquals("name not equal", TEST_NAME, original.get("name").toString());
+                       assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), original.get("type_enum").toString());
+
+                       // obtain the serializer from the type information extracted for User
+                       final ExecutionConfig config = new ExecutionConfig();
+                       final TypeInformation<User> typeInfo = TypeExtractor.createTypeInfo(User.class);
+                       assertEquals(AvroTypeInfo.class, typeInfo.getClass());
+                       final TypeSerializer<User> serializer = typeInfo.createSerializer(config);
+
+                       // write the record into an in-memory buffer ...
+                       final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+                       try (DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(buffer)) {
+                               serializer.serialize(original, target);
+                       }
+
+                       // ... and read it back from the very same bytes
+                       User copy;
+                       final ByteArrayInputStream serializedBytes = new ByteArrayInputStream(buffer.toByteArray());
+                       try (DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(serializedBytes)) {
+                               copy = serializer.deserialize(source);
+                       }
+
+                       // the copy must expose the same name and enum value
+                       assertNotNull(copy);
+                       assertEquals("name not equal", TEST_NAME, copy.getName().toString());
+                       assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), copy.getTypeEnum().toString());
+               }
+       }
+
+       /**
+        * Reads the Avro test file through an AvroInputFormat that is typed to
+        * GenericRecord and validates the records it returns.
+        *
+        * @throws IOException propagated from the input format
+        */
+       @Test
+       public void testDeserialisationGenericRecord() throws IOException {
+               final Configuration config = new Configuration();
+               final Path avroPath = new Path(testFile.getAbsolutePath());
+
+               final AvroInputFormat<GenericRecord> genericFormat =
+                               new AvroInputFormat<>(avroPath, GenericRecord.class);
+
+               doTestDeserializationGenericRecord(genericFormat, config);
+       }
+
+       /**
+        * Helper method to test GenericRecord deserialization.
+        *
+        * <p>Configures the given format, opens its single input split, verifies the fields of
+        * the first record, and checks that exactly one more record follows. The format is
+        * closed in any case.
+        *
+        * @param format
+        *            the format to test
+        * @param parameters
+        *            the configuration to use
+        * @throws IOException
+        *             thrown if there is an issue
+        */
+       @SuppressWarnings("unchecked")
+       private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format,
+                       final Configuration parameters) throws IOException {
+               try {
+                       format.configure(parameters);
+                       FileInputSplit[] splits = format.createInputSplits(1);
+                       // expected value goes first so a failure message reads correctly
+                       assertEquals(1, splits.length);
+                       format.open(splits[0]);
+
+                       GenericRecord u = format.nextRecord(null);
+                       assertNotNull(u);
+                       assertEquals("The schemas should be equal", userSchema, u.getSchema());
+
+                       String name = u.get("name").toString();
+                       assertNotNull("empty record", name);
+                       assertEquals("name not equal", TEST_NAME, name);
+
+                       // check arrays
+                       List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string");
+                       assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+                       assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+                       List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
+                       assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+                       assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+                       // check enums (generic records expose enums as EnumSymbol, hence the string comparison)
+                       GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum");
+                       assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString());
+
+                       // check maps
+                       Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map");
+                       assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+                       assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+                       // exactly one more record is expected after the first one
+                       assertFalse("expecting second element", format.reachedEnd());
+                       assertNotNull("expecting second element", format.nextRecord(u));
+
+                       assertNull(format.nextRecord(u));
+                       assertTrue(format.reachedEnd());
+               } finally {
+                       format.close();
+               }
+       }
+
+       /**
+        * Same check as the GenericRecord read test, but with the reuse of Avro
+        * values switched off on the AvroInputFormat.
+        *
+        * @throws IOException if there is an error
+        */
+       @Test
+       public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
+               final Configuration config = new Configuration();
+               final Path avroPath = new Path(testFile.getAbsolutePath());
+
+               final AvroInputFormat<GenericRecord> noReuseFormat =
+                               new AvroInputFormat<>(avroPath, GenericRecord.class);
+               noReuseFormat.configure(config);
+               noReuseFormat.setReuseAvroValue(false);
+
+               doTestDeserializationGenericRecord(noReuseFormat, config);
+       }
+
+       /**
+        * Deletes the test file after each test. The result of {@code delete()} is not checked.
+        */
+       @After
+       public void deleteFiles() {
+               testFile.delete();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
new file mode 100644
index 0000000..5341bcf
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for the Avro serialization and deserialization schema.
+ */
+public class AvroRowDeSerializationSchemaTest {
+
+       @Test
+       public void testSerializeDeserializeSimpleRow() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getSimpleTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializeSimpleRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getSimpleTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
+
+               serializationSchema.serialize(testData.f2);
+               serializationSchema.serialize(testData.f2);
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testDeserializeRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getSimpleTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               deserializationSchema.deserialize(bytes);
+               deserializationSchema.deserialize(bytes);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializeDeserializeComplexRow() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializeComplexRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
+
+               serializationSchema.serialize(testData.f2);
+               serializationSchema.serialize(testData.f2);
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testDeserializeComplexRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               deserializationSchema.deserialize(bytes);
+               deserializationSchema.deserialize(bytes);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializability() throws IOException, 
ClassNotFoundException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serOrig = new 
AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserOrig = new 
AvroRowDeserializationSchema(testData.f0);
+
+               byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
+               byte[] deserBytes = 
InstantiationUtil.serializeObject(deserOrig);
+
+               AvroRowSerializationSchema serCopy =
+                       InstantiationUtil.deserializeObject(serBytes, 
Thread.currentThread().getContextClassLoader());
+               AvroRowDeserializationSchema deserCopy =
+                       InstantiationUtil.deserializeObject(deserBytes, 
Thread.currentThread().getContextClassLoader());
+
+               final byte[] bytes = serCopy.serialize(testData.f2);
+               deserCopy.deserialize(bytes);
+               deserCopy.deserialize(bytes);
+               final Row actual = deserCopy.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
new file mode 100644
index 0000000..40a84f9
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the avro input format with a file that is read in several splits,
+ * including checkpoint/restore of the reading position.
+ * (The testcase is mostly the getting started tutorial of avro)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroSplittableInputFormatTest {
+
+       private File testFile;
+
+       static final String TEST_NAME = "Alyssa";
+
+       static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+       static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+       static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+       static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+       static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+       static final String TEST_MAP_KEY1 = "KEY 1";
+       static final long TEST_MAP_VALUE1 = 8546456L;
+       static final String TEST_MAP_KEY2 = "KEY 2";
+       static final long TEST_MAP_VALUE2 = 17554L;
+
+       static final Integer TEST_NUM = 239;
+       static final String TEST_STREET = "Baker Street";
+       static final String TEST_CITY = "London";
+       static final String TEST_STATE = "London";
+       static final String TEST_ZIP = "NW1 6XE";
+
+       static final int NUM_RECORDS = 5000;
+
+       /** Number of input splits requested for the test file. */
+       private static final int NUM_SPLITS = 4;
+
+       /**
+        * Expected number of records per split. The reference values were produced with the
+        * Hadoop-based test that is kept as a comment at the end of this class.
+        */
+       private static final int[] EXPECTED_RECORDS_PER_SPLIT = {1539, 1026, 1539, 896};
+
+       /**
+        * Writes {@link #NUM_RECORDS} users into a fresh temporary Avro file: two hand-crafted
+        * records followed by records derived from a seeded random generator.
+        *
+        * @throws IOException if the temporary file cannot be created or written
+        */
+       @Before
+       public void createFiles() throws IOException {
+               testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
+
+               ArrayList<CharSequence> stringArray = new ArrayList<>();
+               stringArray.add(TEST_ARRAY_STRING_1);
+               stringArray.add(TEST_ARRAY_STRING_2);
+
+               ArrayList<Boolean> booleanArray = new ArrayList<>();
+               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+               HashMap<CharSequence, Long> longMap = new HashMap<>();
+               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+
+               User user1 = new User();
+               user1.setName(TEST_NAME);
+               user1.setFavoriteNumber(256);
+               user1.setTypeDoubleTest(123.45d);
+               user1.setTypeBoolTest(true);
+               user1.setTypeArrayString(stringArray);
+               user1.setTypeArrayBoolean(booleanArray);
+               user1.setTypeEnum(TEST_ENUM_COLOR);
+               user1.setTypeMap(longMap);
+               user1.setTypeNested(createTestAddress());
+
+               // Construct via builder
+               User user2 = User.newBuilder()
+                               .setName(TEST_NAME)
+                               .setFavoriteColor("blue")
+                               .setFavoriteNumber(null)
+                               .setTypeBoolTest(false)
+                               .setTypeDoubleTest(1.337d)
+                               .setTypeNullTest(null)
+                               .setTypeLongTest(1337L)
+                               .setTypeArrayString(new ArrayList<CharSequence>())
+                               .setTypeArrayBoolean(new ArrayList<Boolean>())
+                               .setTypeNullableArray(null)
+                               .setTypeEnum(Colors.RED)
+                               .setTypeMap(new HashMap<CharSequence, Long>())
+                               .setTypeFixed(new Fixed16())
+                               .setTypeUnion(123L)
+                               .setTypeNested(
+                                               Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
+                                                               .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
+                                                               .build())
+                               .build();
+
+               DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+               // try-with-resources closes the writer even if one of the appends fails
+               try (DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter)) {
+                       dataFileWriter.create(user1.getSchema(), testFile);
+                       dataFileWriter.append(user1);
+                       dataFileWriter.append(user2);
+
+                       // fixed seed and fixed call order: the expected per-split counts depend on the written data
+                       Random rnd = new Random(1337);
+                       for (int i = 0; i < NUM_RECORDS - 2; i++) {
+                               User user = new User();
+                               user.setName(TEST_NAME + rnd.nextInt());
+                               user.setFavoriteNumber(rnd.nextInt());
+                               user.setTypeDoubleTest(rnd.nextDouble());
+                               user.setTypeBoolTest(true);
+                               user.setTypeArrayString(stringArray);
+                               user.setTypeArrayBoolean(booleanArray);
+                               user.setTypeEnum(TEST_ENUM_COLOR);
+                               user.setTypeMap(longMap);
+                               user.setTypeNested(createTestAddress());
+
+                               dataFileWriter.append(user);
+                       }
+               }
+       }
+
+       /**
+        * Reads all splits one after the other and checks the number of records per split.
+        */
+       @Test
+       public void testSplittedIF() throws IOException {
+               AvroInputFormat<User> format = createFormat();
+               format.configure(new Configuration());
+
+               FileInputSplit[] splits = format.createInputSplits(NUM_SPLITS);
+               assertEquals(NUM_SPLITS, splits.length);
+
+               int elements = 0;
+               int[] elementsPerSplit = new int[NUM_SPLITS];
+               for (int i = 0; i < splits.length; i++) {
+                       format.open(splits[i]);
+                       try {
+                               while (!format.reachedEnd()) {
+                                       User u = format.nextRecord(null);
+                                       Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                                       elements++;
+                                       elementsPerSplit[i]++;
+                               }
+                       } finally {
+                               // release the split's stream even if an assertion fails
+                               format.close();
+                       }
+               }
+
+               assertSplitCounts(elementsPerSplit, elements);
+       }
+
+       /**
+        * Same as {@link #testAvroRecovery()}, but every split is entered through
+        * {@code reopen(..)} with the format's current state instead of {@code open(..)}.
+        */
+       @Test
+       public void testAvroRecoveryWithFailureAtStart() throws Exception {
+               doTestRecovery(true);
+       }
+
+       /**
+        * Reads all splits and, whenever 132 records were read from the current block,
+        * continues with a fresh format that is restored from the captured state.
+        */
+       @Test
+       public void testAvroRecovery() throws Exception {
+               doTestRecovery(false);
+       }
+
+       /*
+       This test gave the reference values for the test of Flink's IF.
+
+       This dependency needs to be added
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-mapred</artifactId>
+            <version>1.7.6</version>
+        </dependency>
+
+       @Test
+       public void testHadoop() throws Exception {
+               JobConf jf = new JobConf();
+               FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI()));
+               jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);
+               org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>();
+               InputSplit[] sp = format.getSplits(jf, 4);
+               int elementsPerSplit[] = new int[4];
+               int cnt = 0;
+               int i = 0;
+               for (InputSplit s:sp) {
+                       RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter());
+                       AvroWrapper<User> k = r.createKey();
+                       NullWritable v = r.createValue();
+
+                       while (r.next(k, v)) {
+                               cnt++;
+                               elementsPerSplit[i]++;
+                       }
+                       i++;
+               }
+               System.out.println("Status "+Arrays.toString(elementsPerSplit));
+       } */
+
+       /**
+        * Deletes the temporary Avro file after each test. The result of {@code delete()} is not checked.
+        */
+       @After
+       public void deleteFiles() {
+               testFile.delete();
+       }
+
+       /** Creates the nested address record that is attached to the test users. */
+       private static Address createTestAddress() {
+               Address address = new Address();
+               address.setNum(TEST_NUM);
+               address.setStreet(TEST_STREET);
+               address.setCity(TEST_CITY);
+               address.setState(TEST_STATE);
+               address.setZip(TEST_ZIP);
+               return address;
+       }
+
+       /** Creates a new, not yet configured input format that reads users from the test file. */
+       private AvroInputFormat<User> createFormat() {
+               return new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+       }
+
+       /** Checks the per-split record counts against the reference values and the total against NUM_RECORDS. */
+       private static void assertSplitCounts(int[] elementsPerSplit, int elements) {
+               Assert.assertArrayEquals(EXPECTED_RECORDS_PER_SPLIT, elementsPerSplit);
+               Assert.assertEquals(NUM_RECORDS, elements);
+       }
+
+       /**
+        * Shared body of the recovery tests.
+        *
+        * @param reopenAtSplitStart if {@code true}, each split is entered via
+        *            {@code reopen(split, format.getCurrentState())}; otherwise via {@code open(split)}
+        */
+       private void doTestRecovery(boolean reopenAtSplitStart) throws Exception {
+               final int recordsUntilCheckpoint = 132;
+
+               AvroInputFormat<User> format = createFormat();
+               format.configure(new Configuration());
+
+               FileInputSplit[] splits = format.createInputSplits(NUM_SPLITS);
+               assertEquals(NUM_SPLITS, splits.length);
+
+               int elements = 0;
+               int[] elementsPerSplit = new int[NUM_SPLITS];
+               for (int i = 0; i < splits.length; i++) {
+                       if (reopenAtSplitStart) {
+                               format.reopen(splits[i], format.getCurrentState());
+                       } else {
+                               format.open(splits[i]);
+                       }
+
+                       try {
+                               while (!format.reachedEnd()) {
+                                       User u = format.nextRecord(null);
+                                       Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                                       elements++;
+
+                                       if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+                                               // do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+                                               Tuple2<Long, Long> state = format.getCurrentState();
+
+                                               // the old format is abandoned from here on; close it so its stream does not leak
+                                               format.close();
+
+                                               // this is to make sure that nothing stays from the previous format
+                                               // (as it is going to be in the normal case)
+                                               format = createFormat();
+
+                                               format.reopen(splits[i], state);
+                                               assertEquals(recordsUntilCheckpoint, format.getRecordsReadFromBlock());
+                                       }
+                                       elementsPerSplit[i]++;
+                               }
+                       } finally {
+                               // closes whichever format instance is current, also when an assertion fails
+                               format.close();
+                       }
+               }
+
+               assertSplitCounts(elementsPerSplit, elements);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
new file mode 100644
index 0000000..87e169b
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
+ */
+public class EncoderDecoderTest {
+       /**
+        * Writes ten generated strings straight through a {@link DataOutputEncoder} and reads each one
+        * back with a {@link DataInputDecoder}, expecting the decoded text to equal what was written.
+        */
+       @Test
+       public void testComplexStringsDirecty() {
+               try {
+                       // fixed seed, so every run works on the same strings
+                       final Random random = new Random(349712539451944123L);
+
+                       for (int round = 0; round < 10; round++) {
+                               final String original = StringUtils.getRandomString(random, 10, 100);
+
+                               // encode into an in-memory buffer
+                               final ByteArrayOutputStream buffer = new ByteArrayOutputStream(512);
+                               final DataOutputStream out = new DataOutputStream(buffer);
+                               final DataOutputEncoder encoder = new DataOutputEncoder();
+                               encoder.setOut(out);
+                               encoder.writeString(original);
+                               out.flush();
+                               out.close();
+
+                               // decode from the bytes that were just produced
+                               final byte[] encoded = buffer.toByteArray();
+                               final DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
+                               final DataInputDecoder decoder = new DataInputDecoder();
+                               decoder.setIn(in);
+                               final String decoded = decoder.readString();
+
+                               assertEquals(original, decoded);
+                       }
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test failed due to an exception: " + e.getMessage());
+               }
+       }
+
+       /**
+        * Round-trips boxed primitives, strings and two {@code SimpleTypes} instances through
+        * {@code testObjectSerialization}, including the extreme and special values of each numeric type.
+        */
+       @Test
+       public void testPrimitiveTypes() {
+
+               // use the canonical instances instead of the deprecated Boolean constructor
+               testObjectSerialization(Boolean.TRUE);
+               testObjectSerialization(Boolean.FALSE);
+
+               testObjectSerialization(Byte.valueOf((byte) 0));
+               testObjectSerialization(Byte.valueOf((byte) 1));
+               testObjectSerialization(Byte.valueOf((byte) -1));
+               testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
+               testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
+
+               testObjectSerialization(Short.valueOf((short) 0));
+               testObjectSerialization(Short.valueOf((short) 1));
+               testObjectSerialization(Short.valueOf((short) -1));
+               testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
+               testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
+
+               testObjectSerialization(Integer.valueOf(0));
+               testObjectSerialization(Integer.valueOf(1));
+               testObjectSerialization(Integer.valueOf(-1));
+               testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
+               testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
+
+               testObjectSerialization(Long.valueOf(0));
+               testObjectSerialization(Long.valueOf(1));
+               testObjectSerialization(Long.valueOf(-1));
+               testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
+               testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
+
+               testObjectSerialization(Float.valueOf(0));
+               testObjectSerialization(Float.valueOf(1));
+               testObjectSerialization(Float.valueOf(-1));
+               testObjectSerialization(Float.valueOf((float) Math.E));
+               testObjectSerialization(Float.valueOf((float) Math.PI));
+               testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
+               testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
+               testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
+               testObjectSerialization(Float.valueOf(Float.NaN));
+               testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
+               testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
+
+               testObjectSerialization(Double.valueOf(0));
+               testObjectSerialization(Double.valueOf(1));
+               testObjectSerialization(Double.valueOf(-1));
+               testObjectSerialization(Double.valueOf(Math.E));
+               testObjectSerialization(Double.valueOf(Math.PI));
+               testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
+               testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
+               testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
+               testObjectSerialization(Double.valueOf(Double.NaN));
+               testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
+               testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
+
+               // empty, plain ASCII and non-ASCII strings
+               testObjectSerialization("");
+               testObjectSerialization("abcdefg");
+               testObjectSerialization("ab\u1535\u0155xyz\u706F");
+
+               testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
+               testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
+       }
+
+       /**
+        * Round-trips non-empty arrays of {@code int}, {@code long}, {@code float}, {@code double}
+        * and {@code String} through {@code testObjectSerialization}.
+        */
+       @Test
+       public void testArrayTypes() {
+               testObjectSerialization(new int[] {1, 2, 3, 4, 5});
+               testObjectSerialization(new long[] {1L, 2L, 3L, 4L, 5L});
+               testObjectSerialization(new float[] {1f, 2f, 3f, 4f, 5f});
+               testObjectSerialization(new double[] {1d, 2d, 3d, 4d, 5d});
+               testObjectSerialization(new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"});
+       }
+
+       /**
+        * Round-trips zero-length arrays of {@code int}, {@code long}, {@code float}, {@code double}
+        * and {@code String} through {@code testObjectSerialization}.
+        */
+       @Test
+       public void testEmptyArray() {
+               testObjectSerialization(new int[0]);
+               testObjectSerialization(new long[0]);
+               testObjectSerialization(new float[0]);
+               testObjectSerialization(new double[0]);
+               testObjectSerialization(new String[0]);
+       }
+
+       /**
+        * Round-trips a {@code Book} and two {@code BookAuthor} instances (one with a filled list,
+        * one with an empty list) through {@code testObjectSerialization}.
+        */
+       @Test
+       public void testObjects() {
+               // flat object: two longs and a string
+               final Book book = new Book(976243875L, "The Serialization Odysse", 42);
+               testObjectSerialization(book);
+
+               // object carrying a non-empty collection
+               final ArrayList<String> titles = new ArrayList<String>(Arrays.asList("A", "B", "C", "D", "E"));
+               testObjectSerialization(new BookAuthor(976243875L, titles, "Arno Nym"));
+
+               // object carrying an empty collection
+               final ArrayList<String> noTitles = new ArrayList<String>();
+               testObjectSerialization(new BookAuthor(987654321L, noTitles, "The Saurus"));
+       }
+
+       /**
+        * Round-trips a {@code ComplexNestedObject2}, whose map values are themselves objects
+        * holding a list, through {@code testObjectSerialization}.
+        */
+       @Test
+       public void testNestedObjectsWithCollections() {
+               final ComplexNestedObject2 nested = new ComplexNestedObject2(true);
+               testObjectSerialization(nested);
+       }
+
+       /**
+        * Round-trips a generated {@link User} record through {@code testObjectSerialization},
+        * passing {@code null} for two of the constructor arguments.
+        */
+       @Test
+       public void testGeneratedObjectWithNullableFields() {
+               List<CharSequence> strings = Arrays.<CharSequence>asList(
+                               "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence");
+               List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
+               Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+               map.put("1", 1L);
+               map.put("2", 2L);
+               map.put("3", 3L);
+
+               // seeded so that a failing run can be reproduced with the very same fixed-size payload
+               byte[] b = new byte[16];
+               new Random(7316540891229763L).nextBytes(b);
+               Fixed16 f = new Fixed16(b);
+
+               // valueOf / Boolean.TRUE instead of the deprecated boxing constructors
+               Address addr = new Address(Integer.valueOf(239), "6th Main", "Bangalore",
+                               "Karnataka", "560075");
+               User user = new User("Freudenreich", 1337, "macintosh gray",
+                               1234567890L, 3.1415926, null, true, strings, bools, null,
+                               Colors.GREEN, map, f, Boolean.TRUE, addr);
+
+               testObjectSerialization(user);
+       }
+
+       /**
+        * Writes a series of counts with {@code DataOutputEncoder.writeVarLongCount} and checks that
+        * {@code DataInputDecoder.readVarLongCount} returns the same values in the same order.
+        */
+       @Test
+       public void testVarLenCountEncoding() {
+               try {
+                       final long[] samples = { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
+
+                       // write all counts back to back into one buffer
+                       final ByteArrayOutputStream buffer = new ByteArrayOutputStream(512);
+                       final DataOutputStream out = new DataOutputStream(buffer);
+                       for (int i = 0; i < samples.length; i++) {
+                               DataOutputEncoder.writeVarLongCount(out, samples[i]);
+                       }
+                       out.flush();
+                       out.close();
+
+                       // read them again and compare position by position
+                       final DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+                       for (int i = 0; i < samples.length; i++) {
+                               final long decoded = DataInputDecoder.readVarLongCount(in);
+                               assertEquals("Wrong var-len encoded value read.", samples[i], decoded);
+                       }
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test failed due to an exception: " + e.getMessage());
+               }
+       }
+
+       /**
+        * Serializes {@code obj} with a {@link ReflectDatumWriter} on top of a {@link DataOutputEncoder},
+        * reads it back with a {@link ReflectDatumReader} on top of a {@link DataInputDecoder}, and
+        * asserts that the result equals the input. Arrays are compared element-wise.
+        *
+        * <p>Any {@link Exception} raised on the way is printed and turned into a test failure.
+        *
+        * @param obj the object to round-trip; its runtime class is used for both writer and reader
+        */
+       private static <X> void testObjectSerialization(X obj) {
+               try {
+                       // the runtime class of the sample drives both directions
+                       @SuppressWarnings("unchecked")
+                       final Class<X> type = (Class<X>) obj.getClass();
+
+                       // serialize
+                       final ByteArrayOutputStream buffer = new ByteArrayOutputStream(512);
+                       final DataOutputStream out = new DataOutputStream(buffer);
+                       final DataOutputEncoder encoder = new DataOutputEncoder();
+                       encoder.setOut(out);
+
+                       final ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(type);
+                       writer.write(obj, encoder);
+                       out.flush();
+                       out.close();
+
+                       final byte[] serialized = buffer.toByteArray();
+
+                       // deserialize
+                       final DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
+                       final DataInputDecoder decoder = new DataInputDecoder();
+                       decoder.setIn(in);
+
+                       final ReflectDatumReader<X> reader = new ReflectDatumReader<X>(type);
+
+                       // hand the reader a reuse instance when the class can be instantiated reflectively;
+                       // failing to create one is not an error, the reader then gets null
+                       X reuse;
+                       try {
+                               reuse = type.newInstance();
+                       } catch (Throwable t) {
+                               reuse = null;
+                       }
+
+                       final X result = reader.read(reuse, decoder);
+
+                       // check
+                       final String message = "Deserialized object is not the same as the original";
+                       final Class<?> runtimeType = type;
+
+                       if (!runtimeType.isArray()) {
+                               assertEquals(message, obj, result);
+                       } else if (runtimeType == byte[].class) {
+                               assertArrayEquals(message, (byte[]) obj, (byte[]) result);
+                       } else if (runtimeType == short[].class) {
+                               assertArrayEquals(message, (short[]) obj, (short[]) result);
+                       } else if (runtimeType == int[].class) {
+                               assertArrayEquals(message, (int[]) obj, (int[]) result);
+                       } else if (runtimeType == long[].class) {
+                               assertArrayEquals(message, (long[]) obj, (long[]) result);
+                       } else if (runtimeType == char[].class) {
+                               assertArrayEquals(message, (char[]) obj, (char[]) result);
+                       } else if (runtimeType == float[].class) {
+                               assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
+                       } else if (runtimeType == double[].class) {
+                               assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
+                       } else {
+                               // every remaining array type is an array of references
+                               assertArrayEquals(message, (Object[]) obj, (Object[]) result);
+                       }
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test failed due to an exception: " + e.getMessage());
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Test Objects
+       // --------------------------------------------------------------------------------------------
+
+       private static final class SimpleTypes {
+
+               private final int iVal;
+               private final long lVal;
+               private final byte bVal;
+               private final String sVal;
+               private final short rVal;
+               private final double dVal;
+
+               public SimpleTypes() {
+                       this(0, 0, (byte) 0, "", (short) 0, 0);
+               }
+
+               public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, 
short rVal, double dVal) {
+                       this.iVal = iVal;
+                       this.lVal = lVal;
+                       this.bVal = bVal;
+                       this.sVal = sVal;
+                       this.rVal = rVal;
+                       this.dVal = dVal;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj.getClass() == SimpleTypes.class) {
+                               SimpleTypes other = (SimpleTypes) obj;
+
+                               return other.iVal == this.iVal &&
+                                               other.lVal == this.lVal &&
+                                               other.bVal == this.bVal &&
+                                               other.sVal.equals(this.sVal) &&
+                                               other.rVal == this.rVal &&
+                                               other.dVal == this.dVal;
+
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       /**
+        * Test type holding a double and a list of strings.
+        */
+       private static class ComplexNestedObject1 {
+
+               private double doubleValue;
+
+               private List<String> stringList;
+
+               /** Leaves {@code doubleValue} at zero and {@code stringList} at {@code null}. */
+               public ComplexNestedObject1() {}
+
+               /**
+                * Fills the object with values derived from {@code offInit}.
+                *
+                * @param offInit added to the double value and appended to every string in the list
+                */
+               public ComplexNestedObject1(int offInit) {
+                       this.doubleValue = 6293485.6723 + offInit;
+
+                       this.stringList = new ArrayList<String>();
+                       this.stringList.add("A" + offInit);
+                       this.stringList.add("somewhat" + offInit);
+                       this.stringList.add("random" + offInit);
+                       this.stringList.add("collection" + offInit);
+                       this.stringList.add("of" + offInit);
+                       this.stringList.add("strings" + offInit);
+               }
+
+               /**
+                * Compares the double value and the list contents.
+                *
+                * @return {@code false} for {@code null} and for objects of any other class
+                */
+               @Override
+               public boolean equals(Object obj) {
+                       // null must yield false rather than a NullPointerException
+                       if (obj == null || obj.getClass() != ComplexNestedObject1.class) {
+                               return false;
+                       }
+                       ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+                       return other.doubleValue == this.doubleValue &&
+                                       // the list is null after the no-arg constructor; two nulls count as equal
+                                       (this.stringList == null ? other.stringList == null : this.stringList.equals(other.stringList));
+               }
+
+               /**
+                * Hash consistent with {@link #equals(Object)}.
+                */
+               @Override
+               public int hashCode() {
+                       // doubleValue is left out on purpose: equals() compares it with ==, under which
+                       // 0.0 and -0.0 are equal although their bit patterns differ
+                       return stringList == null ? 0 : stringList.hashCode();
+               }
+       }
+
+       private static class ComplexNestedObject2 {
+
+               private long longValue;
+
+               private Map<String, ComplexNestedObject1> theMap;
+
+               public ComplexNestedObject2() {}
+
+               public ComplexNestedObject2(boolean init) {
+                       this.longValue = 46547;
+
+                       this.theMap = new HashMap<String, 
ComplexNestedObject1>();
+                       this.theMap.put("36354L", new 
ComplexNestedObject1(43546543));
+                       this.theMap.put("785611L", new 
ComplexNestedObject1(45784568));
+                       this.theMap.put("43L", new 
ComplexNestedObject1(9876543));
+                       this.theMap.put("-45687L", new 
ComplexNestedObject1(7897615));
+                       this.theMap.put("1919876876896L", new 
ComplexNestedObject1(27154));
+                       this.theMap.put("-868468468L", new 
ComplexNestedObject1(546435));
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj.getClass() == ComplexNestedObject2.class) {
+                               ComplexNestedObject2 other = 
(ComplexNestedObject2) obj;
+                               return other.longValue == this.longValue && 
this.theMap.equals(other.theMap);
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       private static class Book {
+
+               private long bookId;
+               private String title;
+               private long authorId;
+
+               public Book() {}
+
+               public Book(long bookId, String title, long authorId) {
+                       this.bookId = bookId;
+                       this.title = title;
+                       this.authorId = authorId;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj.getClass() == Book.class) {
+                               Book other = (Book) obj;
+                               return other.bookId == this.bookId && 
other.authorId == this.authorId && this.title.equals(other.title);
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       private static class BookAuthor {
+
+               private long authorId;
+               private List<String> bookTitles;
+               private String authorName;
+
+               public BookAuthor() {}
+
+               public BookAuthor(long authorId, List<String> bookTitles, 
String authorName) {
+                       this.authorId = authorId;
+                       this.bookTitles = bookTitles;
+                       this.authorName = authorName;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj.getClass() == BookAuthor.class) {
+                               BookAuthor other = (BookAuthor) obj;
+                               return other.authorName.equals(this.authorName) 
&& other.authorId == this.authorId &&
+                                               
other.bookTitles.equals(this.bookTitles);
+                       } else {
+                               return false;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..17f56a6
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.testjar;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroInputFormat;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This file defines the classes for the AvroExternalJarProgramITCase.
+ */
+public class AvroExternalJarProgram  {
+
+       /**
+        * Mutable bean describing a color by name and saturation.
+        */
+       private static final class Color {
+
+               private String name;
+               private double saturation;
+
+               /** Creates a color with an empty name and a saturation of 1.0. */
+               public Color() {
+                       this("", 1.0);
+               }
+
+               public Color(String name, double saturation) {
+                       this.name = name;
+                       this.saturation = saturation;
+               }
+
+               public String getName() {
+                       return this.name;
+               }
+
+               public void setName(String name) {
+                       this.name = name;
+               }
+
+               public double getSaturation() {
+                       return this.saturation;
+               }
+
+               public void setSaturation(double saturation) {
+                       this.saturation = saturation;
+               }
+
+               /** Renders the color as {@code name(saturation)}. */
+               @Override
+               public String toString() {
+                       final StringBuilder text = new StringBuilder();
+                       text.append(name).append('(').append(saturation).append(')');
+                       return text.toString();
+               }
+       }
+
+       /**
+        * Mutable bean describing a user by name and a list of colors.
+        */
+       private static final class MyUser {
+
+               private String name;
+               private List<Color> colors;
+
+               /** Creates a user named "unknown" with an empty, mutable color list. */
+               public MyUser() {
+                       this("unknown", new ArrayList<Color>());
+               }
+
+               public MyUser(String name, List<Color> colors) {
+                       this.name = name;
+                       this.colors = colors;
+               }
+
+               public String getName() {
+                       return this.name;
+               }
+
+               public void setName(String name) {
+                       this.name = name;
+               }
+
+               public List<Color> getColors() {
+                       return this.colors;
+               }
+
+               public void setColors(List<Color> colors) {
+                       this.colors = colors;
+               }
+
+               /** Renders the user as {@code name : colors}. */
+               @Override
+               public String toString() {
+                       final StringBuilder text = new StringBuilder();
+                       text.append(name).append(" : ").append(colors);
+                       return text.toString();
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Pairs every user with the first character of its name.
+        */
+       private static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<String, MyUser> map(MyUser u) {
+                       final String fullName = u.getName();
+                       final String firstCharacter = fullName.substring(0, 1);
+                       return new Tuple2<String, MyUser>(firstCharacter, u);
+               }
+       }
+
+       /**
+        * Reduce function that always keeps the first of the two values it is given.
+        */
+       private static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+                       // val2 is dropped on purpose
+                       return val1;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Test Data
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Produces a repeatable sequence of users from a random generator with a fixed seed.
+        */
+       private static final class Generator {
+
+               private final Random rnd = new Random(2389756789345689276L);
+
+               /** Returns the next generated user. */
+               public MyUser nextUser() {
+                       return randomUser();
+               }
+
+               /** Builds a user with zero to four colors; the user name is drawn after the colors. */
+               private MyUser randomUser() {
+                       final int colorCount = rnd.nextInt(5);
+                       final ArrayList<Color> palette = new ArrayList<Color>(colorCount);
+
+                       int added = 0;
+                       while (added < colorCount) {
+                               // the name is drawn before the saturation
+                               final String colorName = randomString();
+                               final double saturation = rnd.nextDouble();
+                               palette.add(new Color(colorName, saturation));
+                               added++;
+                       }
+
+                       final String userName = randomString();
+                       return new MyUser(userName, palette);
+               }
+
+               /** Builds a string of 5 to 24 characters with code points from 40 to 189. */
+               private String randomString() {
+                       final int length = this.rnd.nextInt(20) + 5;
+                       final char[] chars = new char[length];
+
+                       for (int pos = 0; pos < length; pos++) {
+                               chars[pos] = (char) (this.rnd.nextInt(150) + 40);
+                       }
+
+                       return String.valueOf(chars);
+               }
+       }
+
+       /**
+        * Writes {@code numRecords} generated {@code MyUser} records to {@code testFile} as an Avro
+        * data file, using the schema that Avro reflection derives from {@code MyUser}.
+        *
+        * @param testFile the file to create
+        * @param numRecords how many records to append
+        * @throws IOException if creating, appending to or closing the file fails
+        */
+       public static void writeTestData(File testFile, int numRecords) throws IOException {
+
+               DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+               DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+
+               dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+
+               // from here on the file is open: close it even when appending a record fails
+               try {
+                       Generator generator = new Generator();
+
+                       for (int i = 0; i < numRecords; i++) {
+                               MyUser user = generator.nextUser();
+                               dataFileWriter.append(user);
+                       }
+               } finally {
+                       dataFileWriter.close();
+               }
+       }
+
+//     public static void main(String[] args) throws Exception {
+//             String testDataFile = new 
File("src/test/resources/testdata.avro").getAbsolutePath();
+//             writeTestData(new File(testDataFile), 50);
+//     }
+
+       /**
+        * Reads {@code MyUser} records from the Avro file named by the first argument, keys them by the
+        * first character of the user name, keeps one record per key and discards the result.
+        *
+        * @param args {@code args[0]} is the path of the input file
+        */
+       public static void main(String[] args) throws Exception {
+               final String inputPath = args[0];
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final AvroInputFormat<MyUser> format = new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class);
+               final DataSet<MyUser> users = env.createInput(format);
+
+               final DataSet<Tuple2<String, MyUser>> onePerPrefix = users
+                               .map(new NameExtractor())
+                               .groupBy(0)
+                               .reduce(new NameGrouper());
+
+               onePerPrefix.output(new DiscardingOutputFormat<Tuple2<String, MyUser>>());
+               env.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
new file mode 100644
index 0000000..89be9c0
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericArraySerializerTest;
+
+/**
+ * Runs the test cases inherited from {@link AbstractGenericArraySerializerTest} with an
+ * {@link AvroSerializer} supplied as the component serializer.
+ */
+public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+
+       /**
+        * Supplies the serializer for the array component type.
+        *
+        * @param type class of the array components
+        * @return a new {@link AvroSerializer} for {@code type}
+        */
+       @Override
+       protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+               final AvroSerializer<T> componentSerializer = new AvroSerializer<T>(type);
+               return componentSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
new file mode 100644
index 0000000..a247766
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeComparatorTest;
+
+/**
+ * Runs the test cases inherited from {@link AbstractGenericTypeComparatorTest} with an
+ * {@link AvroSerializer} supplied as the serializer.
+ */
+public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+
+       /**
+        * Supplies the serializer used by the inherited test cases.
+        *
+        * @param type class of the values to serialize
+        * @return a new {@link AvroSerializer} for {@code type}
+        */
+       @Override
+       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+               final AvroSerializer<T> avroSerializer = new AvroSerializer<>(type);
+               return avroSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
new file mode 100644
index 0000000..1c1a19b
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
+
+/**
+ * Runs the test cases inherited from {@link AbstractGenericTypeSerializerTest} with an
+ * {@link AvroSerializer} supplied as the serializer.
+ */
+public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+       /**
+        * Supplies the serializer used by the inherited test cases.
+        *
+        * @param type class of the values to serialize
+        * @return a new {@link AvroSerializer} for {@code type}
+        */
+       @Override
+       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+               final AvroSerializer<T> avroSerializer = new AvroSerializer<>(type);
+               return avroSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
new file mode 100644
index 0000000..bb3d911
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+
+import org.apache.avro.reflect.Nullable;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that run {@link SerializerTestInstance#testAll()} against an {@link AvroSerializer}
+ * for plain Java objects, including an author whose list of titles is empty.
+ */
+public class AvroSerializerEmptyArrayTest {
+
+       /**
+        * Runs the serializer test instance on a single {@link Book}.
+        */
+       @Test
+       public void testBookSerialization() {
+               try {
+                       final Book book = new Book(123, "This is a test book", 26382648);
+                       final AvroSerializer<Book> bookSerializer = new AvroSerializer<Book>(Book.class);
+
+                       new SerializerTestInstance<Book>(bookSerializer, Book.class, -1, book).testAll();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Runs the serializer test instance on a {@link BookAuthor} that has three books and an
+        * empty (but non-null) list of titles.
+        */
+       @Test
+       public void testSerialization() {
+               try {
+                       // Deliberately left empty.
+                       final List<String> noTitles = new ArrayList<String>();
+
+                       // Each entry is {bookId, authorId}.
+                       final long[][] idPairs = {{123, 1}, {24234234, 1}, {1234324, 3}};
+                       final List<Book> authoredBooks = new ArrayList<Book>();
+                       for (long[] ids : idPairs) {
+                               authoredBooks.add(new Book(ids[0], "This is a test book", ids[1]));
+                       }
+
+                       final BookAuthor author = new BookAuthor(1, noTitles, "Test Author");
+                       author.books = authoredBooks;
+                       author.bookType = BookAuthor.BookType.journal;
+
+                       final AvroSerializer<BookAuthor> authorSerializer =
+                                       new AvroSerializer<BookAuthor>(BookAuthor.class);
+
+                       new SerializerTestInstance<BookAuthor>(authorSerializer, BookAuthor.class, -1, author).testAll();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Null-tolerant equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}.
+        */
+       private static boolean equalOrBothNull(Object left, Object right) {
+               return left == null ? right == null : left.equals(right);
+       }
+
+       /**
+        * Hash code of {@code value}, or 0 when {@code value} is null.
+        */
+       private static int hashOrZero(Object value) {
+               return value == null ? 0 : value.hashCode();
+       }
+
+       /**
+        * Hash of a {@code long}, i.e. {@code (int) (value ^ (value >>> 32))}.
+        */
+       private static int hashOfLong(long value) {
+               return Long.valueOf(value).hashCode();
+       }
+
+       /**
+        * Book object used as test data; {@code title} is marked nullable.
+        */
+       public static class Book {
+
+               long bookId;
+               @Nullable
+               String title;
+               long authorId;
+
+               public Book() {}
+
+               public Book(long bookId, String title, long authorId) {
+                       this.bookId = bookId;
+                       this.title = title;
+                       this.authorId = authorId;
+               }
+
+               /**
+                * Combines author id, book id and title (in that order) with multiplier 31.
+                */
+               @Override
+               public int hashCode() {
+                       int hash = 31 + hashOfLong(authorId);
+                       hash = 31 * hash + hashOfLong(bookId);
+                       hash = 31 * hash + hashOrZero(title);
+                       return hash;
+               }
+
+               /**
+                * Two books are equal when they are of the same class and agree on author id,
+                * book id and title.
+                */
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+                       if (obj == null || getClass() != obj.getClass()) {
+                               return false;
+                       }
+                       final Book that = (Book) obj;
+                       return authorId == that.authorId
+                                       && bookId == that.bookId
+                                       && equalOrBothNull(title, that.title);
+               }
+       }
+
+       /**
+        * Author object used as test data; the two list fields are marked nullable.
+        */
+       public static class BookAuthor {
+
+               enum BookType {
+                       book,
+                       article,
+                       journal
+               }
+
+               long authorId;
+
+               @Nullable
+               List<String> bookTitles;
+
+               @Nullable
+               List<Book> books;
+
+               String authorName;
+
+               BookType bookType;
+
+               public BookAuthor() {}
+
+               public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+                       this.authorId = authorId;
+                       this.bookTitles = bookTitles;
+                       this.authorName = authorName;
+               }
+
+               /**
+                * Combines author id, author name, titles, book type and books (in that order)
+                * with multiplier 31.
+                */
+               @Override
+               public int hashCode() {
+                       int hash = 31 + hashOfLong(authorId);
+                       hash = 31 * hash + hashOrZero(authorName);
+                       hash = 31 * hash + hashOrZero(bookTitles);
+                       hash = 31 * hash + hashOrZero(bookType);
+                       hash = 31 * hash + hashOrZero(books);
+                       return hash;
+               }
+
+               /**
+                * Two authors are equal when they are of the same class and agree on every field.
+                */
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+                       if (obj == null || getClass() != obj.getClass()) {
+                               return false;
+                       }
+                       final BookAuthor that = (BookAuthor) obj;
+                       return authorId == that.authorId
+                                       && equalOrBothNull(authorName, that.authorName)
+                                       && equalOrBothNull(bookTitles, that.bookTitles)
+                                       && bookType == that.bookType
+                                       && equalOrBothNull(books, that.books);
+               }
+       }
+}

Reply via email to