http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
new file mode 100644
index 0000000..ae41031
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroInputFormat;
+import org.apache.flink.formats.avro.AvroRecordInputFormatTest;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for the {@link AvroInputFormat} reading Pojos.
+ */
+@RunWith(Parameterized.class)
+public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
+       public AvroTypeExtractionTest(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       private File inFile;
+       private String resultPath;
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception {
+               // Fresh temporary file whose URI string is used as the location of the job output.
+               final File outputFile = tempFolder.newFile();
+               resultPath = outputFile.toURI().toString();
+
+               // Second temporary file; AvroRecordInputFormatTest.writeTestFile fills it and the tests read it.
+               inFile = tempFolder.newFile();
+               AvroRecordInputFormatTest.writeTestFile(inFile);
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testSimpleAvroRead() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final Path inputPath = new Path(inFile.getAbsoluteFile().toURI());
+               final AvroInputFormat<User> format = new AvroInputFormat<User>(inputPath, User.class);
+
+               // The map field is set to null before writing because its entry order changes
+               // between JVMs, which makes the textual output hard to test.
+               final MapFunction<User, User> dropTypeMap = new MapFunction<User, User>() {
+                       @Override
+                       public User map(User value) throws Exception {
+                               value.setTypeMap(null);
+                               return value;
+                       }
+               };
+
+               final DataSet<User> usersDS = env.createInput(format).map(dropTypeMap);
+               usersDS.writeAsText(resultPath);
+
+               env.execute("Simple Avro read job");
+
+               // Expected textual form of the two records, one per line.
+               final String alyssa = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+               final String charlie = "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+               expected = alyssa + charlie;
+       }
+
+       @Test
+       public void testSerializeWithAvro() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableForceAvro();
+
+               final Path inputPath = new Path(inFile.getAbsoluteFile().toURI());
+               final AvroInputFormat<User> format = new AvroInputFormat<User>(inputPath, User.class);
+
+               // Replaces the map field of every record with a single-entry map
+               // (presumably so the printed map does not depend on entry order).
+               final MapFunction<User, User> fixedTypeMap = new MapFunction<User, User>() {
+                       @Override
+                       public User map(User value) throws Exception {
+                               final Map<CharSequence, Long> singleEntry = new HashMap<CharSequence, Long>(1);
+                               singleEntry.put("hehe", 12L);
+                               value.setTypeMap(singleEntry);
+                               return value;
+                       }
+               };
+
+               final DataSet<User> usersDS = env.createInput(format).map(fixedTypeMap);
+               usersDS.writeAsText(resultPath);
+
+               env.execute("Simple Avro read job");
+
+               // Expected textual form of the two records, one per line.
+               final String alyssa = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+               final String charlie = "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+               expected = alyssa + charlie;
+       }
+
+       @Test
+       public void testKeySelection() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableObjectReuse();
+
+               final Path inputPath = new Path(inFile.getAbsoluteFile().toURI());
+               final AvroInputFormat<User> format = new AvroInputFormat<User>(inputPath, User.class);
+               final DataSet<User> usersDS = env.createInput(format);
+
+               // Emits a (name, 1) pair for every record of a group.
+               final GroupReduceFunction<User, Tuple2<String, Integer>> nameWithOne =
+                       new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+                               @Override
+                               public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                                       for (User user : values) {
+                                               final String name = user.getName().toString();
+                                               out.collect(new Tuple2<String, Integer>(name, 1));
+                                       }
+                               }
+                       };
+
+               // The grouping key is given as the field expression "name".
+               final DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(nameWithOne);
+               res.writeAsText(resultPath);
+               env.execute("Avro Key selection");
+
+               expected = "(Alyssa,1)\n(Charlie,1)\n";
+       }
+
+       @Test
+       public void testWithAvroGenericSer() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableForceAvro();
+
+               final Path inputPath = new Path(inFile.getAbsoluteFile().toURI());
+               final AvroInputFormat<User> format = new AvroInputFormat<User>(inputPath, User.class);
+               final DataSet<User> usersDS = env.createInput(format);
+
+               // Key: the record's name converted to a String.
+               final KeySelector<User, String> byName = new KeySelector<User, String>() {
+                       @Override
+                       public String getKey(User value) throws Exception {
+                               return String.valueOf(value.getName());
+                       }
+               };
+
+               // Emits a (name, 1) pair for every record of a group.
+               final GroupReduceFunction<User, Tuple2<String, Integer>> nameWithOne =
+                       new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+                               @Override
+                               public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                                       for (User user : values) {
+                                               final String name = user.getName().toString();
+                                               out.collect(new Tuple2<String, Integer>(name, 1));
+                                       }
+                               }
+                       };
+
+               final DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(byName).reduceGroup(nameWithOne);
+               res.writeAsText(resultPath);
+               env.execute("Avro Key selection");
+
+               expected = "(Charlie,1)\n(Alyssa,1)\n";
+       }
+
+       @Test
+       public void testWithKryoGenericSer() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().enableForceKryo();
+
+               final Path inputPath = new Path(inFile.getAbsoluteFile().toURI());
+               final AvroInputFormat<User> format = new AvroInputFormat<User>(inputPath, User.class);
+               final DataSet<User> usersDS = env.createInput(format);
+
+               // Key: the record's name converted to a String.
+               final KeySelector<User, String> byName = new KeySelector<User, String>() {
+                       @Override
+                       public String getKey(User value) throws Exception {
+                               return String.valueOf(value.getName());
+                       }
+               };
+
+               // Emits a (name, 1) pair for every record of a group.
+               final GroupReduceFunction<User, Tuple2<String, Integer>> nameWithOne =
+                       new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+                               @Override
+                               public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                                       for (User user : values) {
+                                               final String name = user.getName().toString();
+                                               out.collect(new Tuple2<String, Integer>(name, 1));
+                                       }
+                               }
+                       };
+
+               final DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(byName).reduceGroup(nameWithOne);
+               res.writeAsText(resultPath);
+               env.execute("Avro Key selection");
+
+               expected = "(Charlie,1)\n(Alyssa,1)\n";
+       }
+
+       /**
+        * Runs the grouping test once for each of a few known fields.
+        */
+       @Test
+       public void testAllFields() throws Exception {
+               final String[] groupingFields = {"name", "type_enum", "type_double_test"};
+               for (String groupingField : groupingFields) {
+                       testField(groupingField);
+               }
+       }
+
+       private void testField(final String fieldName) throws Exception {
+               before();
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
+               DataSet<User> usersDS = env.createInput(users);
+
+               DataSet<Object> res = 
usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
+                       @Override
+                       public void reduce(Iterable<User> values, 
Collector<Object> out) throws Exception {
+                               for (User u : values) {
+                                       out.collect(u.get(fieldName));
+                               }
+                       }
+               });
+               res.writeAsText(resultPath);
+               env.execute("Simple Avro read job");
+
+               // test if automatic registration of the Types worked
+               ExecutionConfig ec = env.getConfig();
+               
Assert.assertTrue(ec.getRegisteredKryoTypes().contains(Fixed16.class));
+
+               if (fieldName.equals("name")) {
+                       expected = "Alyssa\nCharlie";
+               } else if (fieldName.equals("type_enum")) {
+                       expected = "GREEN\nRED\n";
+               } else if (fieldName.equals("type_double_test")) {
+                       expected = "123.45\n1.337\n";
+               } else {
+                       Assert.fail("Unknown field");
+               }
+
+               after();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
new file mode 100644
index 0000000..79a4a45
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.User;
+
+/**
+ * Test for {@link AvroTypeInfo}.
+ */
+public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> {
+
+       /**
+        * Supplies one {@link AvroTypeInfo} per generated record class to the base test.
+        */
+       @Override
+       protected AvroTypeInfo<?>[] getTestData() {
+               final AvroTypeInfo<?> addressInfo = new AvroTypeInfo<>(Address.class);
+               final AvroTypeInfo<?> userInfo = new AvroTypeInfo<>(User.class);
+               return new AvroTypeInfo<?>[] {addressInfo, userInfo};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
new file mode 100644
index 0000000..90ac040
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * Utilities for creating Avro schemas and matching test data.
+ */
+public final class AvroTestUtils {
+
+       private static final String NAMESPACE = 
"org.apache.flink.streaming.connectors.kafka";
+
+       /**
+        * Creates a flat Avro record schema for testing.
+        *
+        * <p>The record is named {@code BasicAvroRecord}. Each field is declared without a default
+        * as a union of {@code null} and the schema that {@link ReflectData} derives from the type
+        * class of the corresponding entry in {@code fieldTypes}.
+        *
+        * @param fieldNames names of the record fields, in order
+        * @param fieldTypes type information per field; entry {@code i} describes {@code fieldNames[i]}
+        * @return the assembled record schema
+        */
+       public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+               final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder
+                       .record("BasicAvroRecord")
+                       .namespace(NAMESPACE)
+                       .fields();
+
+               final Schema nullSchema = Schema.create(Schema.Type.NULL);
+
+               for (int i = 0; i < fieldNames.length; i++) {
+                       // Schema derived by reflection from the field's type class, made nullable via a union.
+                       final Schema valueSchema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
+                       final Schema nullableSchema = Schema.createUnion(Arrays.asList(nullSchema, valueSchema));
+                       fieldAssembler.name(fieldNames[i]).type(nullableSchema).noDefault();
+               }
+
+               return fieldAssembler.endRecord();
+       }
+
+       /**
+        * Creates test data for a simple Avro type without nesting.
+        *
+        * @return the {@link Address} class, an {@link Address} record, and a {@link Row} holding the same values
+        */
+       public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() {
+               final Address address = Address.newBuilder()
+                       .setNum(42)
+                       .setStreet("Main Street 42")
+                       .setCity("Test City")
+                       .setState("Test State")
+                       .setZip("12345")
+                       .build();
+
+               // Same values as the record above, in the same order.
+               final Object[] addressValues = {42, "Main Street 42", "Test City", "Test State", "12345"};
+               final Row addressRow = new Row(addressValues.length);
+               for (int pos = 0; pos < addressValues.length; pos++) {
+                       addressRow.setField(pos, addressValues[pos]);
+               }
+
+               return new Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row>(Address.class, address, addressRow);
+       }
+
+       /**
+        * Creates test data covering all Avro data types as well as nested types.
+        *
+        * @return the {@link User} class, a {@link User} record with a nested {@link Address}, and a
+        *         {@link Row} holding the same values (the address as a nested row)
+        */
+       public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() {
+               // Nested address: record form and row form with the same values.
+               final Address address = Address.newBuilder()
+                       .setNum(42)
+                       .setStreet("Main Street 42")
+                       .setCity("Test City")
+                       .setState("Test State")
+                       .setZip("12345")
+                       .build();
+
+               final Object[] addressValues = {42, "Main Street 42", "Test City", "Test State", "12345"};
+               final Row addressRow = new Row(addressValues.length);
+               for (int pos = 0; pos < addressValues.length; pos++) {
+                       addressRow.setField(pos, addressValues[pos]);
+               }
+
+               // User record; several fields are deliberately null or empty.
+               final User user = User.newBuilder()
+                       .setName("Charlie")
+                       .setFavoriteNumber(null)
+                       .setFavoriteColor("blue")
+                       .setTypeLongTest(1337L)
+                       .setTypeDoubleTest(1.337d)
+                       .setTypeNullTest(null)
+                       .setTypeBoolTest(false)
+                       .setTypeArrayString(new ArrayList<CharSequence>())
+                       .setTypeArrayBoolean(new ArrayList<Boolean>())
+                       .setTypeNullableArray(null)
+                       .setTypeEnum(Colors.RED)
+                       .setTypeMap(new HashMap<CharSequence, Long>())
+                       .setTypeFixed(null)
+                       .setTypeUnion(null)
+                       .setTypeNested(address)
+                       .build();
+
+               // Row form of the user: one entry per builder call above, in the same order.
+               final Object[] userValues = {
+                       "Charlie",
+                       null,
+                       "blue",
+                       1337L,
+                       1.337d,
+                       null,
+                       false,
+                       new ArrayList<CharSequence>(),
+                       new ArrayList<Boolean>(),
+                       null,
+                       Colors.RED,
+                       new HashMap<CharSequence, Long>(),
+                       null,
+                       null,
+                       addressRow
+               };
+               final Row userRow = new Row(userValues.length);
+               for (int pos = 0; pos < userValues.length; pos++) {
+                       userRow.setField(pos, userValues[pos]);
+               }
+
+               return new Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row>(User.class, user, userRow);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc 
b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
new file mode 100644
index 0000000..9685a15
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -0,0 +1,35 @@
+[
+{"namespace": "org.apache.flink.formats.avro.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+     {"name": "num", "type": "int"},
+     {"name": "street", "type": "string"},
+     {"name": "city", "type": "string"},
+     {"name": "state", "type": "string"},
+     {"name": "zip", "type": "string"}
+  ]
+},
+{"namespace": "org.apache.flink.formats.avro.generated",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "type_long_test", "type": ["long", "null"]},
+     {"name": "type_double_test", "type": "double"},
+     {"name": "type_null_test", "type": ["null"]},
+     {"name": "type_bool_test", "type": ["boolean"]},
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : 
"string"}},
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : 
"boolean"}},
+     {"name": "type_nullable_array", "type": ["null", {"type":"array", 
"items":"string"}], "default":null},
+     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", 
"symbols" : ["RED", "GREEN", "BLUE"]}},
+     {"name": "type_map", "type": {"type": "map", "values": "long"}},
+     {"name": "type_fixed",
+                 "size": 16,
+                 "type": ["null", {"name": "Fixed16", "size": 16, "type": 
"fixed"}] },
+     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
+     {"name": "type_nested", "type": ["null", "Address"]}
+ ]
+}]

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/log4j-test.properties 
b/flink-formats/flink-avro/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..881dc06
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/testdata.avro 
b/flink-formats/flink-avro/src/test/resources/testdata.avro
new file mode 100644
index 0000000..3102d03
Binary files /dev/null and 
b/flink-formats/flink-avro/src/test/resources/testdata.avro differ

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
new file mode 100644
index 0000000..f8de3e0
--- /dev/null
+++ b/flink-formats/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+
+       <artifactId>flink-formats</artifactId>
+       <name>flink-formats</name>
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-avro</module>
+       </modules>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index e4b907a..2906eb8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -168,31 +168,30 @@ public class HDFSTest {
        }
 
        @Test
