http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/pom.xml 
b/flink-streaming-connectors/flink-connector-flume/pom.xml
deleted file mode 100644
index 1b1b810..0000000
--- a/flink-streaming-connectors/flink-connector-flume/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-flume_2.10</artifactId>
-       <name>flink-connector-flume</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <flume-ng.version>1.5.0</flume-ng.version>
-       </properties>
-
-       <dependencies>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flume</groupId>
-                       <artifactId>flume-ng-core</artifactId>
-                       <version>${flume-ng.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       <artifactId>slf4j-log4j12</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>log4j</groupId>
-                                       <artifactId>log4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-io</groupId>
-                                       <artifactId>commons-io</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-codec</groupId>
-                                       <artifactId>commons-codec</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-cli</groupId>
-                                       <artifactId>commons-cli</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-lang</groupId>
-                                       <artifactId>commons-lang</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.avro</groupId>
-                                       <artifactId>avro</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.codehaus.jackson</groupId>
-                                       
<artifactId>jackson-core-asl</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.codehaus.jackson</groupId>
-                                       
<artifactId>jackson-mapper-asl</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       
<groupId>com.thoughtworks.paranamer</groupId>
-                                       <artifactId>paranamer</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.xerial.snappy</groupId>
-                                       <artifactId>snappy-java</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.tukaani</groupId>
-                                       <artifactId>xz</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.velocity</groupId>
-                                       <artifactId>velocity</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>commons-collections</groupId>
-                                       
<artifactId>commons-collections</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>servlet-api</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty-util</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.mortbay.jetty</groupId>
-                                       <artifactId>jetty</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.google.code.gson</groupId>
-                                       <artifactId>gson</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.thrift</groupId>
-                                       <artifactId>libthrift</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <plugin>
-                               <!-- Override artifactSet configuration to 
build fat-jar with all dependencies packed. -->
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>shade-flink</id>
-                                               <configuration>
-                                                       <artifactSet>
-                                                               <includes 
combine.children="append">
-                                                                       <!-- We 
include all dependencies that transitively depend on guava -->
-                                                                       
<include>org.apache.flume:*</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <transformers>
-                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-                                                       </transformers>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
 
b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 2dc043b..0000000
--- 
a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlumeSink.class);
-
-       private transient FlinkRpcClientFacade client;
-       boolean initDone = false;
-       String host;
-       int port;
-       SerializationSchema<IN> schema;
-
-       public FlumeSink(String host, int port, SerializationSchema<IN> schema) 
{
-               this.host = host;
-               this.port = port;
-               this.schema = schema;
-       }
-
-       /**
-        * Receives tuples from the Apache Flink {@link DataStream} and forwards
-        * them to Apache Flume.
-        * 
-        * @param value
-        *            The tuple arriving from the datastream
-        */
-       @Override
-       public void invoke(IN value) {
-
-               byte[] data = schema.serialize(value);
-               client.sendDataToFlume(data);
-
-       }
-
-       private class FlinkRpcClientFacade {
-               private RpcClient client;
-               private String hostname;
-               private int port;
-
-               /**
-                * Initializes the connection to Apache Flume.
-                * 
-                * @param hostname
-                *            The host
-                * @param port
-                *            The port.
-                */
-               public void init(String hostname, int port) {
-                       // Setup the RPC connection
-                       this.hostname = hostname;
-                       this.port = port;
-                       int initCounter = 0;
-                       while (true) {
-                               if (initCounter >= 90) {
-                                       throw new RuntimeException("Cannot 
establish connection with" + port + " at "
-                                                       + host);
-                               }
-                               try {
-                                       this.client = 
RpcClientFactory.getDefaultInstance(hostname, port);
-                               } catch (FlumeException e) {
-                                       // Wait one second if the connection 
failed before the next
-                                       // try
-                                       try {
-                                               Thread.sleep(1000);
-                                       } catch (InterruptedException e1) {
-                                               if (LOG.isErrorEnabled()) {
-                                                       LOG.error("Interrupted 
while trying to connect {} at {}", port, host);
-                                               }
-                                       }
-                               }
-                               if (client != null) {
-                                       break;
-                               }
-                               initCounter++;
-                       }
-                       initDone = true;
-               }
-
-               /**
-                * Sends byte arrays as {@link Event} series to Apache Flume.
-                * 
-                * @param data
-                *            The byte array to send to Apache FLume
-                */
-               public void sendDataToFlume(byte[] data) {
-                       Event event = EventBuilder.withBody(data);
-
-                       try {
-                               client.append(event);
-
-                       } catch (EventDeliveryException e) {
-                               // clean up and recreate the client
-                               client.close();
-                               client = null;
-                               client = 
RpcClientFactory.getDefaultInstance(hostname, port);
-                       }
-               }
-
-       }
-
-       @Override
-       public void close() {
-               client.client.close();
-       }
-
-       @Override
-       public void open(Configuration config) {
-               client = new FlinkRpcClientFacade();
-               client.init(host, port);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
deleted file mode 100644
index 04019f8..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ /dev/null
@@ -1,205 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
-       <name>flink-connector-kafka-0.10</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <kafka.version>0.10.0.1</kafka.version>
-       </properties>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.apache.kafka</groupId>
-                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- Add Kafka 0.10.x as a dependency -->
-
-               <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka-clients</artifactId>
-                       <version>${kafka.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-                       <!-- Projects depending on this project,
-                       won't depend on flink-table. -->
-                       <optional>true</optional>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <!-- exclude Kafka dependencies -->
-                               <exclusion>
-                                       <groupId>org.apache.kafka</groupId>
-                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
-                               </exclusion>
-                       </exclusions>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <exclusions>
-                               <!-- exclude Kafka dependencies -->
-                               <exclusion>
-                                       <groupId>org.apache.kafka</groupId>
-                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
-                               </exclusion>
-                       </exclusions>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <!-- include 0.10 server for tests  -->
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka_${scala.binary.version}</artifactId>
-                       <version>${kafka.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-jmx</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <includes>
-                                                               
<include>**/KafkaTestEnvironmentImpl*</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-source-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>attach-test-sources</id>
-                                               <goals>
-                                                       
<goal>test-jar-no-fork</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <includes>
-                                                               
<include>**/KafkaTestEnvironmentImpl*</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-surefire-plugin</artifactId>
-                               <configuration>
-                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
-                                       <forkCount>1</forkCount>
-                                       <argLine>-Xms256m -Xmx1000m 
-Dlog4j.configuration=${log4j.configuration} 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </build>
-       
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
deleted file mode 100644
index a9ce336..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.SerializedValue;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
- * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, 
each of which will pull
- * data from one or more Kafka partitions. 
- * 
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees 
that no data is lost
- * during a failure, and that the computation processes elements "exactly 
once". 
- * (Note: These guarantees naturally assume that Kafka itself does not loose 
any data.)</p>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how 
far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p>Please refer to Kafka's documentation for the available configuration 
properties:
- * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata 
when the consumer
- * is constructed. That means that the client that submits the program needs 
to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
-
-       private static final long serialVersionUID = 2324564345203409112L;
-
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param valueDeserializer
-        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
-               this(Collections.singletonList(topic), valueDeserializer, 
props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
-        *
-        * This constructor allows passing a {@see KeyedDeserializationSchema} 
for reading key/value
-        * pairs, offsets, and topic names from Kafka.
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param deserializer
-        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer010(String topic, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               this(Collections.singletonList(topic), deserializer, props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
-        *
-        * This constructor allows passing multiple topics to the consumer.
-        *
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher 
and the offset handler.
-        */
-       public FlinkKafkaConsumer010(List<String> topics, 
DeserializationSchema<T> deserializer, Properties props) {
-               this(topics, new 
KeyedDeserializationSchemaWrapper<>(deserializer), props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.10.x
-        *
-        * This constructor allows passing multiple topics and a key/value 
deserialization schema.
-        *
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher 
and the offset handler.
-        */
-       public FlinkKafkaConsumer010(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(topics, deserializer, props);
-       }
-
-       @Override
-       protected AbstractFetcher<T, ?> createFetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext) throws 
Exception {
-
-               boolean useMetrics = 
!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
-
-               return new Kafka010Fetcher<>(
-                               sourceContext,
-                               thisSubtaskPartitions,
-                               watermarksPeriodic,
-                               watermarksPunctuated,
-                               runtimeContext.getProcessingTimeService(),
-                               
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
-                               runtimeContext.getUserCodeClassLoader(),
-                               runtimeContext.isCheckpointingEnabled(),
-                               runtimeContext.getTaskNameWithSubtasks(),
-                               runtimeContext.getMetricGroup(),
-                               deserializer,
-                               properties,
-                               pollTimeout,
-                               useMetrics);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
deleted file mode 100644
index cc0194b..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.Properties;
-
-import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.10.x
- *
- * Implementation note: This producer is a hybrid between a regular regular 
sink function (a)
- * and a custom operator (b).
- *
- * For (a), the class implements the SinkFunction and RichFunction interfaces.
- * For (b), it extends the StreamTask class.
- *
- * Details about approach (a):
- *
- *  Pre Kafka 0.10 producers only follow approach (a), allowing users to use 
the producer using the
- *  DataStream.addSink() method.
- *  Since the APIs exposed in that variant do not allow accessing the the 
timestamp attached to the record
- *  the Kafka 0.10 producer has a second invocation option, approach (b).
- *
- * Details about approach (b):
- *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. 
When adding the
- *  FlinkKafkaProducer010 using the 
FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
- *  can access the internal record timestamp of the record and write it to 
Kafka.
- *
- * All methods and constructors in this class are marked with the approach 
they are needed for.
- */
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements 
SinkFunction<T>, RichFunction {
-
-       /**
-        * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
-        */
-       private boolean writeTimestampToKafka = false;
-
-       // ---------------------- "Constructors" for timestamp writing 
------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        * @param inStream The stream to write to Kafka
-        * @param topicId ID of the Kafka topic.
-        * @param serializationSchema User defined serialization schema 
supporting key/value messages
-        * @param producerConfig Properties with the producer configuration.
-        */
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig) {
-               return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FixedPartitioner<T>());
-       }
-
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        * @param inStream The stream to write to Kafka
-        * @param topicId ID of the Kafka topic.
-        * @param serializationSchema User defined (keyless) serialization 
schema.
-        * @param producerConfig Properties with the producer configuration.
-        */
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        SerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig) {
-               return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<T>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        *  @param inStream The stream to write to Kafka
-        *  @param topicId The name of the target topic
-        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
-        */
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig,
-                                                                               
                                                                                
        KafkaPartitioner<T> customPartitioner) {
-
-               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
-               FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
-               SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
-       }
-
-       // ---------------------- Regular constructors w/o timestamp support  
------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        */
-       public FlinkKafkaProducer010(String brokerList, String topicId, 
SerializationSchema<T> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<T>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-        */
-       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, KafkaPartitioner<T> 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-       }
-
-       // ------------------- Key/Value serialization schema constructors 
----------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting 
key/value messages
-        */
-       public FlinkKafkaProducer010(String brokerList, String topicId, 
KeyedSerializationSchema<T> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting 
key/value messages
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<T>());
-       }
-
-       /**
-        * Create Kafka producer
-        *
-        * This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
-        */
-       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
KafkaPartitioner<T> customPartitioner) {
-               // We create a Kafka 09 producer instance here and only 
"override" (by intercepting) the
-               // invoke call.
-               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, customPartitioner));
-       }
-
-
-       // ----------------------------- Generic element processing  
---------------------------
-
-       private void invokeInternal(T next, long elementTimestamp) throws 
Exception {
-
-               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
-
-               internalProducer.checkErroneous();
-
-               byte[] serializedKey = 
internalProducer.schema.serializeKey(next);
-               byte[] serializedValue = 
internalProducer.schema.serializeValue(next);
-               String targetTopic = 
internalProducer.schema.getTargetTopic(next);
-               if (targetTopic == null) {
-                       targetTopic = internalProducer.defaultTopicId;
-               }
-
-               Long timestamp = null;
-               if(this.writeTimestampToKafka) {
-                       timestamp = elementTimestamp;
-               }
-
-               ProducerRecord<byte[], byte[]> record;
-               if (internalProducer.partitioner == null) {
-                       record = new ProducerRecord<>(targetTopic, null, 
timestamp, serializedKey, serializedValue);
-               } else {
-                       record = new ProducerRecord<>(targetTopic, 
internalProducer.partitioner.partition(next, serializedKey, serializedValue, 
internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
-               }
-               if (internalProducer.flushOnCheckpoint) {
-                       synchronized (internalProducer.pendingRecordsLock) {
-                               internalProducer.pendingRecords++;
-                       }
-               }
-               internalProducer.producer.send(record, 
internalProducer.callback);
-       }
-
-
-       // ----------------- Helper methods implementing methods from 
SinkFunction and RichFunction (Approach (a)) ----
-
-
-       // ---- Configuration setters
-
-       /**
-        * Defines whether the producer should fail on errors, or only log them.
-        * If this is set to true, then exceptions will be only logged, if set 
to false,
-        * exceptions will be eventually thrown and cause the streaming program 
to
-        * fail (and enter recovery).
-        *
-        * Method is only accessible for approach (a) (see above)
-        *
-        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
-        */
-       public void setLogFailuresOnly(boolean logFailuresOnly) {
-               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.setLogFailuresOnly(logFailuresOnly);
-       }
-
-       /**
-        * If set to true, the Flink producer will wait for all outstanding 
messages in the Kafka buffers
-        * to be acknowledged by the Kafka producer on a checkpoint.
-        * This way, the producer can guarantee that messages in the Kafka 
buffers are part of the checkpoint.
-        *
-        * Method is only accessible for approach (a) (see above)
-        *
-        * @param flush Flag indicating the flushing mode (true = flush on 
checkpoint)
-        */
-       public void setFlushOnCheckpoint(boolean flush) {
-               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.setFlushOnCheckpoint(flush);
-       }
-
-       /**
-        * This method is used for approach (a) (see above)
-        *
-        */
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.open(parameters);
-       }
-
-       /**
-        * This method is used for approach (a) (see above)
-        */
-       @Override
-       public IterationRuntimeContext getIterationRuntimeContext() {
-               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
-               return internalProducer.getIterationRuntimeContext();
-       }
-
-       /**
-        * This method is used for approach (a) (see above)
-        */
-       @Override
-       public void setRuntimeContext(RuntimeContext t) {
-               final FlinkKafkaProducerBase<T> internalProducer = 
(FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.setRuntimeContext(t);
-       }
-
-       /**
-        * Invoke method for using the Sink as DataStream.addSink() sink.
-        *
-        * This method is used for approach (a) (see above)
-        *
-        * @param value The input record.
-        */
-       @Override
-       public void invoke(T value) throws Exception {
-               invokeInternal(value, Long.MAX_VALUE);
-       }
-
-
-       // ----------------- Helper methods and classes implementing methods 
from StreamSink (Approach (b)) ----
-
-
-       /**
-        * Process method for using the sink with timestamp support.
-        *
-        * This method is used for approach (b) (see above)
-        */
-       @Override
-       public void processElement(StreamRecord<T> element) throws Exception {
-               invokeInternal(element.getValue(), element.getTimestamp());
-       }
-
-       /**
-        * Configuration object returned by the writeToKafkaWithTimestamps() 
call.
-        */
-       public static class FlinkKafkaProducer010Configuration<T> extends 
DataStreamSink<T> {
-
-               private final FlinkKafkaProducerBase wrappedProducerBase;
-               private final FlinkKafkaProducer010 producer;
-
-               private FlinkKafkaProducer010Configuration(DataStream stream, 
FlinkKafkaProducer010<T> producer) {
-                       //noinspection unchecked
-                       super(stream, producer);
-                       this.producer = producer;
-                       this.wrappedProducerBase = (FlinkKafkaProducerBase) 
producer.userFunction;
-               }
-
-               /**
-                * Defines whether the producer should fail on errors, or only 
log them.
-                * If this is set to true, then exceptions will be only logged, 
if set to false,
-                * exceptions will be eventually thrown and cause the streaming 
program to
-                * fail (and enter recovery).
-                *
-                * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
-                */
-               public void setLogFailuresOnly(boolean logFailuresOnly) {
-                       
this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
-               }
-
-               /**
-                * If set to true, the Flink producer will wait for all 
outstanding messages in the Kafka buffers
-                * to be acknowledged by the Kafka producer on a checkpoint.
-                * This way, the producer can guarantee that messages in the 
Kafka buffers are part of the checkpoint.
-                *
-                * @param flush Flag indicating the flushing mode (true = flush 
on checkpoint)
-                */
-               public void setFlushOnCheckpoint(boolean flush) {
-                       this.wrappedProducerBase.setFlushOnCheckpoint(flush);
-               }
-
-               /**
-                * If set to true, Flink will write the (event time) timestamp 
attached to each record into Kafka.
-                * Timestamps must be positive for Kafka to accept them.
-                *
-                * @param writeTimestampToKafka Flag indicating if Flink's 
internal timestamps are written to Kafka.
-                */
-               public void setWriteTimestampToKafka(boolean 
writeTimestampToKafka) {
-                       this.producer.writeTimestampToKafka = 
writeTimestampToKafka;
-               }
-       }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
deleted file mode 100644
index ddf1ad3..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.10.
- */
-public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
-
-       /**
-        * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka010JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka010JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
-               return new FlinkKafkaConsumer010<>(topic, 
deserializationSchema, properties);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
deleted file mode 100644
index 732440b..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.10.
- */
-public class Kafka010TableSource extends Kafka09TableSource {
-
-       /**
-        * Creates a Kafka 0.10 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka010TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.10 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka010TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
-               return new FlinkKafkaConsumer010<>(topic, 
deserializationSchema, properties);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
deleted file mode 100644
index 71dd29a..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer 
API.
- * 
- * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It 
only additionally
- * takes the KafkaRecord-attached timestamp and attaches it to the Flink 
records.
- * 
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
-
-       public Kafka010Fetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       ProcessingTimeService processingTimeProvider,
-                       long autoWatermarkInterval,
-                       ClassLoader userCodeClassLoader,
-                       boolean enableCheckpointing,
-                       String taskNameWithSubtasks,
-                       MetricGroup metricGroup,
-                       KeyedDeserializationSchema<T> deserializer,
-                       Properties kafkaProperties,
-                       long pollTimeout,
-                       boolean useMetrics) throws Exception
-       {
-               super(
-                               sourceContext,
-                               assignedPartitions,
-                               watermarksPeriodic,
-                               watermarksPunctuated,
-                               processingTimeProvider,
-                               autoWatermarkInterval,
-                               userCodeClassLoader,
-                               enableCheckpointing,
-                               taskNameWithSubtasks,
-                               metricGroup,
-                               deserializer,
-                               kafkaProperties,
-                               pollTimeout,
-                               useMetrics);
-       }
-
-       @Override
-       protected void emitRecord(
-                       T record,
-                       KafkaTopicPartitionState<TopicPartition> partition,
-                       long offset,
-                       ConsumerRecord<?, ?> consumerRecord) throws Exception {
-
-               // we attach the Kafka 0.10 timestamp here
-               emitRecordWithTimestamp(record, partition, offset, 
consumerRecord.timestamp());
-       }
-
-       /**
-        * This method needs to be overridden because Kafka broke binary 
compatibility between 0.9 and 0.10,
-        * changing binary signatures
-        */
-       @Override
-       protected KafkaConsumerCallBridge010 createCallBridge() {
-               return new KafkaConsumerCallBridge010();
-       }
-
-       @Override
-       protected String getFetcherName() {
-               return "Kafka 0.10 Fetcher";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
deleted file mode 100644
index a81b098..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-
-/**
- * The ConsumerCallBridge simply calls the {@link 
KafkaConsumer#assign(java.util.Collection)} method.
- * 
- * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
- * changing {@code assign(List)} to {@code assign(Collection)}.
- * 
- * Because of that, we need two versions whose compiled code goes against 
different method signatures.
- */
-public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
-
-       @Override
-       public void assignPartitions(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) throws Exception {
-               consumer.assign(topicPartitions);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
deleted file mode 100644
index 6bdfb48..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
deleted file mode 100644
index 6ee0429..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka010Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka010FetcherTest {
-
-    @Test
-    public void testCommitDoesNotBlock() throws Exception {
-
-        // test data
-        final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
-        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
-        testCommitData.put(testPartition, 11L);
-
-        // to synchronize when the consumer is in its blocking method
-        final OneShotLatch sync = new OneShotLatch();
-
-        // ----- the mock consumer with blocking poll calls ----
-        final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) 
throws InterruptedException {
-                sync.trigger();
-                blockerLatch.await();
-                return ConsumerRecords.empty();
-            }
-        });
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                blockerLatch.trigger();
-                return null;
-            }
-        }).when(mockConsumer).wakeup();
-
-        // make sure the fetcher creates the mock consumer
-        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- create the test fetcher -----
-
-        @SuppressWarnings("unchecked")
-        SourceContext<String> sourceContext = mock(SourceContext.class);
-        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext,
-                topics,
-                null, /* periodic assigner */
-                null, /* punctuated assigner */
-                new TestProcessingTimeService(),
-                10,
-                getClass().getClassLoader(),
-                false, /* checkpointing */
-                "taskname-with-subtask",
-                new UnregisteredMetricsGroup(),
-                schema,
-                new Properties(),
-                0L,
-                false);
-
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // wait until the fetcher has reached the method of interest
-        sync.await();
-
-        // ----- trigger the offset commit -----
-
-        final AtomicReference<Throwable> commitError = new AtomicReference<>();
-        final Thread committer = new Thread("committer runner") {
-            @Override
-            public void run() {
-                try {
-                    fetcher.commitInternalOffsetsToKafka(testCommitData);
-                } catch (Throwable t) {
-                    commitError.set(t);
-                }
-            }
-        };
-        committer.start();
-
-        // ----- ensure that the committer finishes in time  -----
-        committer.join(30000);
-        assertFalse("The committer did not finish in time", 
committer.isAlive());
-
-        // ----- test done, wait till the fetcher is done for a clean shutdown 
-----
-        fetcher.cancel();
-        fetcherRunner.join();
-
-        // check that there were no errors in the fetcher
-        final Throwable fetcherError = error.get();
-        if (fetcherError != null && !(fetcherError instanceof 
Handover.ClosedException)) {
-            throw new Exception("Exception in the fetcher", fetcherError);
-        }
-        final Throwable committerError = commitError.get();
-        if (committerError != null) {
-            throw new Exception("Exception in the committer", committerError);
-        }
-    }
-
-    @Test
-    public void ensureOffsetsGetCommitted() throws Exception {
-
-        // test data
-        final KafkaTopicPartition testPartition1 = new 
KafkaTopicPartition("test", 42);
-        final KafkaTopicPartition testPartition2 = new 
KafkaTopicPartition("another", 99);
-
-        final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
-        testCommitData1.put(testPartition1, 11L);
-        testCommitData1.put(testPartition2, 18L);
-
-        final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
-        testCommitData2.put(testPartition1, 19L);
-        testCommitData2.put(testPartition2, 28L);
-
-        final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> 
commitStore = new LinkedBlockingQueue<>();
-
-
-        // ----- the mock consumer with poll(), wakeup(), and commit(A)sync 
calls ----
-
-        final MultiShotLatch blockerLatch = new MultiShotLatch();
-
-        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
-        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) 
throws InterruptedException {
-                blockerLatch.await();
-                return ConsumerRecords.empty();
-            }
-        });
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                blockerLatch.trigger();
-                return null;
-            }
-        }).when(mockConsumer).wakeup();
-
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) {
-                @SuppressWarnings("unchecked")
-                Map<TopicPartition, OffsetAndMetadata> offsets =
-                        (Map<TopicPartition, OffsetAndMetadata>) 
invocation.getArguments()[0];
-
-                OffsetCommitCallback callback = (OffsetCommitCallback) 
invocation.getArguments()[1];
-
-                commitStore.add(offsets);
-                callback.onComplete(offsets, null);
-
-                return null;
-            }
-        }).when(mockConsumer).commitAsync(
-                Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), 
any(OffsetCommitCallback.class));
-
-        // make sure the fetcher creates the mock consumer
-        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- create the test fetcher -----
-
-        @SuppressWarnings("unchecked")
-        SourceContext<String> sourceContext = mock(SourceContext.class);
-        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext,
-                topics,
-                null, /* periodic assigner */
-                null, /* punctuated assigner */
-                new TestProcessingTimeService(),
-                10,
-                getClass().getClassLoader(),
-                false, /* checkpointing */
-                "taskname-with-subtask",
-                new UnregisteredMetricsGroup(),
-                schema,
-                new Properties(),
-                0L,
-                false);
-
-
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // ----- trigger the first offset commit -----
-
-        fetcher.commitInternalOffsetsToKafka(testCommitData1);
-        Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result1.entrySet()) {
-            TopicPartition partition = entry.getKey();
-            if (partition.topic().equals("test")) {
-                assertEquals(42, partition.partition());
-                assertEquals(12L, entry.getValue().offset());
-            }
-            else if (partition.topic().equals("another")) {
-                assertEquals(99, partition.partition());
-                assertEquals(18L, entry.getValue().offset());
-            }
-        }
-
-        // ----- trigger the second offset commit -----
-
-        fetcher.commitInternalOffsetsToKafka(testCommitData2);
-        Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
-        for (Entry<TopicPartition, OffsetAndMetadata> entry : 
result2.entrySet()) {
-            TopicPartition partition = entry.getKey();
-            if (partition.topic().equals("test")) {
-                assertEquals(42, partition.partition());
-                assertEquals(20L, entry.getValue().offset());
-            }
-            else if (partition.topic().equals("another")) {
-                assertEquals(99, partition.partition());
-                assertEquals(28L, entry.getValue().offset());
-            }
-        }
-
-        // ----- test done, wait till the fetcher is done for a clean shutdown 
-----
-        fetcher.cancel();
-        fetcherRunner.join();
-
-        // check that there were no errors in the fetcher
-        final Throwable caughtError = error.get();
-        if (caughtError != null && !(caughtError instanceof 
Handover.ClosedException)) {
-            throw new Exception("Exception in the fetcher", caughtError);
-        }
-    }
-
-    @Test
-    public void testCancellationWhenEmitBlocks() throws Exception {
-
-        // ----- some test data -----
-
-        final String topic = "test-topic";
-        final int partition = 3;
-        final byte[] payload = new byte[] {1, 2, 3, 4};
-
-        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, 
payload, payload),
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, 
payload, payload),
-                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, 
payload, payload));
-
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = 
new HashMap<>();
-        data.put(new TopicPartition(topic, partition), records);
-
-        final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
-
-        // ----- the test consumer -----
-
-        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
-            @Override
-            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
-                return consumerRecords;
-            }
-        });
-
-        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
-        // ----- build a fetcher -----
-
-        BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
-        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition(topic, partition));
-        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
-        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext,
-                topics,
-                null, /* periodic watermark extractor */
-                null, /* punctuated watermark extractor */
-                new TestProcessingTimeService(),
-                10, /* watermark interval */
-                this.getClass().getClassLoader(),
-                true, /* checkpointing */
-                "task_name",
-                new UnregisteredMetricsGroup(),
-                schema,
-                new Properties(),
-                0L,
-                false);
-
-
-        // ----- run the fetcher -----
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final Thread fetcherRunner = new Thread("fetcher runner") {
-
-            @Override
-            public void run() {
-                try {
-                    fetcher.runFetchLoop();
-                } catch (Throwable t) {
-                    error.set(t);
-                }
-            }
-        };
-        fetcherRunner.start();
-
-        // wait until the thread started to emit records to the source context
-        sourceContext.waitTillHasBlocker();
-
-        // now we try to cancel the fetcher, including the interruption 
usually done on the task thread
-        // once it has finished, there must be no more thread blocked on the 
source context
-        fetcher.cancel();
-        fetcherRunner.interrupt();
-        fetcherRunner.join();
-
-        assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
-    }
-
-    // ------------------------------------------------------------------------
-    //  test utilities
-    // ------------------------------------------------------------------------
-
-    private static final class BlockingSourceContext<T> implements 
SourceContext<T> {
-
-        private final ReentrantLock lock = new ReentrantLock();
-        private final OneShotLatch inBlocking = new OneShotLatch();
-
-        @Override
-        public void collect(T element) {
-            block();
-        }
-
-        @Override
-        public void collectWithTimestamp(T element, long timestamp) {
-            block();
-        }
-
-        @Override
-        public void emitWatermark(Watermark mark) {
-            block();
-        }
-
-        @Override
-        public Object getCheckpointLock() {
-            return new Object();
-        }
-
-        @Override
-        public void close() {}
-
-        public void waitTillHasBlocker() throws InterruptedException {
-            inBlocking.await();
-        }
-
-        public boolean isStillBlocking() {
-            return lock.isLocked();
-        }
-
-        @SuppressWarnings({"InfiniteLoopStatement", 
"SynchronizationOnLocalVariableOrMethodParameter"})
-        private void block() {
-            lock.lock();
-            try {
-                inBlocking.trigger();
-
-                // put this thread to sleep indefinitely
-                final Object o = new Object();
-                while (true) {
-                    synchronized (o) {
-                        o.wait();
-                    }
-                }
-            }
-            catch (InterruptedException e) {
-                // exit cleanly, simply reset the interruption flag
-                Thread.currentThread().interrupt();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
deleted file mode 100644
index 08511c9..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-
-public class Kafka010ITCase extends KafkaConsumerTestBase {
-
-       // 
------------------------------------------------------------------------
-       //  Suite of Tests
-       // 
------------------------------------------------------------------------
-
-
-       @Test(timeout = 60000)
-       public void testFailOnNoBroker() throws Exception {
-               runFailOnNoBrokerTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testConcurrentProducerConsumerTopology() throws Exception {
-               runSimpleConcurrentProducerConsumerTopology();
-       }
-
-       @Test(timeout = 60000)
-       public void testKeyValueSupport() throws Exception {
-               runKeyValueTest();
-       }
-
-       // --- canceling / failures ---
-
-       @Test(timeout = 60000)
-       public void testCancelingEmptyTopic() throws Exception {
-               runCancelingOnEmptyInputTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testCancelingFullTopic() throws Exception {
-               runCancelingOnFullInputTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testFailOnDeploy() throws Exception {
-               runFailOnDeployTest();
-       }
-
-
-       // --- source to partition mappings and exactly once ---
-
-       @Test(timeout = 60000)
-       public void testOneToOneSources() throws Exception {
-               runOneToOneExactlyOnceTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testOneSourceMultiplePartitions() throws Exception {
-               runOneSourceMultiplePartitionsExactlyOnceTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testMultipleSourcesOnePartition() throws Exception {
-               runMultipleSourcesOnePartitionExactlyOnceTest();
-       }
-
-       // --- broker failure ---
-
-       @Test(timeout = 60000)
-       public void testBrokerFailure() throws Exception {
-               runBrokerFailureTest();
-       }
-
-       // --- special executions ---
-
-       @Test(timeout = 60000)
-       public void testBigRecordJob() throws Exception {
-               runBigRecordTestTopology();
-       }
-
-       @Test(timeout = 60000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
-       }
-
-       @Test(timeout = 60000)
-       public void testAllDeletes() throws Exception {
-               runAllDeletesTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testMetricsAndEndOfStream() throws Exception {
-               runEndOfStreamTest();
-       }
-
-       // --- offset committing ---
-
-       @Test(timeout = 60000)
-       public void testCommitOffsetsToKafka() throws Exception {
-               runCommitOffsetsToKafka();
-       }
-
-       @Test(timeout = 60000)
-       public void testStartFromKafkaCommitOffsets() throws Exception {
-               runStartFromKafkaCommitOffsets();
-       }
-
-       @Test(timeout = 60000)
-       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
-               runAutoOffsetRetrievalAndCommitToKafka();
-       }
-
-       /**
-        * Kafka 0.10 specific test, ensuring Timestamps are properly written 
to and read from Kafka
-        */
-       @Test(timeout = 60000)
-       public void testTimestamps() throws Exception {
-
-               final String topic = "tstopic";
-               createTestTopic(topic, 3, 1);
-
-               // ---------- Produce an event time stream into Kafka 
-------------------
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.setParallelism(1);
-               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env.getConfig().disableSysoutLogging();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               DataStream<Long> streamWithTimestamps = env.addSource(new 
SourceFunction<Long>() {
-                       boolean running = true;
-
-                       @Override
-                       public void run(SourceContext<Long> ctx) throws 
Exception {
-                               long i = 0;
-                               while(running) {
-                                       ctx.collectWithTimestamp(i, i*2);
-                                       if(i++ == 1000L) {
-                                               running = false;
-                                       }
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               });
-
-               final TypeInformationSerializationSchema<Long> longSer = new 
TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), 
env.getConfig());
-               FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, 
new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new 
KafkaPartitioner<Long>() {
-                       @Override
-                       public int partition(Long next, byte[] serializedKey, 
byte[] serializedValue, int numPartitions) {
-                               return (int)(next % 3);
-                       }
-               });
-               prod.setParallelism(3);
-               prod.setWriteTimestampToKafka(true);
-               env.execute("Produce some");
-
-               // ---------- Consume stream from Kafka -------------------
-
-               env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.setParallelism(1);
-               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env.getConfig().disableSysoutLogging();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               FlinkKafkaConsumer010<Long> kafkaSource = new 
FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps);
-               kafkaSource.assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<Long>() {
-                       @Nullable
-                       @Override
-                       public Watermark checkAndGetNextWatermark(Long 
lastElement, long extractedTimestamp) {
-                               if(lastElement % 10 == 0) {
-                                       return new Watermark(lastElement);
-                               }
-                               return null;
-                       }
-
-                       @Override
-                       public long extractTimestamp(Long element, long 
previousElementTimestamp) {
-                               return previousElementTimestamp;
-                       }
-               });
-
-               DataStream<Long> stream = env.addSource(kafkaSource);
-               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
-               stream.transform("timestamp validating operator", 
objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
-
-               env.execute("Consume again");
-
-               deleteTestTopic(topic);
-       }
-
-       private static class TimestampValidatingOperator extends 
StreamSink<Long> {
-
-               public TimestampValidatingOperator() {
-                       super(new SinkFunction<Long>() {
-                               @Override
-                               public void invoke(Long value) throws Exception 
{
-                                       throw new 
RuntimeException("Unexpected");
-                               }
-                       });
-               }
-
-               long elCount = 0;
-               long wmCount = 0;
-               long lastWM = Long.MIN_VALUE;
-
-               @Override
-               public void processElement(StreamRecord<Long> element) throws 
Exception {
-                       elCount++;
-                       if(element.getValue() * 2 != element.getTimestamp()) {
-                               throw new RuntimeException("Invalid timestamp: 
" + element);
-                       }
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {
-                       wmCount++;
-
-                       if(lastWM <= mark.getTimestamp()) {
-                               lastWM = mark.getTimestamp();
-                       } else {
-                               throw new RuntimeException("Received watermark 
higher than the last one");
-                       }
-
-                       if( mark.getTimestamp() % 10 != 0 && 
mark.getTimestamp() != Long.MAX_VALUE ) {
-                               throw new RuntimeException("Invalid watermark: 
" + mark.getTimestamp());
-                       }
-               }
-
-               @Override
-               public void close() throws Exception {
-                       super.close();
-                       if(elCount != 1000L) {
-                               throw new RuntimeException("Wrong final element 
count " + elCount);
-                       }
-
-                       if(wmCount <= 2) {
-                               throw new RuntimeException("Almost no 
watermarks have been sent " + wmCount);
-                       }
-               }
-       }
-
-       private static class LimitedLongDeserializer implements 
KeyedDeserializationSchema<Long> {
-
-               private final TypeInformation<Long> ti;
-               private final TypeSerializer<Long> ser;
-               long cnt = 0;
-
-               public LimitedLongDeserializer() {
-                       this.ti = TypeInfoParser.parse("Long");
-                       this.ser = ti.createSerializer(new ExecutionConfig());
-               }
-               @Override
-               public TypeInformation<Long> getProducedType() {
-                       return ti;
-               }
-
-               @Override
-               public Long deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
-                       cnt++;
-                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
-                       Long e = ser.deserialize(in);
-                       return e;
-               }
-
-               @Override
-               public boolean isEndOfStream(Long nextElement) {
-                       return cnt > 1000L;
-               }
-       }
-
-}

Reply via email to