http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
index d916116..048e7ac 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -23,12 +23,11 @@ import java.util.StringTokenizer
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, 
ObjectMapper}
 import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.twitter.TwitterSource
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
 import org.apache.flink.util.Collector
-import org.codehaus.jackson.JsonNode
-import org.codehaus.jackson.map.ObjectMapper
 
 import scala.collection.mutable.ListBuffer
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
new file mode 100644
index 0000000..19d9129
--- /dev/null
+++ b/flink-formats/flink-avro/pom.xml
@@ -0,0 +1,280 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-formats</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-avro_${scala.binary.version}</artifactId>
+       <name>flink-avro</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <version>1.8.2</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-clients_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>create-test-dependency</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <archive>
+                                                               <manifest>
+                                                                       
<mainClass>org.apache.flink.formats.avro.testjar.AvroExternalJarProgram</mainClass>
+                                                               </manifest>
+                                                       </archive>
+                                                       
<finalName>maven</finalName>
+                                                       <attach>false</attach>
+                                                       <descriptors>
+                                                               
<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+                                                       </descriptors>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <!--Remove the AvroExternalJarProgram code from the test-classes directory since it mustn't be in the
+                       classpath when running the tests to actually test whether the user code class loader
+                       is properly used.-->
+                       <plugin>
+                               <artifactId>maven-clean-plugin</artifactId>
+                               <version>2.5</version><!--$NO-MVN-MAN-VER$-->
+                               <executions>
+                                       <execution>
+                                               
<id>remove-avroexternalprogram</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>clean</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<excludeDefaultDirectories>true</excludeDefaultDirectories>
+                                                       <filesets>
+                                                               <fileset>
+                                                                       
<directory>${project.build.testOutputDirectory}</directory>
+                                                                       
<includes>
+                                                                               
<include>**/testjar/*.class</include>
+                                                                       
</includes>
+                                                               </fileset>
+                                                       </filesets>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <filesets>
+                                               <fileset>
+                                                       
<directory>${project.basedir}/src/test/java/org/apache/flink/formats/avro/generated</directory>
+                                               </fileset>
+                                       </filesets>
+                               </configuration>
+                       </plugin>
+                       <!-- Generate Test class from avro schema -->
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>1.8.2</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                                       
<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Add Avro test classes to test jar in order to test 
AvroTypeInfo. -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration 
combine.self="override">
+                                                       
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                                                       
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       
<include>org.codehaus.jackson:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       
<pattern>org.codehaus.jackson</pattern>
+                                                                       
<shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+
+               <pluginManagement>
+                       <plugins>
+                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
+                               <plugin>
+                                       <groupId>org.eclipse.m2e</groupId>
+                                       
<artifactId>lifecycle-mapping</artifactId>
+                                       <version>1.0.0</version>
+                                       <configuration>
+                                               <lifecycleMappingMetadata>
+                                                       <pluginExecutions>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.maven.plugins</groupId>
+                                                                               
<artifactId>maven-assembly-plugin</artifactId>
+                                                                               
<versionRange>[2.4,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>single</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.maven.plugins</groupId>
+                                                                               
<artifactId>maven-clean-plugin</artifactId>
+                                                                               
<versionRange>[1,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>clean</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.avro</groupId>
+                                                                               
<artifactId>avro-maven-plugin</artifactId>
+                                                                               
<versionRange>[1.7.7,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>schema</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                       </pluginExecutions>
+                                               </lifecycleMappingMetadata>
+                                       </configuration>
+                               </plugin>
+                       </plugins>
+               </pluginManagement>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..58085f6
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import static 
org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema;
+
+/**
+ * Legacy type information for Avro {@link SpecificRecordBase} classes, kept only for backwards
+ * compatibility. The record's fields are derived from its Avro schema and exposed through
+ * {@link PojoTypeInfo}.
+ *
+ * @deprecated Please use <code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code>
+ * in the <code>flink-avro</code> module. This class will be removed in the near future.
+ */
+@Deprecated
+@Public
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+
+       /**
+        * Builds POJO-style type information for the given Avro specific record class, using the
+        * field list generated from that class's Avro schema.
+        *
+        * @param typeClass the Avro specific record class to describe
+        */
+       public AvroTypeInfo(Class<T> typeClass) {
+               super(
+                       typeClass,
+                       generateFieldsFromAvroSchema(typeClass));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
new file mode 100644
index 0000000..9b73ceb
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides a {@link FileInputFormat} for Avro records.
+ *
+ * @param <E>
+ *            the type of the result Avro record. If you specify
+ *            {@link GenericRecord} then the result will be returned as a
+ *            {@link GenericRecord}, so you do not have to know the schema 
ahead
+ *            of time.
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements 
ResultTypeQueryable<E>,
+       CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(AvroInputFormat.class);
+
+       private final Class<E> avroValueType;
+
+       private boolean reuseAvroValue = true;
+
+       private transient DataFileReader<E> dataFileReader;
+
+       private transient long end;
+
+       private transient long recordsReadSinceLastSync;
+
+       private long lastSync = -1L;
+
+       public AvroInputFormat(Path filePath, Class<E> type) {
+               super(filePath);
+               this.avroValueType = type;
+       }
+
+       /**
+        * Sets the flag whether to reuse the Avro value instance for all 
records.
+        * By default, the input format reuses the Avro value.
+        *
+        * @param reuseAvroValue True, if the input format should reuse the 
Avro value instance, false otherwise.
+        */
+       public void setReuseAvroValue(boolean reuseAvroValue) {
+               this.reuseAvroValue = reuseAvroValue;
+       }
+
+       /**
+        * If set, the InputFormat will only read entire files.
+        */
+       public void setUnsplittable(boolean unsplittable) {
+               this.unsplittable = unsplittable;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Typing
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public TypeInformation<E> getProducedType() {
+               return TypeExtractor.getForClass(this.avroValueType);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Input Format Methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void open(FileInputSplit split) throws IOException {
+               super.open(split);
+               dataFileReader = initReader(split);
+               dataFileReader.sync(split.getStart());
+               lastSync = dataFileReader.previousSync();
+       }
+
+       private DataFileReader<E> initReader(FileInputSplit split) throws 
IOException {
+               DatumReader<E> datumReader;
+
+               if (org.apache.avro.generic.GenericRecord.class == 
avroValueType) {
+                       datumReader = new GenericDatumReader<E>();
+               } else {
+                       datumReader = 
org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
+                               ? new SpecificDatumReader<E>(avroValueType) : 
new ReflectDatumReader<E>(avroValueType);
+               }
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Opening split {}", split);
+               }
+
+               SeekableInput in = new FSDataInputStreamWrapper(stream, 
split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+               DataFileReader<E> dataFileReader = (DataFileReader) 
DataFileReader.openReader(in, datumReader);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Loaded SCHEMA: {}", 
dataFileReader.getSchema());
+               }
+
+               end = split.getStart() + split.getLength();
+               recordsReadSinceLastSync = 0;
+               return dataFileReader;
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return !dataFileReader.hasNext() || 
dataFileReader.pastSync(end);
+       }
+
+       public long getRecordsReadFromBlock() {
+               return this.recordsReadSinceLastSync;
+       }
+
+       @Override
+       public E nextRecord(E reuseValue) throws IOException {
+               if (reachedEnd()) {
+                       return null;
+               }
+
+               // if we start a new block, then register the event, and
+               // restart the counter.
+               if (dataFileReader.previousSync() != lastSync) {
+                       lastSync = dataFileReader.previousSync();
+                       recordsReadSinceLastSync = 0;
+               }
+               recordsReadSinceLastSync++;
+
+               if (reuseAvroValue) {
+                       return dataFileReader.next(reuseValue);
+               } else {
+                       if (GenericRecord.class == avroValueType) {
+                               return dataFileReader.next();
+                       } else {
+                               return 
dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Checkpointing
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public Tuple2<Long, Long> getCurrentState() throws IOException {
+               return new Tuple2<>(this.lastSync, 
this.recordsReadSinceLastSync);
+       }
+
+       @Override
+       public void reopen(FileInputSplit split, Tuple2<Long, Long> state) 
throws IOException {
+               Preconditions.checkNotNull(split, "reopen() cannot be called on 
a null split.");
+               Preconditions.checkNotNull(state, "reopen() cannot be called 
with a null initial state.");
+
+               try {
+                       this.open(split);
+               } finally {
+                       if (state.f0 != -1) {
+                               lastSync = state.f0;
+                               recordsReadSinceLastSync = state.f1;
+                       }
+               }
+
+               if (lastSync != -1) {
+                       // open and read until the record we were before
+                       // the checkpoint and discard the values
+                       dataFileReader.seek(lastSync);
+                       for (int i = 0; i < recordsReadSinceLastSync; i++) {
+                               dataFileReader.next(null);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
new file mode 100644
index 0000000..c0b8073
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link FileOutputFormat} for Avro records.
+ * @param <E>
+ */
+public class AvroOutputFormat<E> extends FileOutputFormat<E> implements 
Serializable {
+
+       /**
+        * Wrapper which encapsulates the supported codec and a related 
serialization byte.
+        */
+       public enum Codec {
+
+               NULL((byte) 0, CodecFactory.nullCodec()),
+               SNAPPY((byte) 1, CodecFactory.snappyCodec()),
+               BZIP2((byte) 2, CodecFactory.bzip2Codec()),
+               DEFLATE((byte) 3, 
CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+               XZ((byte) 4, 
CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+               private byte codecByte;
+
+               private CodecFactory codecFactory;
+
+               Codec(final byte codecByte, final CodecFactory codecFactory) {
+                       this.codecByte = codecByte;
+                       this.codecFactory = codecFactory;
+               }
+
+               private byte getCodecByte() {
+                       return codecByte;
+               }
+
+               private CodecFactory getCodecFactory() {
+                       return codecFactory;
+               }
+
+               private static Codec forCodecByte(byte codecByte) {
+                       for (final Codec codec : Codec.values()) {
+                               if (codec.getCodecByte() == codecByte) {
+                                       return codec;
+                               }
+                       }
+                       throw new IllegalArgumentException("no codec for 
codecByte: " + codecByte);
+               }
+       }
+
+       private static final long serialVersionUID = 1L;
+
+       private final Class<E> avroValueType;
+
+       private transient Schema userDefinedSchema = null;
+
+       private transient Codec codec = null;
+
+       private transient DataFileWriter<E> dataFileWriter;
+
+       public AvroOutputFormat(Path filePath, Class<E> type) {
+               super(filePath);
+               this.avroValueType = type;
+       }
+
+       public AvroOutputFormat(Class<E> type) {
+               this.avroValueType = type;
+       }
+
+       @Override
+       protected String getDirectoryFileName(int taskNumber) {
+               return super.getDirectoryFileName(taskNumber) + ".avro";
+       }
+
+       public void setSchema(Schema schema) {
+               this.userDefinedSchema = schema;
+       }
+
+       /**
+        * Set avro codec for compression.
+        *
+        * @param codec avro codec.
+        */
+       public void setCodec(final Codec codec) {
+               this.codec = checkNotNull(codec, "codec can not be null");
+       }
+
+       @Override
+       public void writeRecord(E record) throws IOException {
+               dataFileWriter.append(record);
+       }
+
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               super.open(taskNumber, numTasks);
+
+               DatumWriter<E> datumWriter;
+               Schema schema;
+               if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {
+                       datumWriter = new SpecificDatumWriter<E>(avroValueType);
+                       try {
+                               schema = 
((org.apache.avro.specific.SpecificRecordBase) 
avroValueType.newInstance()).getSchema();
+                       } catch (InstantiationException | 
IllegalAccessException e) {
+                               throw new RuntimeException(e.getMessage());
+                       }
+               } else if 
(org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) {
+                       if (userDefinedSchema == null) {
+                               throw new IllegalStateException("Schema must be 
set when using Generic Record");
+                       }
+                       datumWriter = new 
GenericDatumWriter<E>(userDefinedSchema);
+                       schema = userDefinedSchema;
+               } else {
+                       datumWriter = new ReflectDatumWriter<E>(avroValueType);
+                       schema = ReflectData.get().getSchema(avroValueType);
+               }
+               dataFileWriter = new DataFileWriter<E>(datumWriter);
+               if (codec != null) {
+                       dataFileWriter.setCodec(codec.getCodecFactory());
+               }
+               if (userDefinedSchema == null) {
+                       dataFileWriter.create(schema, stream);
+               } else {
+                       dataFileWriter.create(userDefinedSchema, stream);
+               }
+       }
+
+       private void writeObject(java.io.ObjectOutputStream out) throws 
IOException {
+               out.defaultWriteObject();
+
+               if (codec != null) {
+                       out.writeByte(codec.getCodecByte());
+               } else {
+                       out.writeByte(-1);
+               }
+
+               if (userDefinedSchema != null) {
+                       byte[] json = 
userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
+                       out.writeInt(json.length);
+                       out.write(json);
+               } else {
+                       out.writeInt(0);
+               }
+       }
+
+       private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
+               in.defaultReadObject();
+
+               byte codecByte = in.readByte();
+               if (codecByte >= 0) {
+                       setCodec(Codec.forCodecByte(codecByte));
+               }
+
+               int length = in.readInt();
+               if (length != 0) {
+                       byte[] json = new byte[length];
+                       in.readFully(json);
+
+                       Schema schema = new Schema.Parser().parse(new 
String(json, ConfigConstants.DEFAULT_CHARSET));
+                       setSchema(schema);
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               dataFileWriter.flush();
+               dataFileWriter.close();
+               super.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
new file mode 100644
index 0000000..4a3c02e
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema<Row> {
+
+       /**
+        * Avro record class.
+        */
+       private Class<? extends SpecificRecord> recordClazz;
+
+       /**
+        * Schema for deterministic field order.
+        */
+       private transient Schema schema;
+
+       /**
+        * Reader that deserializes byte array into a record.
+        */
+       private transient DatumReader<SpecificRecord> datumReader;
+
+       /**
+        * Input stream to read message from.
+        */
+       private transient MutableByteArrayInputStream inputStream;
+
+       /**
+        * Avro decoder that decodes binary data.
+        */
+       private transient Decoder decoder;
+
+       /**
+        * Record to deserialize byte array to.
+        */
+       private SpecificRecord record;
+
+       /**
+        * Creates a Avro deserialization schema for the given record.
+        *
+        * @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+        */
+       public AvroRowDeserializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
+               Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+               this.recordClazz = recordClazz;
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumReader = new SpecificDatumReader<>(schema);
+               this.record = (SpecificRecord) 
SpecificData.newInstance(recordClazz, schema);
+               this.inputStream = new MutableByteArrayInputStream();
+               this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+       }
+
+       @Override
+       public Row deserialize(byte[] message) throws IOException {
+               // read record
+               try {
+                       inputStream.setBuffer(message);
+                       this.record = datumReader.read(record, decoder);
+               } catch (IOException e) {
+                       throw new RuntimeException("Failed to deserialize 
Row.", e);
+               }
+
+               // convert to row
+               final Object row = convertToRow(schema, record);
+               return (Row) row;
+       }
+
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(recordClazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumReader = new SpecificDatumReader<>(schema);
+               this.record = (SpecificRecord) 
SpecificData.newInstance(recordClazz, schema);
+               this.inputStream = new MutableByteArrayInputStream();
+               this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+       }
+
+       /**
+        * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+        * Avro's {@link Utf8} fields are converted into regular Java strings.
+        */
+       private static Object convertToRow(Schema schema, Object recordObj) {
+               if (recordObj instanceof GenericRecord) {
+                       // records can be wrapped in a union
+                       if (schema.getType() == Schema.Type.UNION) {
+                               final List<Schema> types = schema.getTypes();
+                               if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+                                       schema = types.get(1);
+                               }
+                               else {
+                                       throw new RuntimeException("Currently 
we only support schemas of the following form: UNION[null, RECORD]. Given: " + 
schema);
+                               }
+                       } else if (schema.getType() != Schema.Type.RECORD) {
+                               throw new RuntimeException("Record type for row 
type expected. But is: " + schema);
+                       }
+                       final List<Schema.Field> fields = schema.getFields();
+                       final Row row = new Row(fields.size());
+                       final GenericRecord record = (GenericRecord) recordObj;
+                       for (int i = 0; i < fields.size(); i++) {
+                               final Schema.Field field = fields.get(i);
+                               row.setField(i, convertToRow(field.schema(), 
record.get(field.pos())));
+                       }
+                       return row;
+               } else if (recordObj instanceof Utf8) {
+                       return recordObj.toString();
+               } else {
+                       return recordObj;
+               }
+       }
+
+       /**
+        * An extension of the ByteArrayInputStream that allows to change a 
buffer that should be
+        * read without creating a new ByteArrayInputStream instance. This 
allows to re-use the same
+        * InputStream instance, copying message to process, and creation of 
Decoder on every new message.
+        */
+       private static final class MutableByteArrayInputStream extends 
ByteArrayInputStream {
+
+               public MutableByteArrayInputStream() {
+                       super(new byte[0]);
+               }
+
+               /**
+                * Set buffer that can be read via the InputStream interface 
and reset the input stream.
+                * This has the same effect as creating a new 
ByteArrayInputStream with a new buffer.
+                *
+                * @param buf the new buffer to read.
+                */
+               public void setBuffer(byte[] buf) {
+                       this.buf = buf;
+                       this.pos = 0;
+                       this.count = buf.length;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
new file mode 100644
index 0000000..41000a6
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+       /**
+        * Avro record class.
+        */
+       private Class<? extends SpecificRecord> recordClazz;
+
+       /**
+        * Avro serialization schema.
+        */
+       private transient Schema schema;
+
+       /**
+        * Writer to serialize Avro record into a byte array.
+        */
+       private transient DatumWriter<GenericRecord> datumWriter;
+
+       /**
+        * Output stream to serialize records into byte array.
+        */
+       private transient ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+
+       /**
+        * Low-level class for serialization of Avro values.
+        */
+       private transient Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+       /**
+        * Creates a Avro serialization schema for the given schema.
+        *
+        * @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+        */
+       public AvroRowSerializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
+               Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+               this.recordClazz = recordClazz;
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumWriter = new SpecificDatumWriter<>(schema);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public byte[] serialize(Row row) {
+               // convert to record
+               final Object record = convertToRecord(schema, row);
+
+               // write
+               try {
+                       arrayOutputStream.reset();
+                       datumWriter.write((GenericRecord) record, encoder);
+                       encoder.flush();
+                       return arrayOutputStream.toByteArray();
+               } catch (IOException e) {
+                       throw new RuntimeException("Failed to serialize Row.", 
e);
+               }
+       }
+
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.writeObject(recordClazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumWriter = new SpecificDatumWriter<>(schema);
+               this.arrayOutputStream = new ByteArrayOutputStream();
+               this.encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+       }
+
+       /**
+        * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+        * Strings are converted into Avro's {@link Utf8} fields.
+        */
+       private static Object convertToRecord(Schema schema, Object rowObj) {
+               if (rowObj instanceof Row) {
+                       // records can be wrapped in a union
+                       if (schema.getType() == Schema.Type.UNION) {
+                               final List<Schema> types = schema.getTypes();
+                               if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+                                       schema = types.get(1);
+                               }
+                               else if (types.size() == 2 && 
types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == 
Schema.Type.NULL) {
+                                       schema = types.get(0);
+                               }
+                               else {
+                                       throw new RuntimeException("Currently 
we only support schemas of the following form: UNION[null, RECORD] or 
UNION[RECORD, NULL] Given: " + schema);
+                               }
+                       } else if (schema.getType() != Schema.Type.RECORD) {
+                               throw new RuntimeException("Record type for row 
type expected. But is: " + schema);
+                       }
+                       final List<Schema.Field> fields = schema.getFields();
+                       final GenericRecord record = new 
GenericData.Record(schema);
+                       final Row row = (Row) rowObj;
+                       for (int i = 0; i < fields.size(); i++) {
+                               final Schema.Field field = fields.get(i);
+                               record.put(field.pos(), 
convertToRecord(field.schema(), row.getField(i)));
+                       }
+                       return record;
+               } else if (rowObj instanceof String) {
+                       return new Utf8((String) rowObj);
+               } else {
+                       return rowObj;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
new file mode 100644
index 0000000..02f74f5
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * General purpose serialization. Currently using Apache Avro's 
Reflect-serializers for serialization and
+ * Kryo for deep object copies. We want to change this to Kryo-only.
+ *
+ * @param <T> The type serialized.
+ */
+@Internal
+public final class AvroSerializer<T> extends TypeSerializer<T> {
+
	private static final long serialVersionUID = 1L;

	/** The type handled by this serializer; used for Avro reflection and the Kryo registrations. */
	private final Class<T> type;

	/** The type instantiated by createInstance(); declared as {@code Class<? extends T>}. */
	private final Class<? extends T> typeToInstantiate;

	/**
	 * Map of class tag (using classname as tag) to their Kryo registration.
	 *
	 * <p>This map serves as a preview of the final registration result of
	 * the Kryo instance, taking into account registration overwrites.
	 */
	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;

	// Avro reflection writer/reader for 'type'; created lazily by checkAvroInitialized()
	private transient ReflectDatumWriter<T> writer;
	private transient ReflectDatumReader<T> reader;

	// Avro encoder/decoder pointed at the target/source view (setOut/setIn) before each call;
	// created lazily by checkAvroInitialized()
	private transient DataOutputEncoder encoder;
	private transient DataInputDecoder decoder;

	// Kryo instance used by copy(T) / copy(T, T); created lazily by checkKryoInitialized()
	private transient Kryo kryo;

	// reusable instance that copy(DataInputView, DataOutputView) reads into before re-writing
	private transient T deepCopyInstance;
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public AvroSerializer(Class<T> type) {
+               this(type, type);
+       }
+
+       public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
+               this.type = checkNotNull(type);
+               this.typeToInstantiate = checkNotNull(typeToInstantiate);
+
+               InstantiationUtil.checkForInstantiation(typeToInstantiate);
+
+               this.kryoRegistrations = buildKryoRegistrations(type);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public AvroSerializer<T> duplicate() {
+               return new AvroSerializer<T>(type, typeToInstantiate);
+       }
+
+       @Override
+       public T createInstance() {
+               return InstantiationUtil.instantiate(this.typeToInstantiate);
+       }
+
+       @Override
+       public T copy(T from) {
+               checkKryoInitialized();
+
+               return KryoUtils.copy(from, kryo, this);
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               checkKryoInitialized();
+
+               return KryoUtils.copy(from, reuse, kryo, this);
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(T value, DataOutputView target) throws 
IOException {
+               checkAvroInitialized();
+               this.encoder.setOut(target);
+               this.writer.write(value, this.encoder);
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               checkAvroInitialized();
+               this.decoder.setIn(source);
+               return this.reader.read(null, this.decoder);
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               checkAvroInitialized();
+               this.decoder.setIn(source);
+               return this.reader.read(reuse, this.decoder);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               checkAvroInitialized();
+
+               if (this.deepCopyInstance == null) {
+                       this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
+               }
+
+               this.decoder.setIn(source);
+               this.encoder.setOut(target);
+
+               T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
+               this.writer.write(tmp, this.encoder);
+       }
+
+       private void checkAvroInitialized() {
+               if (this.reader == null) {
+                       this.reader = new ReflectDatumReader<T>(type);
+                       this.writer = new ReflectDatumWriter<T>(type);
+                       this.encoder = new DataOutputEncoder();
+                       this.decoder = new DataInputDecoder();
+               }
+       }
+
+       private void checkKryoInitialized() {
+               if (this.kryo == null) {
+                       this.kryo = new Kryo();
+
+                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
+                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
+                       kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+                       kryo.setAsmEnabled(true);
+
+                       KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof AvroSerializer)) {
+                       return false;
+               }
+               final AvroSerializer<?> other = (AvroSerializer<?>) obj;
+
+               // Class objects are compared by identity
+               return other.canEqual(this)
+                       && type == other.type
+                       && typeToInstantiate == other.typeToInstantiate;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               // any AvroSerializer may be compared with this one; equals() checks the types
+               final boolean isAvroSerializer = obj instanceof AvroSerializer;
+               return isAvroSerializer;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
+               // captures the base type, the instantiated type and the current Kryo registrations
+               final AvroSerializerConfigSnapshot<T> snapshot =
+                       new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+               return snapshot;
+       }
+
+       /**
+        * Checks whether data written by the serializer described by {@code configSnapshot} can be read by this
+        * serializer. If the base type and the instantiated type match, the restored Kryo registrations are merged
+        * with the current ones and adopted by this serializer; otherwise migration is required.
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+                       final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+
+                       if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
+                               // resolve Kryo registrations: overlay the current registrations on the restored ones.
+                               // Keys that already exist keep their restored position in the LinkedHashMap, new keys
+                               // are appended. Currently, since the Kryo registrations in Avro are fixed, there
+                               // shouldn't be a problem with the resolution here.
+                               LinkedHashMap<String, KryoRegistration> reconfiguredRegistrations = config.getKryoRegistrations();
+                               reconfiguredRegistrations.putAll(kryoRegistrations);
+
+                               // The dummy check must run on the MERGED map. A restored registration that is a dummy
+                               // (presumably one whose class or serializer could not be restored) and that was not
+                               // replaced by a current registration cannot be honoured, so migration is required.
+                               for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : reconfiguredRegistrations.entrySet()) {
+                                       if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+                                               return CompatibilityResult.requiresMigration();
+                                       }
+                               }
+
+                               this.kryoRegistrations = reconfiguredRegistrations;
+                               return CompatibilityResult.compatible();
+                       }
+               }
+
+               // ends up here if the preceding serializer is not
+               // the AvroSerializer, or serialized data type has changed
+               return CompatibilityResult.requiresMigration();
+       }
+
+       /**
+        * {@link TypeSerializerConfigSnapshot} for Avro.
+        */
+       public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               private Class<? extends T> typeToInstantiate;
+
+               public AvroSerializerConfigSnapshot() {}
+
+               public AvroSerializerConfigSnapshot(
+                       Class<T> baseType,
+                       Class<? extends T> typeToInstantiate,
+                       LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
+
+                       super(baseType, kryoRegistrations);
+                       this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       out.writeUTF(typeToInstantiate.getName());
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       String classname = in.readUTF();
+                       try {
+                               typeToInstantiate = (Class<? extends T>) 
Class.forName(classname, true, getUserCodeClassLoader());
+                       } catch (ClassNotFoundException e) {
+                               throw new IOException("Cannot find requested 
class " + classname + " in classpath.", e);
+                       }
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               public Class<? extends T> getTypeToInstantiate() {
+                       return typeToInstantiate;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+
+               // an Avro serializer deserialized from an old version may carry no Kryo
+               // registrations; rebuild them from the serialized type in that case
+               if (this.kryoRegistrations != null) {
+                       return;
+               }
+               this.kryoRegistrations = buildKryoRegistrations(this.type);
+       }
+
+       private static <T> LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(Class<T> serializedDataType) {
+               final LinkedHashMap<String, KryoRegistration> registrations = 
new LinkedHashMap<>();
+
+               // register Avro types.
+               registrations.put(
+                               GenericData.Array.class.getName(),
+                               new KryoRegistration(
+                                               GenericData.Array.class,
+                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+               registrations.put(Utf8.class.getName(), new 
KryoRegistration(Utf8.class));
+               registrations.put(GenericData.EnumSymbol.class.getName(), new 
KryoRegistration(GenericData.EnumSymbol.class));
+               registrations.put(GenericData.Fixed.class.getName(), new 
KryoRegistration(GenericData.Fixed.class));
+               registrations.put(GenericData.StringType.class.getName(), new 
KryoRegistration(GenericData.StringType.class));
+
+               // register the serialized data type
+               registrations.put(serializedDataType.getName(), new 
KryoRegistration(serializedDataType));
+
+               return registrations;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..ddc89a8
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information to generate a special AvroTypeInfo for Avro POJOs 
(implementing SpecificRecordBase, the typed Avro POJOs)
+ *
+ * <p>Proceeding: It uses a regular pojo type analysis and replaces all {@code 
GenericType<CharSequence>} with a {@code GenericType<avro.Utf8>}.
+ * All other types used by Avro are standard Java types.
+ * Only strings are represented as CharSequence fields and represented as Utf8 
classes at runtime.
+ * CharSequence is not comparable. To make them nicely usable with field 
expressions, we replace them here
+ * by generic type infos containing Utf8 classes (which are comparable),
+ *
+ * <p>This class is checked by the AvroPojoTest.
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends 
PojoTypeInfo<T> {
+
+       public AvroTypeInfo(Class<T> typeClass) {
+               super(typeClass, generateFieldsFromAvroSchema(typeClass));
+       }
+
+       @Override
+       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+               return super.createSerializer(config);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Internal
+       public static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
+               PojoTypeExtractor pte = new PojoTypeExtractor();
+               ArrayList<Type> typeHierarchy = new ArrayList<>();
+               typeHierarchy.add(typeClass);
+               TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, 
null, null, null);
+
+               if (!(ti instanceof PojoTypeInfo)) {
+                       throw new IllegalStateException("Expecting type to be a 
PojoTypeInfo");
+               }
+               PojoTypeInfo pti =  (PojoTypeInfo) ti;
+               List<PojoField> newFields = new 
ArrayList<>(pti.getTotalFields());
+
+               for (int i = 0; i < pti.getArity(); i++) {
+                       PojoField f = pti.getPojoFieldAt(i);
+                       TypeInformation newType = f.getTypeInformation();
+                       // check if type is a CharSequence
+                       if (newType instanceof GenericTypeInfo) {
+                               if 
((newType).getTypeClass().equals(CharSequence.class)) {
+                                       // replace the type by a 
org.apache.avro.util.Utf8
+                                       newType = new 
GenericTypeInfo(org.apache.avro.util.Utf8.class);
+                               }
+                       }
+                       PojoField newField = new PojoField(f.getField(), 
newType);
+                       newFields.add(newField);
+               }
+               return newFields;
+       }
+
+       private static class PojoTypeExtractor extends TypeExtractor {
+               private PojoTypeExtractor() {
+                       super();
+               }
+
+               @Override
+               public <OUT, IN1, IN2> TypeInformation<OUT> 
analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
+                               ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+                       return super.analyzePojo(clazz, typeHierarchy, 
parameterizedType, in1Type, in2Type);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
new file mode 100644
index 0000000..7305f23
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * Utilities for integrating Avro serializers in Kryo.
+ */
+public class AvroKryoSerializerUtils {
+
+       /**
+        * Registers the Kryo serializers needed for Avro runtime types with the given config.
+        *
+        * @param reg the execution config to register the serializers with
+        * @param type the type being serialized; currently not used by this method
+        */
+       public static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
+               // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
+               // because Kryo is not able to serialize them properly, we use this serializer for them
+               reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+
+               // We register this serializer for users who want to use untyped Avro records (GenericData.Record).
+               // Kryo is able to serialize everything in there, except for the Schema.
+               // This serializer is very slow, but using GenericData.Records with Kryo is in general a bad idea.
+               // We add it as a default serializer because Avro uses a private subtype of Schema at runtime.
+               reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+       }
+
+       /**
+        * Slow serialization approach for Avro schemas: a schema is written as its JSON string and
+        * parsed again on read.
+        * This is only used with {@link org.apache.avro.generic.GenericData.Record} types.
+        * Having this serializer, we are able to handle Avro records.
+        */
+       public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void write(Kryo kryo, Output output, Schema object) {
+                       // compact (non-pretty) JSON form of the schema
+                       String schemaAsString = object.toString(false);
+                       output.writeString(schemaAsString);
+               }
+
+               @Override
+               public Schema read(Kryo kryo, Input input, Class<Schema> type) {
+                       String schemaAsString = input.readString();
+                       // Schema.Parser keeps the named types it has parsed and refuses to redefine them,
+                       // so a fresh parser is needed for every schema.
+                       Schema.Parser sParser = new Schema.Parser();
+                       return sParser.parse(schemaAsString);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
new file mode 100644
index 0000000..32032cc
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Decoder} that reads from a {@link DataInput}.
+ *
+ * <p>Primitives, enum/union indices and byte/string lengths are read with the fixed-width
+ * {@link DataInput} methods, while array and map block counts use a variable-length encoding
+ * (see {@link #readVarLongCount(DataInput)}).
+ */
+public class DataInputDecoder extends Decoder {
+
+       private final Utf8 stringDecoder = new Utf8();
+
+       private DataInput in;
+
+       /**
+        * Sets the input that all subsequent read and skip calls consume.
+        */
+       public void setIn(DataInput in) {
+               this.in = in;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // primitives
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void readNull() {}
+
+       @Override
+       public boolean readBoolean() throws IOException {
+               return in.readBoolean();
+       }
+
+       @Override
+       public int readInt() throws IOException {
+               return in.readInt();
+       }
+
+       @Override
+       public long readLong() throws IOException {
+               return in.readLong();
+       }
+
+       @Override
+       public float readFloat() throws IOException {
+               return in.readFloat();
+       }
+
+       @Override
+       public double readDouble() throws IOException {
+               return in.readDouble();
+       }
+
+       @Override
+       public int readEnum() throws IOException {
+               return readInt();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // bytes
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void readFixed(byte[] bytes, int start, int length) throws IOException {
+               in.readFully(bytes, start, length);
+       }
+
+       /**
+        * Reads an int length followed by that many bytes. {@code old} is reused when it is array-backed
+        * and large enough; otherwise a new heap buffer is allocated.
+        */
+       @Override
+       public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+               int length = readInt();
+               ByteBuffer result;
+               if (old != null && length <= old.capacity() && old.hasArray()) {
+                       result = old;
+                       result.clear();
+               } else {
+                       result = ByteBuffer.allocate(length);
+               }
+               in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+               result.limit(length);
+               return result;
+       }
+
+       @Override
+       public void skipFixed(int length) throws IOException {
+               skipBytes(length);
+       }
+
+       @Override
+       public void skipBytes() throws IOException {
+               int num = readInt();
+               skipBytes(num);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // strings
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Reads an int byte length followed by the UTF-8 bytes, reusing {@code old} when it is not null.
+        */
+       @Override
+       public Utf8 readString(Utf8 old) throws IOException {
+               int length = readInt();
+               Utf8 result = (old != null ? old : new Utf8());
+               result.setByteLength(length);
+
+               if (length > 0) {
+                       in.readFully(result.getBytes(), 0, length);
+               }
+
+               return result;
+       }
+
+       @Override
+       public String readString() throws IOException {
+               return readString(stringDecoder).toString();
+       }
+
+       @Override
+       public void skipString() throws IOException {
+               int len = readInt();
+               skipBytes(len);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // collection types
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public long readArrayStart() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long arrayNext() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long skipArray() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long readMapStart() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long mapNext() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long skipMap() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // union
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int readIndex() throws IOException {
+               return readInt();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // utils
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Skips exactly {@code num} bytes of the input.
+        *
+        * @throws java.io.EOFException if the input ends before {@code num} bytes were skipped
+        */
+       private void skipBytes(int num) throws IOException {
+               while (num > 0) {
+                       int skipped = in.skipBytes(num);
+                       if (skipped > 0) {
+                               num -= skipped;
+                       } else {
+                               // DataInput#skipBytes may skip nothing (for example at the end of the input) and never
+                               // throws EOFException. readByte() either consumes one byte or throws EOFException,
+                               // so the loop always makes progress instead of spinning forever.
+                               in.readByte();
+                               num--;
+                       }
+               }
+       }
+
+       /**
+        * Reads a non-negative count in a variable-length format: 7 bits per byte, least-significant
+        * group first, with the high bit set on every byte except the last.
+        *
+        * @throws IOException if the input ends, or if the encoding is longer than a 64-bit value allows
+        */
+       public static long readVarLongCount(DataInput in) throws IOException {
+               long value = in.readUnsignedByte();
+
+               if ((value & 0x80) == 0) {
+                       return value;
+               }
+
+               long curr;
+               int shift = 7;
+               value = value & 0x7f;
+               while (((curr = in.readUnsignedByte()) & 0x80) != 0) {
+                       value |= (curr & 0x7f) << shift;
+                       shift += 7;
+                       if (shift > 63) {
+                               // a 64-bit value needs at most 10 bytes; a longer encoding is corrupt and would
+                               // otherwise silently wrap the shift distance
+                               throw new IOException("Malformed variable-length count: too many continuation bytes.");
+                       }
+               }
+               value |= curr << shift;
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
new file mode 100644
index 0000000..c2d490b
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link Encoder} that writes data to a {@link DataOutput}.
+ */
+public final class DataOutputEncoder extends Encoder implements 
java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private DataOutput out;
+
+       public void setOut(DataOutput out) {
+               this.out = out;
+       }
+
+       @Override
+       public void flush() throws IOException {}
+
+       // 
--------------------------------------------------------------------------------------------
+       // primitives
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void writeNull() {}
+
+       @Override
+       public void writeBoolean(boolean b) throws IOException {
+               out.writeBoolean(b);
+       }
+
+       @Override
+       public void writeInt(int n) throws IOException {
+               out.writeInt(n);
+       }
+
+       @Override
+       public void writeLong(long n) throws IOException {
+               out.writeLong(n);
+       }
+
+       @Override
+       public void writeFloat(float f) throws IOException {
+               out.writeFloat(f);
+       }
+
+       @Override
+       public void writeDouble(double d) throws IOException {
+               out.writeDouble(d);
+       }
+
+       @Override
+       public void writeEnum(int e) throws IOException {
+               out.writeInt(e);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // bytes
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void writeFixed(byte[] bytes, int start, int len) throws 
IOException {
+               out.write(bytes, start, len);
+       }
+
+       @Override
+       public void writeBytes(byte[] bytes, int start, int len) throws 
IOException {
+               out.writeInt(len);
+               if (len > 0) {
+                       out.write(bytes, start, len);
+               }
+       }
+
+       @Override
+       public void writeBytes(ByteBuffer bytes) throws IOException {
+               int num = bytes.remaining();
+               out.writeInt(num);
+
+               if (num > 0) {
+                       writeFixed(bytes);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // strings
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void writeString(String str) throws IOException {
+               byte[] bytes = Utf8.getBytesFor(str);
+               writeBytes(bytes, 0, bytes.length);
+       }
+
+       @Override
+       public void writeString(Utf8 utf8) throws IOException {
+               writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // collection types
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void writeArrayStart() {}
+
+       @Override
+       public void setItemCount(long itemCount) throws IOException {
+               if (itemCount > 0) {
+                       writeVarLongCount(out, itemCount);
+               }
+       }
+
+       @Override
+       public void startItem() {}
+
+       @Override
+       public void writeArrayEnd() throws IOException {
+               // write a single byte 0, shortcut for a var-length long of 0
+               out.write(0);
+       }
+
+       @Override
+       public void writeMapStart() {}
+
+       @Override
+       public void writeMapEnd() throws IOException {
+               // write a single byte 0, shortcut for a var-length long of 0
+               out.write(0);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // union
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void writeIndex(int unionIndex) throws IOException {
+               out.writeInt(unionIndex);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // utils
+       // 
--------------------------------------------------------------------------------------------
+
+       public static void writeVarLongCount(DataOutput out, long val) throws 
IOException {
+               if (val < 0) {
+                       throw new IOException("Illegal count (must be 
non-negative): " + val);
+               }
+
+               while ((val & ~0x7FL) != 0) {
+                       out.write(((int) val) | 0x80);
+                       val >>>= 7;
+               }
+               out.write((int) val);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..c00fecb
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache 
licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+       private final FSDataInputStream stream;
+       private long pos;
+       private long len;
+
+       public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+               this.stream = stream;
+               this.pos = 0;
+               this.len = len;
+       }
+
+       public long length() throws IOException {
+               return this.len;
+       }
+
+       public int read(byte[] b, int off, int len) throws IOException {
+               int read;
+               read = stream.read(b, off, len);
+               pos += read;
+               return read;
+       }
+
+       public void seek(long p) throws IOException {
+               stream.seek(p);
+               pos = p;
+       }
+
+       public long tell() throws IOException {
+               return pos;
+       }
+
+       public void close() throws IOException {
+               stream.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/assembly/test-assembly.xml 
b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..8361693
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<assembly>
+       <id>test-jar</id>
+       <formats>
+               <format>jar</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <fileSets>
+               <fileSet>
+                       
<directory>${project.build.testOutputDirectory}</directory>
+                       <outputDirectory></outputDirectory>
+                       <!--modify/add include to match your package(s) -->
+                       <includes>
+                               
<include>org/apache/flink/formats/avro/testjar/**</include>
+                       </includes>
+               </fileSet>
+       </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..985471a
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class AvroExternalJarProgramITCase extends TestLogger {
+
+       private static final String JAR_FILE = "maven-test-jar.jar";
+
+       private static final String TEST_DATA_FILE = "/testdata.avro";
+
+       @Test
+       public void testExternalProgram() {
+
+               LocalFlinkMiniCluster testMiniCluster = null;
+
+               try {
+                       int parallelism = 4;
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+                       testMiniCluster = new LocalFlinkMiniCluster(config, 
false);
+                       testMiniCluster.start();
+
+                       String jarFile = JAR_FILE;
+                       String testData = 
getClass().getResource(TEST_DATA_FILE).toString();
+
+                       PackagedProgram program = new PackagedProgram(new 
File(jarFile), new String[] { testData });
+
+                       TestEnvironment.setAsContext(
+                               testMiniCluster,
+                               parallelism,
+                               Collections.singleton(new Path(jarFile)),
+                               Collections.<URL>emptyList());
+
+                       config.setString(JobManagerOptions.ADDRESS, 
"localhost");
+                       config.setInteger(JobManagerOptions.PORT, 
testMiniCluster.getLeaderRPCPort());
+
+                       program.invokeInteractiveModeForExecution();
+               }
+               catch (Throwable t) {
+                       System.err.println(t.getMessage());
+                       t.printStackTrace();
+                       Assert.fail("Error during the packaged program 
execution: " + t.getMessage());
+               }
+               finally {
+                       TestEnvironment.unsetAsContext();
+
+                       if (testMiniCluster != null) {
+                               try {
+                                       testMiniCluster.stop();
+                               } catch (Throwable t) {
+                                       // ignore
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..bc4f253
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the type extraction of the {@link AvroInputFormat}.
+ */
+public class AvroInputFormatTypeExtractionTest {
+
+       @Test
+       public void testTypeExtraction() {
+               try {
+                       InputFormat<MyAvroType, ?> format = new 
AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), 
MyAvroType.class);
+
+                       TypeInformation<?> typeInfoDirect = 
TypeExtractor.getInputFormatTypes(format);
+
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       DataSet<MyAvroType> input = env.createInput(format);
+                       TypeInformation<?> typeInfoDataSet = input.getType();
+
+                       Assert.assertTrue(typeInfoDirect instanceof 
PojoTypeInfo);
+                       Assert.assertTrue(typeInfoDataSet instanceof 
PojoTypeInfo);
+
+                       Assert.assertEquals(MyAvroType.class, 
typeInfoDirect.getTypeClass());
+                       Assert.assertEquals(MyAvroType.class, 
typeInfoDataSet.getTypeClass());
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Test type.
+        */
+       public static final class MyAvroType {
+
+               public String theString;
+
+               public MyAvroType recursive;
+
+               private double aDouble;
+
+               public double getaDouble() {
+                       return aDouble;
+               }
+
+               public void setaDouble(double aDouble) {
+                       this.aDouble = aDouble;
+               }
+
+               public void setTheString(String theString) {
+                       this.theString = theString;
+               }
+
+               public String getTheString() {
+                       return theString;
+               }
+       }
+}

Reply via email to