-       public void testAvroOut() {
-               String type = "one";
-               AvroOutputFormat<String> avroOut =
-                               new AvroOutputFormat<String>(String.class);
+       public void testChangingFileNames() {
+               org.apache.hadoop.fs.Path hdfsPath = new 
org.apache.hadoop.fs.Path(hdfsURI + "/hdfsTest");
+               Path path = new Path(hdfsPath.toString());
 
-               org.apache.hadoop.fs.Path result = new 
org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+               String type = "one";
+               TextOutputFormat<String> outputFormat = new 
TextOutputFormat<>(path);
 
-               avroOut.setOutputFilePath(new Path(result.toString()));
-               avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
-               
avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+               outputFormat.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+               
outputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
 
                try {
-                       avroOut.open(0, 2);
-                       avroOut.writeRecord(type);
-                       avroOut.close();
+                       outputFormat.open(0, 2);
+                       outputFormat.writeRecord(type);
+                       outputFormat.close();
 
-                       avroOut.open(1, 2);
-                       avroOut.writeRecord(type);
-                       avroOut.close();
+                       outputFormat.open(1, 2);
+                       outputFormat.writeRecord(type);
+                       outputFormat.close();
 
-                       assertTrue("No result file present", 
hdfs.exists(result));
-                       FileStatus[] files = hdfs.listStatus(result);
+                       assertTrue("No result file present", 
hdfs.exists(hdfsPath));
+                       FileStatus[] files = hdfs.listStatus(hdfsPath);
                        Assert.assertEquals(2, files.length);
                        for (FileStatus file : files) {
-                               
assertTrue("1.avro".equals(file.getPath().getName()) || 
"2.avro".equals(file.getPath().getName()));
+                               assertTrue("1".equals(file.getPath().getName()) 
|| "2".equals(file.getPath().getName()));
                        }
 
                } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml 
b/flink-libraries/flink-cep/pom.xml
index 23978b2..bd57d17 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -89,7 +89,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-
+        <!-- we include Avro to make the CEPMigrationTest work, it uses a Kryo-serialized savepoint (see FLINK-7420) -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
index ff6f84d..864c257 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
@@ -161,6 +161,12 @@ under the License.
                                        <groupId>commons-beanutils</groupId>
                                        <artifactId>commons-beanutils</artifactId>
                                </exclusion>
+
+                               <!-- we don't want Hadoop's Avro dependency, since Flink adds its own Avro support -->
+                               <exclusion>
+                                       <groupId>org.apache.avro</groupId>
+                                       <artifactId>avro</artifactId>
+                               </exclusion>
                        </exclusions>
                </dependency>
 
@@ -412,6 +418,12 @@ under the License.
                                        <groupId>com.sun.jersey.contribs</groupId>
                                        <artifactId>jersey-guice</artifactId>
                                </exclusion>
+
+                               <!-- we don't want Hadoop's Avro dependency, since Flink adds its own Avro support -->
+                               <exclusion>
+                                       <groupId>org.apache.avro</groupId>
+                                       <artifactId>avro</artifactId>
+                               </exclusion>
                        </exclusions>
                </dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-shaded-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index ba90fc9..3e7cb41 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -52,11 +52,6 @@ under the License.
                        <artifactId>slf4j-api</artifactId>
                        <scope>provided</scope>
                </dependency>
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-                       <scope>provided</scope>
-               </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 8e10a2e..2217199 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -170,21 +170,6 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-avro_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-avro_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-optimizer_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9cf603a..5302c5c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,7 @@ under the License.
                <module>flink-streaming-java</module>
                <module>flink-streaming-scala</module>
                <module>flink-connectors</module>
+               <module>flink-formats</module>
                <module>flink-examples</module>
                <module>flink-clients</module>
                <module>flink-queryable-state</module>
@@ -275,19 +276,6 @@ under the License.
                                <version>5.0.4-1.0</version>
                        </dependency>
 
-                       <!-- Make sure we use a consistent avro version throughout the project -->
-                       <dependency>
-                               <groupId>org.apache.avro</groupId>
-                               <artifactId>avro</artifactId>
-                               <version>1.8.2</version>
-                       </dependency>
-                       
-                       <dependency>
-                               <groupId>org.apache.avro</groupId>
-                               <artifactId>avro-ipc</artifactId>
-                               <version>1.8.2</version>
-                       </dependency>
-
                        <dependency>
                                <groupId>org.xerial.snappy</groupId>
                                <artifactId>snappy-java</artifactId>
@@ -1033,7 +1021,8 @@ under the License.
                                                <!-- Test Data. -->
                                                <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
                                                <exclude>flink-runtime/src/test/resources/flink_11-kryo_registrations</exclude>
-                                               <exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude>
+                                               <exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude>
+                                               <exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude>
                                                <exclude>out/test/flink-avro/avro/user.avsc</exclude>
                                                <exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
                                                <exclude>test-infra/end-to-end-test/test-data/*</exclude>
@@ -1042,8 +1031,8 @@ under the License.
                                                <exclude>**/src/test/resources/*-snapshot</exclude>
                                                <exclude>**/src/test/resources/*-savepoint</exclude>
 
-                                               <exclude>flink-connectors/flink-avro/src/test/resources/testdata.avro</exclude>
-                                               <exclude>flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
+                                               <exclude>flink-formats/flink-avro/src/test/resources/testdata.avro</exclude>
+                                               <exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude>
                                                <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude>
                                                <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude>
                                                <!-- Configuration Files. -->

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 5d5c455..a58e17c 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -23,8 +23,7 @@ under the License.
                "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
 
 <suppressions>
-               <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
-               <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
+               <suppress files="org[\\/]apache[\\/]flink[\\/]formats[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
                <!-- Sometimes we have to temporarily fix very long, different formatted Calcite files. -->
                <suppress files="org[\\/]apache[\\/]calcite.*" checks="[a-zA-Z0-9]*"/>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 978bc9f..fda6023 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -77,7 +77,7 @@ flink-filesystems/flink-hadoop-fs,\
 flink-filesystems/flink-mapr-fs,\
 flink-filesystems/flink-s3-fs-hadoop,\
 flink-filesystems/flink-s3-fs-presto,\
-flink-connectors/flink-avro,\
+flink-formats/flink-avro,\
 flink-connectors/flink-hbase,\
 flink-connectors/flink-hcatalog,\
 flink-connectors/flink-hadoop-compatibility,\

Reply via email to