http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index 02c11af..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,35 +0,0 @@
-[
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "Address",
- "fields": [
-     {"name": "num", "type": "int"},
-     {"name": "street", "type": "string"},
-     {"name": "city", "type": "string"},
-     {"name": "state", "type": "string"},
-     {"name": "zip", "type": "string"}
-  ]
-},
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]},
-     {"name": "type_long_test", "type": ["long", "null"]},
-     {"name": "type_double_test", "type": "double"},
-     {"name": "type_null_test", "type": ["null"]},
-     {"name": "type_bool_test", "type": ["boolean"]},
-     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
-     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
-     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
-     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
-     {"name": "type_map", "type": {"type": "map", "values": "long"}},
-     {"name": "type_fixed",
-                 "size": 16,
-                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
-     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
-     {"name": "type_nested", "type": ["null", "Address"]}
- ]
-}]

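For reference, the deleted user.avsc above is an ordinary Avro schema (a JSON array, i.e. a union of the Address and User records), so it can be loaded with the stock Avro parser. A minimal sketch, not part of this commit, assuming the file is still available as a classpath resource named "avro/user.avsc":

    import org.apache.avro.Schema;
    import java.io.InputStream;

    public class ParseUserSchema {
        public static void main(String[] args) throws Exception {
            try (InputStream in = ParseUserSchema.class.getClassLoader()
                    .getResourceAsStream("avro/user.avsc")) {
                // The file holds a JSON array, which Avro parses as a union schema.
                Schema schema = new Schema.Parser().parse(in);
                System.out.println(schema.getTypes()); // the Address and User record schemas
            }
        }
    }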
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-batch-connectors/flink-avro/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro b/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-batch-connectors/flink-avro/src/test/resources/testdata.avro and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
deleted file mode 100644
index 8f423d9..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-batch-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
-       <name>flink-hadoop-compatibility</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-scala_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-hadoop2</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-               
-       </dependencies>
-
-
-       <build>
-               <plugins>
-                       <!-- activate API compatibility checks -->
-                       <plugin>
-                               <groupId>com.github.siom79.japicmp</groupId>
-                               <artifactId>japicmp-maven-plugin</artifactId>
-                       </plugin>
-                       <!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.1.4</version>
-                               <executions>
-                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                                               scala classes can be resolved later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                                       </projectnatures>
-                                       <buildcommands>
-                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                                       </classpathContainers>
-                                       <excludes>
-                                               <exclude>org.scala-lang:scala-library</exclude>
-                                               <exclude>org.scala-lang:scala-compiler</exclude>
-                                       </excludes>
-                                       <sourceIncludes>
-                                               <sourceInclude>**/*.scala</sourceInclude>
-                                               <sourceInclude>**/*.java</sourceInclude>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               <artifactId>build-helper-maven-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/main/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!-- Scala Code Style, most of the configuration done via plugin management -->
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <configuration>
-                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
deleted file mode 100644
index 7bcb4bf..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
- * interface defines the serialization and deserialization routines for the data type.
- *
- * @param <T> The type of the class represented by this type information.
- */
-@Public
-public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> typeClass;
-
-       @PublicEvolving
-       public WritableTypeInfo(Class<T> typeClass) {
-               this.typeClass = checkNotNull(typeClass);
-
-               checkArgument(
-                       Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
-                       "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
-       }
-
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       @PublicEvolving
-       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-               if(Comparable.class.isAssignableFrom(typeClass)) {
-                       return new WritableComparator(sortOrderAscending, typeClass);
-               }
-               else {
-                       throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
-                                                                                                       "Class does not implement Comparable interface.");
-               }
-       }
-
-       @Override
-       @PublicEvolving
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       @PublicEvolving
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       @PublicEvolving
-       public int getArity() {
-               return 1;
-       }
-       
-       @Override
-       @PublicEvolving
-       public int getTotalFields() {
-               return 1;
-       }
-
-       @Override
-       @PublicEvolving
-       public Class<T> getTypeClass() {
-               return this.typeClass;
-       }
-
-       @Override
-       @PublicEvolving
-       public boolean isKeyType() {
-               return Comparable.class.isAssignableFrom(typeClass);
-       }
-
-       @Override
-       @PublicEvolving
-       public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-               return new WritableSerializer<T>(typeClass);
-       }
-       
-       @Override
-       public String toString() {
-               return "WritableType<" + typeClass.getName() + ">";
-       }       
-       
-       @Override
-       public int hashCode() {
-               return typeClass.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof WritableTypeInfo) {
-                       @SuppressWarnings("unchecked")
-                       WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
-
-                       return writableTypeInfo.canEqual(this) &&
-                               typeClass == writableTypeInfo.typeClass;
-
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof WritableTypeInfo;
-       }
-       
-       // --------------------------------------------------------------------------------------------
-
-       @PublicEvolving
-       static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
-               if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
-                       return new WritableTypeInfo<T>(typeClass);
-               }
-               else {
-                       throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
-               }
-       }
-       
-}

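For context on the deleted WritableTypeInfo: it describes any Hadoop Writable subclass as Flink type information. A minimal usage sketch, not part of this commit, using Hadoop's IntWritable as the example type:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.hadoop.io.IntWritable;

    public class WritableTypeInfoSketch {
        public static void main(String[] args) {
            // IntWritable is a proper Writable subclass, so the checkArgument above passes.
            WritableTypeInfo<IntWritable> typeInfo = new WritableTypeInfo<>(IntWritable.class);

            // The serializer delegates to Writable.write()/readFields(), see WritableSerializer below.
            TypeSerializer<IntWritable> serializer = typeInfo.createSerializer(new ExecutionConfig());
            System.out.println(typeInfo + ", usable as key: " + typeInfo.isKeyType());
        }
    }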
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
deleted file mode 100644
index 3a95d94..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private Class<T> type;
-       
-       private final boolean ascendingComparison;
-       
-       private transient T reference;
-       
-       private transient T tempReference;
-       
-       private transient Kryo kryo;
-
-       @SuppressWarnings("rawtypes")
-       private final TypeComparator[] comparators = new TypeComparator[] {this};
-
-       public WritableComparator(boolean ascending, Class<T> type) {
-               this.type = type;
-               this.ascendingComparison = ascending;
-       }
-       
-       @Override
-       public int hash(T record) {
-               return record.hashCode();
-       }
-       
-       @Override
-       public void setReference(T toCompare) {
-               checkKryoInitialized();
-
-               reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
-       }
-       
-       @Override
-       public boolean equalToReference(T candidate) {
-               return candidate.equals(reference);
-       }
-       
-       @Override
-       public int compareToReference(TypeComparator<T> referencedComparator) {
-               T otherRef = ((WritableComparator<T>) referencedComparator).reference;
-               int comp = otherRef.compareTo(reference);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public int compare(T first, T second) {
-               int comp = first.compareTo(second);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-               ensureReferenceInstantiated();
-               ensureTempReferenceInstantiated();
-               
-               reference.readFields(firstSource);
-               tempReference.readFields(secondSource);
-               
-               int comp = reference.compareTo(tempReference);
-               return ascendingComparison ? comp : -comp;
-       }
-       
-       @Override
-       public boolean supportsNormalizedKey() {
-               return NormalizableKey.class.isAssignableFrom(type);
-       }
-       
-       @Override
-       public int getNormalizeKeyLen() {
-               ensureReferenceInstantiated();
-               
-               NormalizableKey<?> key = (NormalizableKey<?>) reference;
-               return key.getMaxNormalizedKeyLen();
-       }
-       
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return keyBytes < getNormalizeKeyLen();
-       }
-       
-       @Override
-       public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-               NormalizableKey<?> key = (NormalizableKey<?>) record;
-               key.copyNormalizedKey(target, offset, numBytes);
-       }
-       
-       @Override
-       public boolean invertNormalizedKey() {
-               return !ascendingComparison;
-       }
-       
-       @Override
-       public TypeComparator<T> duplicate() {
-               return new WritableComparator<T>(ascendingComparison, type);
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               target[index] = record;
-               return 1;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public TypeComparator[] getFlatComparators() {
-               return comparators;
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       // unsupported normalization
-       // --------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-       
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-       
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-                       instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
-               }
-       }
-       
-       private void ensureReferenceInstantiated() {
-               if (reference == null) {
-                       reference = InstantiationUtil.instantiate(type, Writable.class);
-               }
-       }
-       
-       private void ensureTempReferenceInstantiated() {
-               if (tempReference == null) {
-                       tempReference = InstantiationUtil.instantiate(type, Writable.class);
-               }
-       }
-}

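As a rough illustration of the deleted comparator (not part of this commit): it only accepts types that are both Writable and Comparable, which Hadoop's IntWritable satisfies:

    import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
    import org.apache.hadoop.io.IntWritable;

    public class WritableComparatorSketch {
        public static void main(String[] args) {
            // ascending order; IntWritable is Writable & Comparable<IntWritable>
            WritableComparator<IntWritable> comparator = new WritableComparator<>(true, IntWritable.class);
            int cmp = comparator.compare(new IntWritable(1), new IntWritable(2));
            System.out.println(cmp); // negative: 1 sorts before 2 in ascending order
        }
    }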
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
deleted file mode 100644
index 9036d75..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> typeClass;
-       
-       private transient Kryo kryo;
-       
-       private transient T copyInstance;
-       
-       public WritableSerializer(Class<T> typeClass) {
-               this.typeClass = typeClass;
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       public T createInstance() {
-               if(typeClass == NullWritable.class) {
-                       return (T) NullWritable.get();
-               }
-               return InstantiationUtil.instantiate(typeClass);
-       }
-
-
-       
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
-       
-       @Override
-       public int getLength() {
-               return -1;
-       }
-       
-       @Override
-       public void serialize(T record, DataOutputView target) throws IOException {
-               record.write(target);
-       }
-       
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               return deserialize(createInstance(), source);
-       }
-       
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               reuse.readFields(source);
-               return reuse;
-       }
-       
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               ensureInstanceInstantiated();
-               copyInstance.readFields(source);
-               copyInstance.write(target);
-       }
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-       
-       @Override
-       public WritableSerializer<T> duplicate() {
-               return new WritableSerializer<T>(typeClass);
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       private void ensureInstanceInstantiated() {
-               if (copyInstance == null) {
-                       copyInstance = createInstance();
-               }
-       }
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-                       instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(typeClass);
-               }
-       }
-       // --------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return this.typeClass.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof WritableSerializer) {
-                       WritableSerializer<?> other = (WritableSerializer<?>) obj;
-
-                       return other.canEqual(this) && typeClass == other.typeClass;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof WritableSerializer;
-       }
-}

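A small sketch of the deleted serializer in isolation, not part of this commit, using Hadoop's Text as the record type:

    import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
    import org.apache.hadoop.io.Text;

    public class WritableSerializerSketch {
        public static void main(String[] args) {
            WritableSerializer<Text> serializer = new WritableSerializer<>(Text.class);
            // copy() produces a deep copy of the record (Writable types are mutable).
            Text copy = serializer.copy(new Text("hello"));
            System.out.println(copy + " immutable=" + serializer.isImmutableType()); // hello immutable=false
        }
    }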
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
deleted file mode 100644
index 9e8a3e4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopInputs.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-
-/**
- * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
- *
- * It provides methods to create Flink InputFormat wrappers for Hadoop {@link org.apache.hadoop.mapred.InputFormat}
- * and {@link org.apache.hadoop.mapreduce.InputFormat}.
- *
- * Key value pairs produced by the Hadoop InputFormats are converted into Flink
- * {@link org.apache.flink.api.java.tuple.Tuple2 Tuple2} objects where the first field
- * ({@link org.apache.flink.api.java.tuple.Tuple2#f0 Tuple2.f0}) is the key and the second field
- * ({@link org.apache.flink.api.java.tuple.Tuple2#f1 Tuple2.f1}) is the value.
- *
- */
-
-public final class HadoopInputs {
-       // ----------------------------------- Hadoop Input Format ---------------------------------------
-
-       /**
-        * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
-        *
-        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-        */
-       public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
-               // set input path in JobConf
-               org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-               // return wrapping InputFormat
-               return createHadoopInput(mapredInputFormat, key, value, job);
-       }
-
-       /**
-        * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.FileInputFormat}.
-        *
-        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-        */
-       public static <K,V> HadoopInputFormat<K, V> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
-               return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
-       }
-
-       /**
-        * Creates a Flink {@link InputFormat} to read a Hadoop sequence file for the given key and value classes.
-        *
-        * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat.
-        */
-       public static <K,V> HadoopInputFormat<K, V> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
-               return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
-       }
-
-       /**
-        * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapred.InputFormat}.
-        *
-        * @return A Flink InputFormat that wraps the Hadoop InputFormat.
-        */
-       public static <K,V> HadoopInputFormat<K, V> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-               return new HadoopInputFormat<>(mapredInputFormat, key, value, job);
-       }
-
-       /**
-        * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
-        *
-        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-        */
-       public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
-                       org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException
-       {
-               // set input path in Job
-               org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-               // return wrapping InputFormat
-               return createHadoopInput(mapreduceInputFormat, key, value, job);
-       }
-
-       /**
-        * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}.
-        *
-        * @return A Flink InputFormat that wraps the Hadoop FileInputFormat.
-        */
-       public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
-                       org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException
-       {
-               return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
-       }
-
-       /**
-        * Creates a Flink {@link InputFormat} that wraps the given Hadoop {@link org.apache.hadoop.mapreduce.InputFormat}.
-        *
-        * @return A Flink InputFormat that wraps the Hadoop InputFormat.
-        */
-       public static <K,V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
-                       org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job)
-       {
-               return new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
-       }
-}

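The HadoopInputs javadoc above describes the Tuple2 wrapping; a short usage sketch for the mapred TextInputFormat (the path is illustrative, not from this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputsSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Wrap a mapred TextInputFormat; keys are byte offsets, values are lines.
            DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
                    HadoopInputs.readHadoopFile(new TextInputFormat(),
                            LongWritable.class, Text.class, "hdfs:///tmp/input"));

            lines.first(10).print();
        }
    }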
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
deleted file mode 100644
index 97ca329..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/HadoopUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility;
-
-import org.apache.commons.cli.Option;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Utility class to work with Apache Hadoop libraries.
- */
-public class HadoopUtils {
-       /**
-        * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}
-        *
-        * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
-        * @return A {@link ParameterTool}
-        * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
-        * @see GenericOptionsParser
-        */
-       public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
-               Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
-               Map<String, String> map = new HashMap<String, String>();
-               for (Option option : options) {
-                       String[] split = option.getValue().split("=");
-                       map.put(split[0], split[1]);
-               }
-               return ParameterTool.fromMap(map);
-       }
-}
-
-

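A minimal sketch of the deleted HadoopUtils helper (argument values are illustrative, not from this commit):

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.hadoopcompatibility.HadoopUtils;

    public class HadoopUtilsSketch {
        public static void main(String[] args) throws Exception {
            // GenericOptionsParser understands -D key=value pairs, among others.
            String[] hadoopStyleArgs = {"-D", "fs.defaultFS=hdfs://namenode:8020"};
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(hadoopStyleArgs);
            System.out.println(params.get("fs.defaultFS"));
        }
    }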
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
deleted file mode 100644
index ba8aa90..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
-                                       extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
-                                       implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
-       private transient JobConf jobConf;
-
-       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
-       private transient Reporter reporter;
-       
-       /**
-        * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
-        * 
-        * @param hadoopMapper The Hadoop Mapper to wrap.
-        */
-       public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
-               this(hadoopMapper, new JobConf());
-       }
-       
-       /**
-        * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
-        * The Hadoop Mapper is configured with the provided JobConf.
-        * 
-        * @param hadoopMapper The Hadoop Mapper to wrap.
-        * @param conf The JobConf that is used to configure the Hadoop Mapper.
-        */
-       public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
-               if(hadoopMapper == null) {
-                       throw new NullPointerException("Mapper may not be null.");
-               }
-               if(conf == null) {
-                       throw new NullPointerException("JobConf may not be null.");
-               }
-               
-               this.mapper = hadoopMapper;
-               this.jobConf = conf;
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.mapper.configure(jobConf);
-               
-               this.reporter = new HadoopDummyReporter();
-               this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
-       }
-
-       @Override
-       public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
-                       throws Exception {
-               outputCollector.setFlinkCollector(out);
-               mapper.map(value.f0, value.f1, outputCollector, reporter);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {     
-               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
-               Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
-               
-               final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
-               final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
-       }
-       
-       /**
-        * Custom serialization methods.
-        * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-        */
-       private void writeObject(final ObjectOutputStream out) throws IOException {
-               out.writeObject(mapper.getClass());
-               jobConf.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-               Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass =
-                               (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
-               mapper = InstantiationUtil.instantiate(mapperClass);
-               
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-       }
-
-}

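The HadoopMapFunction above wraps a mapred Mapper as a Flink FlatMapFunction. A hedged sketch of how such a wrapper might be used, with a hypothetical Mapper that is not part of this commit:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // A hypothetical mapred Mapper that upper-cases each input line.
    public class UpperCaseMapper implements Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> out, Reporter reporter)
                throws java.io.IOException {
            out.collect(key, new Text(value.toString().toUpperCase()));
        }
        @Override public void configure(JobConf conf) {}
        @Override public void close() {}
    }

    // Usage on a DataSet<Tuple2<LongWritable, Text>> named "lines", e.g. from HadoopInputs above:
    // DataSet<Tuple2<LongWritable, Text>> upper =
    //         lines.flatMap(new HadoopMapFunction<LongWritable, Text, LongWritable, Text>(new UpperCaseMapper()));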
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
deleted file mode 100644
index c1acc2b..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
-       extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
-       implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
-                               ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
-       private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
-       private transient JobConf jobConf;
-       
-       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
-       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
-       private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
-       private transient Reporter reporter;
-
-       /**
-        * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
-        * 
-        * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
-        * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
-        */
-       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-                                                                               Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
-               this(hadoopReducer, hadoopCombiner, new JobConf());
-       }
-       
-       /**
-        * Maps two Hadoop Reducer (mapred API) to a combinable Flink 
GroupReduceFunction.
-        * 
-        * @param hadoopReducer The Hadoop Reducer that is mapped to a 
GroupReduceFunction.
-        * @param hadoopCombiner The Hadoop Reducer that is mapped to the 
combiner function.
-        * @param conf The JobConf that is used to configure both Hadoop 
Reducers.
-        */
-       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT> hadoopReducer,
-                                                               
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
-               if(hadoopReducer == null) {
-                       throw new NullPointerException("Reducer may not be 
null.");
-               }
-               if(hadoopCombiner == null) {
-                       throw new NullPointerException("Combiner may not be 
null.");
-               }
-               if(conf == null) {
-                       throw new NullPointerException("JobConf may not be 
null.");
-               }
-               
-               this.reducer = hadoopReducer;
-               this.combiner = hadoopCombiner;
-               this.jobConf = conf;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.reducer.configure(jobConf);
-               this.combiner.configure(jobConf);
-               
-               this.reporter = new HadoopDummyReporter();
-               Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
-               this.valueIterator = new 
HadoopTupleUnwrappingIterator<>(keySerializer);
-               this.combineCollector = new HadoopOutputCollector<>();
-               this.reduceCollector = new HadoopOutputCollector<>();
-       }
-
-       @Override
-       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final 
Collector<Tuple2<KEYOUT,VALUEOUT>> out)
-                       throws Exception {
-               reduceCollector.setFlinkCollector(out);
-               valueIterator.set(values.iterator());
-               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, 
reduceCollector, reporter);
-       }
-
-       @Override
-       public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final 
Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
-               combineCollector.setFlinkCollector(out);
-               valueIterator.set(values.iterator());
-               combiner.reduce(valueIterator.getCurrentKey(), valueIterator, 
combineCollector, reporter);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
-               Class<VALUEOUT> outValClass = 
(Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, 
reducer.getClass(), 3);
-
-               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass(outKeyClass);
-               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass(outValClass);
-               return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
-       }
-
-       /**
-        * Custom serialization methods.
-        * @see <a 
href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html";>http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-        */
-       private void writeObject(final ObjectOutputStream out) throws 
IOException {
-               
-               out.writeObject(reducer.getClass());
-               out.writeObject(combiner.getClass());
-               jobConf.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               
-               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
-                               
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
-               reducer = InstantiationUtil.instantiate(reducerClass);
-               
-               Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
-                               
(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
-               combiner = InstantiationUtil.instantiate(combinerClass);
-               
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-       }
-
-}
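
A minimal usage sketch of the wrapped reducer/combiner above, assuming Hadoop's built-in LongSumReducer and a small in-memory DataSet (input would normally come from a Hadoop InputFormat):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.LongSumReducer;

    public class ReduceCombineSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical (word, count) pairs standing in for real input data.
            DataSet<Tuple2<Text, LongWritable>> words = env.fromElements(
                    new Tuple2<>(new Text("flink"), new LongWritable(1L)),
                    new Tuple2<>(new Text("hadoop"), new LongWritable(1L)),
                    new Tuple2<>(new Text("flink"), new LongWritable(1L)));

            // The same Hadoop Reducer is used as reducer and as combiner.
            DataSet<Tuple2<Text, LongWritable>> counts = words
                    .groupBy(0)
                    .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                            new LongSumReducer<Text>(), new LongSumReducer<Text>()));

            counts.print();
        }
    }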

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
deleted file mode 100644
index 55aea24..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
- */
-@SuppressWarnings("rawtypes")
-@Public
-public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
-                                       extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
-                                       implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
-       private transient JobConf jobConf;
-       
-       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> 
valueIterator;
-       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> 
reduceCollector;
-       private transient Reporter reporter;
-       
-       /**
-        * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink 
GroupReduceFunction.
-        * 
-        * @param hadoopReducer The Hadoop Reducer to wrap.
-        */
-       public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
hadoopReducer) {
-               this(hadoopReducer, new JobConf());
-       }
-       
-       /**
-        * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink 
GroupReduceFunction.
-        * 
-        * @param hadoopReducer The Hadoop Reducer to wrap.
-        * @param conf The JobConf that is used to configure the Hadoop Reducer.
-        */
-       public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
hadoopReducer, JobConf conf) {
-               if(hadoopReducer == null) {
-                       throw new NullPointerException("Reducer may not be 
null.");
-               }
-               if(conf == null) {
-                       throw new NullPointerException("JobConf may not be 
null.");
-               }
-               
-               this.reducer = hadoopReducer;
-               this.jobConf = conf;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.reducer.configure(jobConf);
-               
-               this.reporter = new HadoopDummyReporter();
-               this.reduceCollector = new HadoopOutputCollector<KEYOUT, 
VALUEOUT>();
-               Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
-               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(keySerializer);
-       }
-
-       @Override
-       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final 
Collector<Tuple2<KEYOUT,VALUEOUT>> out)
-                       throws Exception {
-               
-               reduceCollector.setFlinkCollector(out);
-               valueIterator.set(values.iterator());
-               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, 
reduceCollector, reporter);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
-               Class<VALUEOUT> outValClass = 
(Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, 
reducer.getClass(), 3);
-
-               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
-               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, 
valueTypleInfo);
-       }
-
-       /**
-        * Custom serialization methods
-        * @see <a 
href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html";>http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-        */
-       private void writeObject(final ObjectOutputStream out) throws 
IOException {
-               
-               out.writeObject(reducer.getClass());
-               jobConf.write(out);             
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               
-               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
-                               
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
-               reducer = InstantiationUtil.instantiate(reducerClass);
-               
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-       }
-}
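
HadoopReduceFunction is the non-combinable counterpart. Continuing the sketch above (the words DataSet and the corresponding imports are assumed), the two-argument constructor also shows how a JobConf reaches the wrapped Reducer via configure():

    // Hypothetical setting, forwarded to Reducer.configure(jobConf) in open().
    JobConf conf = new JobConf();
    conf.set("example.parameter", "42");

    DataSet<Tuple2<Text, LongWritable>> counts = words
            .groupBy(0)
            .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
                    new LongSumReducer<Text>(), conf));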

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
deleted file mode 100644
index bfe03d3..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import java.io.IOException;
-
-/**
- * A Hadoop OutputCollector that wraps a Flink OutputCollector.
- * On each call of collect() the data is forwarded to the wrapped Flink collector.
- */
-public final class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
-
-       private Collector<Tuple2<KEY,VALUE>> flinkCollector;
-
-       private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
-
-       /**
-        * Set the wrapped Flink collector.
-        * 
-        * @param flinkCollector The wrapped Flink OutputCollector.
-        */
-       public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> 
flinkCollector) {
-               this.flinkCollector = flinkCollector;
-       }
-       
-       /**
-        * Use the wrapped Flink collector to collect a key-value pair for 
Flink. 
-        * 
-        * @param key the key to collect
-        * @param val the value to collect
-        * @throws IOException unexpected of key or value in key-value pair.
-        */
-       @Override
-       public void collect(final KEY key, final VALUE val) throws IOException {
-               this.outTuple.f0 = key;
-               this.outTuple.f1 = val;
-               this.flinkCollector.collect(outTuple);
-       }
-}
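
A small sketch of the collector in isolation, assuming Flink's ListCollector helper; note that the wrapper reuses a single Tuple2 instance across collect() calls:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.util.ListCollector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class OutputCollectorSketch {
        public static void main(String[] args) throws Exception {
            List<Tuple2<Text, LongWritable>> buffer = new ArrayList<>();
            HadoopOutputCollector<Text, LongWritable> collector = new HadoopOutputCollector<>();
            collector.setFlinkCollector(new ListCollector<>(buffer));

            // Forwards ("flink", 3) to the wrapped Flink collector as a Tuple2.
            collector.collect(new Text("flink"), new LongWritable(3L));
            System.out.println(buffer);
        }
    }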

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
deleted file mode 100644
index 2d204b8..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
- */
-public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
-               extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private final TypeSerializer<KEY> keySerializer;
-
-       private transient Iterator<Tuple2<KEY,VALUE>> iterator;
-       
-       private transient KEY curKey;
-       private transient VALUE firstValue;
-       private transient boolean atFirst;
-
-       public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) 
{
-               this.keySerializer = checkNotNull(keySerializer);
-       }
-       
-       /**
-        * Set the Flink iterator to wrap.
-        * 
-        * @param iterator The Flink iterator to wrap.
-        */
-       @Override
-       public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
-               this.iterator = iterator;
-               if(this.hasNext()) {
-                       final Tuple2<KEY, VALUE> tuple = iterator.next();
-                       this.curKey = keySerializer.copy(tuple.f0);
-                       this.firstValue = tuple.f1;
-                       this.atFirst = true;
-               } else {
-                       this.atFirst = false;
-               }
-       }
-       
-       @Override
-       public boolean hasNext() {
-               if(this.atFirst) {
-                       return true;
-               }
-               return iterator.hasNext();
-       }
-       
-       @Override
-       public VALUE next() {
-               if(this.atFirst) {
-                       this.atFirst = false;
-                       return firstValue;
-               }
-               
-               final Tuple2<KEY, VALUE> tuple = iterator.next();
-               return tuple.f1;
-       }
-       
-       public KEY getCurrentKey() {
-               return this.curKey;
-       }
-       
-       @Override
-       public void remove() {
-               throw new UnsupportedOperationException();
-       }
-}
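
A rough sketch of how the reduce wrappers above drive this iterator, with the key serializer obtained through the TypeExtractor:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class UnwrappingIteratorSketch {
        public static void main(String[] args) {
            TypeSerializer<Text> keySerializer =
                    TypeExtractor.getForClass(Text.class).createSerializer(new ExecutionConfig());
            HadoopTupleUnwrappingIterator<Text, LongWritable> it =
                    new HadoopTupleUnwrappingIterator<>(keySerializer);

            // One key group, as it would be handed to reduce()/combine().
            List<Tuple2<Text, LongWritable>> group = Arrays.asList(
                    new Tuple2<>(new Text("flink"), new LongWritable(1L)),
                    new Tuple2<>(new Text("flink"), new LongWritable(2L)));

            it.set(group.iterator());
            System.out.println(it.getCurrentKey()); // "flink", copied via the key serializer
            while (it.hasNext()) {
                System.out.println(it.next());      // 1, then 2
            }
        }
    }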

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
deleted file mode 100644
index 133a5f4..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/hadoopcompatibility/scala/HadoopInputs.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hadoopcompatibility.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.hadoop.mapreduce
-import org.apache.flink.api.scala.hadoop.mapred
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
-import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
-
-/**
-  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
-  *
-  * It provides methods to create Flink InputFormat wrappers for Hadoop
-  * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
-  *
-  * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
-  * the first field is the key and the second field is the value.
-  *
-  */
-object HadoopInputs {
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
-
-    // set input path in JobConf
-    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    // wrap mapredInputFormat
-    createHadoopInput(mapredInputFormat, key, value, job)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
reads a Hadoop sequence
-    * file with the given key and value classes.
-    */
-  def readSequenceFile[K, V](
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
-
-    readHadoopFile(
-      new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
-      key,
-      value,
-      inputPath
-    )
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.InputFormat]].
-    */
-  def createHadoopInput[K, V](
-      mapredInputFormat: MapredInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): 
mapred.HadoopInputFormat[K, V] = {
-
-    new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: Job)(implicit tpe: TypeInformation[(K, V)]): 
mapreduce.HadoopInputFormat[K, V] = {
-
-    // set input path in Job
-    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    // wrap mapreduceInputFormat
-    createHadoopInput(mapreduceInputFormat, key, value, job)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-    */
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): 
mapreduce.HadoopInputFormat[K, V] =
-  {
-    readHadoopFile(mapreduceInputFormat, key, value, inputPath, 
Job.getInstance)
-  }
-
-  /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that 
wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.InputFormat]].
-    */
-  def createHadoopInput[K, V](
-      mapreduceInputFormat: MapreduceInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: Job)(implicit tpe: TypeInformation[(K, V)]): 
mapreduce.HadoopInputFormat[K, V] = {
-
-    new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, 
job)
-  }
-}
-
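
These Scala helpers only add the input path to the job configuration and wrap the Hadoop InputFormat. A rough Java-side sketch of what readHadoopFile amounts to, using the mapred wrapper under org.apache.flink.api.java.hadoop.mapred (the input path is a placeholder):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            JobConf jobConf = new JobConf();
            FileInputFormat.addInputPath(jobConf, new Path("hdfs:///path/to/input")); // placeholder

            // Comparable to HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], path)
            HadoopInputFormat<LongWritable, Text> inputFormat =
                    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

            DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(inputFormat);
            lines.first(10).print();
        }
    }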

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
deleted file mode 100644
index 2aefd9f..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableExtractionTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class WritableExtractionTest {
-
-       @Test
-       public void testDetectWritable() {
-               // writable interface itself must not be writable
-               assertFalse(TypeExtractor.isHadoopWritable(Writable.class));
-
-               // various forms of extension
-               
assertTrue(TypeExtractor.isHadoopWritable(DirectWritable.class));
-               
assertTrue(TypeExtractor.isHadoopWritable(ViaInterfaceExtension.class));
-               
assertTrue(TypeExtractor.isHadoopWritable(ViaAbstractClassExtension.class));
-
-               // some non-writables
-               assertFalse(TypeExtractor.isHadoopWritable(String.class));
-               assertFalse(TypeExtractor.isHadoopWritable(List.class));
-               
assertFalse(TypeExtractor.isHadoopWritable(WritableComparator.class));
-       }
-
-       @Test
-       public void testCreateWritableInfo() {
-               TypeInformation<DirectWritable> info1 =
-                               
TypeExtractor.createHadoopWritableTypeInfo(DirectWritable.class);
-               assertEquals(DirectWritable.class, info1.getTypeClass());
-
-               TypeInformation<ViaInterfaceExtension> info2 =
-                               
TypeExtractor.createHadoopWritableTypeInfo(ViaInterfaceExtension.class);
-               assertEquals(ViaInterfaceExtension.class, info2.getTypeClass());
-
-               TypeInformation<ViaAbstractClassExtension> info3 = 
-                               
TypeExtractor.createHadoopWritableTypeInfo(ViaAbstractClassExtension.class);
-               assertEquals(ViaAbstractClassExtension.class, 
info3.getTypeClass());
-       }
-
-       @Test
-       public void testValidateTypeInfo() {
-               // validate unrelated type info
-               
TypeExtractor.validateIfWritable(BasicTypeInfo.STRING_TYPE_INFO, String.class);
-
-               // validate writable type info correctly
-               TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-                               DirectWritable.class), DirectWritable.class);
-               TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-                               ViaInterfaceExtension.class), 
ViaInterfaceExtension.class);
-               TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-                               ViaAbstractClassExtension.class), 
ViaAbstractClassExtension.class);
-
-               // incorrect case: not writable at all
-               try {
-                       TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-                                       DirectWritable.class), String.class);
-                       fail("should have failed with an exception");
-               } catch (InvalidTypesException e) {
-                       // expected
-               }
-
-               // incorrect case: wrong writable
-               try {
-                       TypeExtractor.validateIfWritable(new WritableTypeInfo<>(
-                                       ViaInterfaceExtension.class), 
DirectWritable.class);
-                       fail("should have failed with an exception");
-               } catch (InvalidTypesException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testExtractFromFunction() {
-               RichMapFunction<DirectWritable, DirectWritable> function = new 
RichMapFunction<DirectWritable, DirectWritable>() {
-                       @Override
-                       public DirectWritable map(DirectWritable value) throws 
Exception {
-                               return null;
-                       }
-               };
-
-               TypeInformation<DirectWritable> outType = 
-                               TypeExtractor.getMapReturnTypes(function, new 
WritableTypeInfo<>(DirectWritable.class));
-
-               assertTrue(outType instanceof WritableTypeInfo);
-               assertEquals(DirectWritable.class, outType.getTypeClass());
-       }
-
-       @Test
-       public void testExtractAsPartOfPojo() {
-               PojoTypeInfo<PojoWithWritable> pojoInfo = 
-                               (PojoTypeInfo<PojoWithWritable>) 
TypeExtractor.getForClass(PojoWithWritable.class);
-
-               boolean foundWritable = false;
-               for (int i = 0; i < pojoInfo.getArity(); i++) {
-                       PojoField field = pojoInfo.getPojoFieldAt(i);
-                       String name = field.getField().getName();
-                       
-                       if (name.equals("hadoopCitizen")) {
-                               if (foundWritable) {
-                                       fail("already seen");
-                               }
-                               foundWritable = true;
-                               assertEquals(new 
WritableTypeInfo<>(DirectWritable.class), field.getTypeInformation());
-                               assertEquals(DirectWritable.class, 
field.getTypeInformation().getTypeClass());
-                               
-                       }
-               }
-               
-               assertTrue("missed the writable type", foundWritable);
-       }
-
-       @Test
-       public void testInputValidationError() {
-
-               RichMapFunction<Writable, String> function = new 
RichMapFunction<Writable, String>() {
-                       @Override
-                       public String map(Writable value) throws Exception {
-                               return null;
-                       }
-               };
-
-               @SuppressWarnings("unchecked")
-               TypeInformation<Writable> inType = 
-                               (TypeInformation<Writable>) 
(TypeInformation<?>) new WritableTypeInfo<>(DirectWritable.class);
-               
-               try {
-                       TypeExtractor.getMapReturnTypes(function, inType);
-                       fail("exception expected");
-               }
-               catch (InvalidTypesException e) {
-                       // right
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test type classes
-       // 
------------------------------------------------------------------------
-
-       public interface ExtendedWritable extends Writable {}
-
-       public static abstract class AbstractWritable implements Writable {}
-
-       public static class DirectWritable implements Writable {
-
-               @Override
-               public void write(DataOutput dataOutput) throws IOException {}
-
-               @Override
-               public void readFields(DataInput dataInput) throws IOException 
{}
-       }
-
-       public static class ViaInterfaceExtension implements ExtendedWritable {
-
-               @Override
-               public void write(DataOutput dataOutput) throws IOException {}
-
-               @Override
-               public void readFields(DataInput dataInput) throws IOException 
{}
-       }
-
-       public static class ViaAbstractClassExtension extends AbstractWritable {
-
-               @Override
-               public void write(DataOutput dataOutput) throws IOException {}
-
-               @Override
-               public void readFields(DataInput dataInput) throws IOException 
{}
-       }
-
-       public static class PojoWithWritable {
-               public String str;
-               public DirectWritable hadoopCitizen;
-       }
-}
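
The behaviour covered by this test can be summarised in a short, hypothetical sketch: a POJO that mixes a plain field with a Hadoop Writable field is analysed into a PojoTypeInfo whose Writable field is described by a WritableTypeInfo:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.hadoop.io.LongWritable;

    public class WritableExtractionSketch {

        // Hypothetical POJO mixing a standard field with a Hadoop Writable field.
        public static class WordWithCounter {
            public String word;
            public LongWritable counter;
        }

        public static void main(String[] args) {
            TypeInformation<WordWithCounter> typeInfo =
                    TypeExtractor.getForClass(WordWithCounter.class);
            // Expected: a PojoTypeInfo whose "counter" field carries a WritableTypeInfo,
            // mirroring the PojoWithWritable case in the test above.
            System.out.println(typeInfo);
        }
    }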

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
deleted file mode 100644
index 3d2b652..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.Writable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class WritableInfoParserTest {
-
-       @Test
-       public void testWritableType() {
-               TypeInformation<?> ti = TypeInfoParser.parse(
-                               
"Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
-
-               Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
-               Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) 
ti).getTypeClass());
-       }
-
-       @Test
-       public void testPojoWithWritableType() {
-               TypeInformation<?> ti = TypeInfoParser.parse(
-                               
"org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
-                               + "basic=Integer,"
-                               + "tuple=Tuple2<String, Integer>,"
-                               + 
"hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
-                               + "array=String[]"
-                               + ">");
-               Assert.assertTrue(ti instanceof PojoTypeInfo);
-               PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-               Assert.assertEquals("array", 
pti.getPojoFieldAt(0).getField().getName());
-               Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() 
instanceof BasicArrayTypeInfo);
-               Assert.assertEquals("basic", 
pti.getPojoFieldAt(1).getField().getName());
-               Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() 
instanceof BasicTypeInfo);
-               Assert.assertEquals("hadoopCitizen", 
pti.getPojoFieldAt(2).getField().getName());
-               Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() 
instanceof WritableTypeInfo);
-               Assert.assertEquals("tuple", 
pti.getPojoFieldAt(3).getField().getName());
-               Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() 
instanceof TupleTypeInfo);
-       }
-       // 
------------------------------------------------------------------------
-       //  Test types
-       // 
------------------------------------------------------------------------
-
-       public static class MyWritable implements Writable {
-
-               @Override
-               public void write(DataOutput out) throws IOException {}
-
-               @Override
-               public void readFields(DataInput in) throws IOException {}
-       }
-
-       public static class MyPojo {
-               public Integer basic;
-               public Tuple2<String, Integer> tuple;
-               public MyWritable hadoopCitizen;
-               public String[] array;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
deleted file mode 100644
index eb9cdf0..0000000
--- a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.io.Writable;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class WritableTypeInfoTest extends TestLogger {
-       
-       @Test
-       public void testWritableTypeInfoEquality() {
-               WritableTypeInfo<TestClass> tpeInfo1 = new 
WritableTypeInfo<>(TestClass.class);
-               WritableTypeInfo<TestClass> tpeInfo2 = new 
WritableTypeInfo<>(TestClass.class);
-
-               assertEquals(tpeInfo1, tpeInfo2);
-               assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
-       }
-
-       @Test
-       public void testWritableTypeInfoInequality() {
-               WritableTypeInfo<TestClass> tpeInfo1 = new 
WritableTypeInfo<>(TestClass.class);
-               WritableTypeInfo<AlternateClass> tpeInfo2 = new 
WritableTypeInfo<>(AlternateClass.class);
-
-               assertNotEquals(tpeInfo1, tpeInfo2);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test types
-       // 
------------------------------------------------------------------------
-
-       public static class TestClass implements Writable {
-
-               @Override
-               public void write(DataOutput dataOutput) throws IOException {}
-
-               @Override
-               public void readFields(DataInput dataInput) throws IOException 
{}
-       }
-
-       public static class AlternateClass implements Writable {
-
-               @Override
-               public void write(DataOutput dataOutput) throws IOException {}
-
-               @Override
-               public void readFields(DataInput dataInput) throws IOException 
{}
-       }
-}
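
As the test asserts, equality of WritableTypeInfo is determined by the wrapped Writable class; a tiny sketch, assuming a default ExecutionConfig for serializer creation:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.hadoop.io.Text;

    public class WritableTypeInfoSketch {
        public static void main(String[] args) {
            WritableTypeInfo<Text> a = new WritableTypeInfo<>(Text.class);
            WritableTypeInfo<Text> b = new WritableTypeInfo<>(Text.class);
            System.out.println(a.equals(b)); // true: both wrap the same Writable class

            TypeSerializer<Text> serializer = a.createSerializer(new ExecutionConfig());
            System.out.println(serializer.getClass().getSimpleName());
        }
    }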
