http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
deleted file mode 100644
index 42b9682..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-
-@SuppressWarnings("serial")
-public class Kafka010ProducerITCase extends KafkaProducerTestBase {
-
-       @Test
-       public void testCustomPartitioning() {
-               runCustomPartitioningTest();
-       }
-
-}
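The ITCase above only delegates to runCustomPartitioningTest() from KafkaProducerTestBase. As a rough sketch of
what such a test exercises, a custom partitioner handed to the 0.10 producer looks approximately like the
following; the class name and hashing logic are made up for illustration, and only the KafkaPartitioner base
class and its partition(...) method come from the connector code removed in this commit (signature worth
double-checking against the version in use).

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Hypothetical partitioner for illustration: spreads records over all partitions by element hash.
public class HashPartitioner<T> extends KafkaPartitioner<T> {

	private static final long serialVersionUID = 1L;

	@Override
	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
		// Derive a stable partition index from the element itself.
		return Math.abs(next.hashCode() % numPartitions);
	}
}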

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
deleted file mode 100644
index f15fd45..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * An implementation of the KafkaServerProvider for Kafka 0.10
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-
-       protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
-       private File tmpZkDir;
-       private File tmpKafkaParent;
-       private List<File> tmpKafkaDirs;
-       private List<KafkaServer> brokers;
-       private TestingServer zookeeper;
-       private String zookeeperConnectionString;
-       private String brokerConnectionString = "";
-       private Properties standardProps;
-       private Properties additionalServerProperties;
-       private boolean secureMode = false;
-       // 6 seconds is default. Seems to be too small for travis. 30 seconds
-       private int zkTimeout = 30000;
-
-       public String getBrokerConnectionString() {
-               return brokerConnectionString;
-       }
-
-       @Override
-       public Properties getStandardProperties() {
-               return standardProps;
-       }
-
-       @Override
-       public Properties getSecureProperties() {
-               Properties prop = new Properties();
-               if(secureMode) {
-                       prop.put("security.inter.broker.protocol", 
"SASL_PLAINTEXT");
-                       prop.put("security.protocol", "SASL_PLAINTEXT");
-                       prop.put("sasl.kerberos.service.name", "kafka");
-
-                       //add special timeout for Travis
-                       prop.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
-                       prop.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
-                       prop.setProperty("metadata.fetch.timeout.ms","120000");
-               }
-               return prop;
-       }
-
-       @Override
-       public String getVersion() {
-               return "0.10";
-       }
-
-       @Override
-       public List<KafkaServer> getBrokers() {
-               return brokers;
-       }
-
-       @Override
-       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
-               return new FlinkKafkaConsumer010<>(topics, readSchema, props);
-       }
-
-       @Override
-       public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-               FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
-               prod.setFlushOnCheckpoint(true);
-               return new StreamSink<>(prod);
-       }
-
-
-       @Override
-       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-               FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
-               prod.setFlushOnCheckpoint(true);
-               return stream.addSink(prod);
-       }
-
-       @Override
-       public KafkaOffsetHandler createOffsetHandler(Properties props) {
-               return new KafkaOffsetHandlerImpl(props);
-       }
-
-       @Override
-       public void restartBroker(int leaderId) throws Exception {
-               brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
-       }
-
-       @Override
-       public int getLeaderToShutDown(String topic) throws Exception {
-               ZkUtils zkUtils = getZkUtils();
-               try {
-                       MetadataResponse.PartitionMetadata firstPart = null;
-                       do {
-                               if (firstPart != null) {
-                                       LOG.info("Unable to find leader. error 
code {}", firstPart.error().code());
-                                       // not the first try. Sleep a bit
-                                       Thread.sleep(150);
-                               }
-
-                               List<MetadataResponse.PartitionMetadata> 
partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, 
zkUtils).partitionMetadata();
-                               firstPart = partitionMetadata.get(0);
-                       }
-                       while (firstPart.error().code() != 0);
-
-                       return firstPart.leader().id();
-               } finally {
-                       zkUtils.close();
-               }
-       }
-
-       @Override
-       public int getBrokerId(KafkaServer server) {
-               return server.config().brokerId();
-       }
-
-       @Override
-       public boolean isSecureRunSupported() {
-               return true;
-       }
-
-       @Override
-       public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-               //increase the timeout since in Travis ZK connection takes long time for secure connection.
-               if(secureMode) {
-                       //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-                       numKafkaServers = 1;
-                       zkTimeout = zkTimeout * 15;
-               }
-
-               this.additionalServerProperties = additionalServerProperties;
-               this.secureMode = secureMode;
-               File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
-               assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
-
-               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + 
(UUID.randomUUID().toString()));
-               assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
-
-               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-               for (int i = 0; i < numKafkaServers; i++) {
-                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
-                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
-                       tmpKafkaDirs.add(tmpDir);
-               }
-
-               zookeeper = null;
-               brokers = null;
-
-               try {
-                       zookeeper = new TestingServer(- 1, tmpZkDir);
-                       zookeeperConnectionString = zookeeper.getConnectString();
-                       LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
-
-                       LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(numKafkaServers);
-
-                       for (int i = 0; i < numKafkaServers; i++) {
-                               brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-
-                               if(secureMode) {
-                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
-                               } else {
-                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
-                               }
-                       }
-
-                       LOG.info("ZK and KafkaServer started.");
-               }
-               catch (Throwable t) {
-                       t.printStackTrace();
-                       fail("Test setup failed: " + t.getMessage());
-               }
-
-               standardProps = new Properties();
-               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
-               standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
-               standardProps.setProperty("group.id", "flink-tests");
-               standardProps.setProperty("enable.auto.commit", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
-               standardProps.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
-               standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.10 value)
-               standardProps.setProperty("max.partition.fetch.bytes", "256"); 
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
-       }
-
-       @Override
-       public void shutdown() {
-               for (KafkaServer broker : brokers) {
-                       if (broker != null) {
-                               broker.shutdown();
-                       }
-               }
-               brokers.clear();
-
-               if (zookeeper != null) {
-                       try {
-                               zookeeper.stop();
-                       }
-                       catch (Exception e) {
-                               LOG.warn("ZK.stop() failed", e);
-                       }
-                       zookeeper = null;
-               }
-
-               // clean up the temp spaces
-
-               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpKafkaParent);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-               if (tmpZkDir != null && tmpZkDir.exists()) {
-                       try {
-                               FileUtils.deleteDirectory(tmpZkDir);
-                       }
-                       catch (Exception e) {
-                               // ignore
-                       }
-               }
-       }
-
-       public ZkUtils getZkUtils() {
-               ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
-               return ZkUtils.apply(creator, false);
-       }
-
-       @Override
-       public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
-               // create topic with one client
-               LOG.info("Creating topic {}", topic);
-
-               ZkUtils zkUtils = getZkUtils();
-               try {
-                       AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
-               } finally {
-                       zkUtils.close();
-               }
-
-               // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 30000;
-               do {
-                       try {
-                               if(secureMode) {
-                                       //increase wait time since in Travis ZK timeout occurs frequently
-                                       int wait = zkTimeout / 100;
-                                       LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
-                                       Thread.sleep(wait);
-                               } else {
-                                       Thread.sleep(100);
-                               }
-                       } catch (InterruptedException e) {
-                               // restore interrupted state
-                       }
-                       // we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
-                       // not always correct.
-
-                       // create a new ZK utils connection
-                       ZkUtils checkZKConn = getZkUtils();
-                       if(AdminUtils.topicExists(checkZKConn, topic)) {
-                               checkZKConn.close();
-                               return;
-                       }
-                       checkZKConn.close();
-               }
-               while (System.currentTimeMillis() < deadline);
-               fail("Test topic could not be created");
-       }
-
-       @Override
-       public void deleteTestTopic(String topic) {
-               ZkUtils zkUtils = getZkUtils();
-               try {
-                       LOG.info("Deleting topic {}", topic);
-
-                       ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
-                               Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
-
-                       AdminUtils.deleteTopic(zkUtils, topic);
-
-                       zk.close();
-               } finally {
-                       zkUtils.close();
-               }
-       }
-
-       /**
-        * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-        */
-       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
-               Properties kafkaProperties = new Properties();
-
-               // properties have to be Strings
-               kafkaProperties.put("advertised.host.name", KAFKA_HOST);
-               kafkaProperties.put("broker.id", Integer.toString(brokerId));
-               kafkaProperties.put("log.dir", tmpFolder.toString());
-               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
-               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
-               kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
-
-               // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
-               kafkaProperties.put("zookeeper.connection.timeout.ms", 
zkTimeout);
-               if(additionalServerProperties != null) {
-                       kafkaProperties.putAll(additionalServerProperties);
-               }
-
-               final int numTries = 5;
-
-               for (int i = 1; i <= numTries; i++) {
-                       int kafkaPort = NetUtils.getAvailablePort();
-                       kafkaProperties.put("port", 
Integer.toString(kafkaPort));
-
-                       //to support secure kafka cluster
-                       if(secureMode) {
-                               LOG.info("Adding Kafka secure configurations");
-                               kafkaProperties.put("listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-                               kafkaProperties.put("advertised.listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-                               kafkaProperties.putAll(getSecureProperties());
-                       }
-
-                       KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-                       try {
-                               scala.Option<String> stringNone = scala.Option.apply(null);
-                               KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
-                               server.startup();
-                               return server;
-                       }
-                       catch (KafkaException e) {
-                               if (e.getCause() instanceof BindException) {
-                                       // port conflict, retry...
-                                       LOG.info("Port conflict when starting 
Kafka Broker. Retrying...");
-                               }
-                               else {
-                                       throw e;
-                               }
-                       }
-               }
-
-               throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
-       }
-
-       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-
-               private final KafkaConsumer<byte[], byte[]> offsetClient;
-
-               public KafkaOffsetHandlerImpl(Properties props) {
-                       offsetClient = new KafkaConsumer<>(props);
-               }
-
-               @Override
-               public Long getCommittedOffset(String topicName, int partition) {
-                       OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
-                       return (committed != null) ? committed.offset() : null;
-               }
-
-               @Override
-               public void close() {
-                       offsetClient.close();
-               }
-       }
-
-}
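For context on how the environment above is driven, the shared Kafka test bases follow roughly the life cycle
sketched below. The environment methods (prepare, createTestTopic, getStandardProperties, deleteTestTopic,
shutdown) are the ones defined in the deleted file; the topic name, partition count and surrounding skeleton
are illustrative only, not code from this commit.

import java.util.Properties;

// Illustrative sketch only, not a real test from this repository.
public class KafkaTestEnvironmentUsageSketch {
	public static void main(String[] args) throws Exception {
		KafkaTestEnvironment kafkaServer = new KafkaTestEnvironmentImpl();
		kafkaServer.prepare(1, null, false);                                // ZooKeeper + one non-secure broker
		kafkaServer.createTestTopic("test-topic", 3, 1, new Properties());  // 3 partitions, replication factor 1
		Properties props = kafkaServer.getStandardProperties();             // bootstrap.servers, group.id, timeouts
		// ... run producer/consumer assertions against "test-topic" using props ...
		kafkaServer.deleteTestTopic("test-topic");
		kafkaServer.shutdown();                                             // stop brokers and ZooKeeper, clean temp dirs
	}
}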

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
deleted file mode 100644
index fbeb110..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
deleted file mode 100644
index f17f9ae..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ /dev/null
@@ -1,219 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
-       <name>flink-connector-kafka-0.8</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <kafka.version>0.8.2.2</kafka.version>
-       </properties>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-curator-recipes</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-                       <!-- Projects depending on this project,
-                       won't depend on flink-table. -->
-                       <optional>true</optional>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka_${scala.binary.version}</artifactId>
-                       <version>${kafka.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>com.sun.jmx</groupId>
-                                       <artifactId>jmxri</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.sun.jdmk</groupId>
-                                       <artifactId>jmxtools</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>log4j</groupId>
-                                       <artifactId>log4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       <artifactId>slf4j-simple</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>net.sf.jopt-simple</groupId>
-                                       <artifactId>jopt-simple</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.scala-lang</groupId>
-                                       <artifactId>scala-reflect</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.scala-lang</groupId>
-                                       <artifactId>scala-compiler</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.yammer.metrics</groupId>
-                                       <artifactId>metrics-annotation</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.xerial.snappy</groupId>
-                                       <artifactId>snappy-java</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.curator</groupId>
-                       <artifactId>curator-test</artifactId>
-                       <version>${curator.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-jmx</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-kafka-base_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-       </dependencies>
-
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-surefire-plugin</artifactId>
-                               <configuration>
-                                       <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
-                                       <forkCount>1</forkCount>
-                               </configuration>
-                       </plugin>
-                       <!-- Relocate curator -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>shade-flink</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>shade</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <artifactSet>
-                                                               <includes combine.children="append">
-                                                                       <include>org.apache.flink:flink-shaded-curator-recipes</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <relocations combine.children="append">
-                                                               <relocation>
-                                                                       <pattern>org.apache.curator</pattern>
-                                                                       <shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
-                                                               </relocation>
-                                                       </relocations>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-       
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
deleted file mode 100644
index 0aacccd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.api.OffsetRequest;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.PropertiesUtil;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-
-import static org.apache.flink.util.PropertiesUtil.getInt;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
- * Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions.
- *
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once".
- * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
- *
- * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x).
- * Most of Kafka's configuration variables can be used with this consumer as well:
- *         <ul>
- *             <li>socket.timeout.ms</li>
- *             <li>socket.receive.buffer.bytes</li>
- *             <li>fetch.message.max.bytes</li>
- *             <li>auto.offset.reset with the values "largest", "smallest"</li>
- *             <li>fetch.wait.max.ms</li>
- *         </ul>
- *
- * <h1>Offset handling</h1>
- *
- * <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper
- * by the offset handler. In addition, the offset handler finds the point where the source initially
- * starts reading from the stream, when the streaming job is started.</p>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p>If checkpointing is disabled, the consumer will periodically commit the current offset
- * to Zookeeper.</p>
- *
- * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the
- * {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
-
-       private static final long serialVersionUID = -6272159445203409112L;
-
-       /** Configuration key for the number of retries for getting the partition info */
-       public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
-
-       /** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
-       public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
-
-       // ------------------------------------------------------------------------
-
-       /** The properties to parametrize the Kafka consumer and ZooKeeper client */
-       private final Properties kafkaProperties;
-
-       /** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */
-       private final long invalidOffsetBehavior;
-
-       /** The interval in which to automatically commit (-1 if deactivated) */
-       private final long autoCommitInterval;
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param valueDeserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-               this(Collections.singletonList(topic), valueDeserializer, props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
-        *
-        * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
-        * pairs, offsets, and topic names from Kafka.
-        *
-        * @param topic
-        *           The name of the topic that should be consumed.
-        * @param deserializer
-        *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-        */
-       public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
-               this(Collections.singletonList(topic), deserializer, props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
-        *
-        * This constructor allows passing multiple topics to the consumer.
-        *
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher and the offset handler.
-        */
-       public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
-               this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
-       }
-
-       /**
-        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
-        *
-        * This constructor allows passing multiple topics and a key/value deserialization schema.
-        * 
-        * @param topics
-        *           The Kafka topics to read from.
-        * @param deserializer
-        *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher and the offset handler.
-        */
-       public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(topics, deserializer);
-
-               checkNotNull(topics, "topics");
-               this.kafkaProperties = checkNotNull(props, "props");
-
-               // validate the zookeeper properties
-               validateZooKeeperConfig(props);
-
-               this.invalidOffsetBehavior = getInvalidOffsetBehavior(props);
-               this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
-       }
-
-       @Override
-       protected AbstractFetcher<T, ?> createFetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> thisSubtaskPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext) throws Exception {
-
-               boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false"));
-
-               return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions,
-                               watermarksPeriodic, watermarksPunctuated,
-                               runtimeContext, deserializer, kafkaProperties,
-                               invalidOffsetBehavior, autoCommitInterval, useMetrics);
-       }
-
-       @Override
-       protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-               // Connect to a broker to get the partitions for all topics
-               List<KafkaTopicPartition> partitionInfos =
-                       KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties));
-
-               if (partitionInfos.size() == 0) {
-                       throw new RuntimeException(
-                               "Unable to retrieve any partitions for the 
requested topics " + topics +
-                                       ". Please check previous log entries");
-               }
-
-               if (LOG.isInfoEnabled()) {
-                       logPartitionInfo(LOG, partitionInfos);
-               }
-
-               return partitionInfos;
-       }
-
-       // ------------------------------------------------------------------------
-       //  Kafka / ZooKeeper communication utilities
-       // ------------------------------------------------------------------------
-
-       /**
-        * Send request to Kafka to get partitions for topic.
-        * 
-        * @param topics The name of the topics.
-        * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
-        */
-       public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
-               String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-               final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
-               checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-               String[] seedBrokers = seedBrokersConfString.split(",");
-               List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-
-               final String clientId = "flink-kafka-consumer-partition-lookup";
-               final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
-               final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
-
-               Random rnd = new Random();
-               retryLoop: for (int retry = 0; retry < numRetries; retry++) {
-                       // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
-                       // parallel source instances start. Still, we try all available brokers.
-                       int index = rnd.nextInt(seedBrokers.length);
-                       brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
-                               String seedBroker = seedBrokers[index];
-                               LOG.info("Trying to get topic metadata from 
broker {} in try {}/{}", seedBroker, retry, numRetries);
-                               if (++index == seedBrokers.length) {
-                                       index = 0;
-                               }
-
-                               URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
-                               SimpleConsumer consumer = null;
-                               try {
-                                       consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
-
-                                       TopicMetadataRequest req = new TopicMetadataRequest(topics);
-                                       kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-                                       List<TopicMetadata> metaData = resp.topicsMetadata();
-
-                                       // clear in case we have an incomplete list from previous tries
-                                       partitions.clear();
-                                       for (TopicMetadata item : metaData) {
-                                               if (item.errorCode() != ErrorMapping.NoError()) {
-                                                       // warn and try more brokers
-                                                       LOG.warn("Error while 
getting metadata from broker " + seedBroker + " to find partitions " +
-                                                                       "for " 
+ topics.toString() + ". Error: " + 
ErrorMapping.exceptionFor(item.errorCode()).getMessage());
-                                                       continue brokersLoop;
-                                               }
-                                               if (!topics.contains(item.topic())) {
-                                                       LOG.warn("Received 
metadata from topic " + item.topic() + " even though it was not requested. 
Skipping ...");
-                                                       continue brokersLoop;
-                                               }
-                                               for (PartitionMetadata part : item.partitionsMetadata()) {
-                                                       Node leader = brokerToNode(part.leader());
-                                                       KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
-                                                       KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
-                                                       partitions.add(pInfo);
-                                               }
-                                       }
-                                       break retryLoop; // leave the loop through the brokers
-                               } catch (Exception e) {
-                                       //validates seed brokers in case of a ClosedChannelException
-                                       validateSeedBrokers(seedBrokers, e);
-                                       LOG.warn("Error communicating with 
broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
-                                                       "" + e.getClass() + ". 
Message: " + e.getMessage());
-                                       LOG.debug("Detailed trace", e);
-                                       // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
-                                       try {
-                                               Thread.sleep(500);
-                                       } catch (InterruptedException e1) {
-                                               // sleep shorter.
-                                       }
-                               } finally {
-                                       if (consumer != null) {
-                                               consumer.close();
-                                       }
-                               }
-                       } // brokers loop
-               } // retries loop
-               return partitions;
-       }
-
-       /**
-        * Turn a broker instance into a node instance
-        * @param broker broker instance
-        * @return Node representing the given broker
-        */
-       private static Node brokerToNode(Broker broker) {
-               return new Node(broker.id(), broker.host(), broker.port());
-       }
-
-       /**
-        * Validate the ZK configuration, checking for required parameters
-        * @param props Properties to check
-        */
-       protected static void validateZooKeeperConfig(Properties props) {
-               if (props.getProperty("zookeeper.connect") == null) {
-                       throw new IllegalArgumentException("Required property 
'zookeeper.connect' has not been set in the properties");
-               }
-               if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-                       throw new IllegalArgumentException("Required property 
'" + ConsumerConfig.GROUP_ID_CONFIG
-                                       + "' has not been set in the 
properties");
-               }
-               
-               try {
-                       //noinspection ResultOfMethodCallIgnored
-                       Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
-               }
-               catch (NumberFormatException e) {
-                       throw new IllegalArgumentException("Property 
'zookeeper.session.timeout.ms' is not a valid integer");
-               }
-               
-               try {
-                       //noinspection ResultOfMethodCallIgnored
-                       
Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
-               }
-               catch (NumberFormatException e) {
-                       throw new IllegalArgumentException("Property 
'zookeeper.connection.timeout.ms' is not a valid integer");
-               }
-       }
-
-       /**
-        * Validate that at least one seed broker is valid in case of a
-        * ClosedChannelException.
-        * 
-        * @param seedBrokers
-        *            array containing the seed brokers e.g. ["host1:port1",
-        *            "host2:port2"]
-        * @param exception
-        *            the exception that triggered the validation
-        */
-       private static void validateSeedBrokers(String[] seedBrokers, Exception 
exception) {
-               if (!(exception instanceof ClosedChannelException)) {
-                       return;
-               }
-               int unknownHosts = 0;
-               for (String broker : seedBrokers) {
-                       URL brokerUrl = 
NetUtils.getCorrectHostnamePort(broker.trim());
-                       try {
-                               InetAddress.getByName(brokerUrl.getHost());
-                       } catch (UnknownHostException e) {
-                               unknownHosts++;
-                       }
-               }
-               // throw meaningful exception if all the provided hosts are 
invalid
-               if (unknownHosts == seedBrokers.length) {
-                       throw new IllegalArgumentException("All the servers 
provided in: '"
-                                       + 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown 
hosts)");
-               }
-       }
-
-       private static long getInvalidOffsetBehavior(Properties config) {
-               final String val = 
config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
-               if (val.equals("none")) {
-                       throw new IllegalArgumentException("Cannot use '" + 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
-                                       + "' value 'none'. Possible values: 
'latest', 'largest', or 'earliest'.");
-               }
-               else if (val.equals("largest") || val.equals("latest")) { // 
largest is kafka 0.8, latest is kafka 0.9
-                       return OffsetRequest.LatestTime();
-               } else {
-                       return OffsetRequest.EarliestTime();
-               }
-       }
-}

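For context, the deleted validation helpers above (validateZooKeeperConfig, validateSeedBrokers, getInvalidOffsetBehavior) expect a Kafka 0.8 consumer configuration roughly along these lines. This is only an illustrative sketch, not code from the commit; the broker addresses, group id, and timeout values are placeholders:

    import java.util.Properties;

    public class LegacyConsumerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // seed brokers; validateSeedBrokers() rejects the config if none of these hosts resolve
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
            // required by validateZooKeeperConfig()
            props.setProperty("zookeeper.connect", "zkhost:2181");
            props.setProperty("group.id", "my-consumer-group");
            // optional, but must parse as integers if set
            props.setProperty("zookeeper.session.timeout.ms", "6000");
            props.setProperty("zookeeper.connection.timeout.ms", "6000");
            // 'largest'/'latest' or 'earliest'; getInvalidOffsetBehavior() rejects 'none'
            props.setProperty("auto.offset.reset", "earliest");
        }
    }
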
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
deleted file mode 100644
index 56ccd0b..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
- */
-@Deprecated
-public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {
-
-       private static final long serialVersionUID = -5649906773771949146L;
-
-       /**
-        * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
-        */
-       @Deprecated
-       public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
-               super(topic, valueDeserializer, props);
-       }
-}
-

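Since the removed FlinkKafkaConsumer081 (and the 082 variant below) only delegates to its FlinkKafkaConsumer08 superclass, migrating off the deprecated class is a drop-in change. A minimal sketch, assuming SimpleStringSchema is available at this version and that the properties object is configured as in the previous example (topic and method names are illustrative):

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ConsumerMigrationSketch {
        public static FlinkKafkaConsumer08<String> create(Properties props) {
            // The removed FlinkKafkaConsumer081/082 constructors simply delegated to this one,
            // so switching the class name is the whole migration:
            return new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);
        }
    }
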
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
deleted file mode 100644
index 0520336..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
- */
-@Deprecated
-public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {
-
-       private static final long serialVersionUID = -5649906773771949146L;
-
-       /**
-        * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
-        */
-       @Deprecated
-       public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
-               super(topic, valueDeserializer, props);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
deleted file mode 100644
index 1c2e0b7..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import java.util.Properties;
-
-
-/**
- * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead.
- */
-@Deprecated
-public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
-
-       @Deprecated
-       public FlinkKafkaProducer(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), null);
-       }
-
-       @Deprecated
-       public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
-       }
-
-       @Deprecated
-       public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner 
customPartitioner) {
-               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
-       }
-
-       @Deprecated
-       public FlinkKafkaProducer(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               super(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), null);
-       }
-
-       @Deprecated
-       public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               super(topicId, serializationSchema, producerConfig, null);
-       }
-
-       @Deprecated
-       public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner 
customPartitioner) {
-               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
deleted file mode 100644
index 65de5fc..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.8.
- *
- * Please note that this producer does not have any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
-
-       private static final long serialVersionUID = 1L;
-
-       // ------------------- Keyless serialization schema constructors 
----------------------
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
-        * the topic.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        */
-       public FlinkKafkaProducer08(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
-        * the topic.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<IN>());
-       }
-
-       /**
-        * The main constructor for creating a FlinkKafkaProducer.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-        */
-       public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
-       }
-
-       // ------------------- Key/Value serialization schema constructors 
----------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
-        * the topic.
-        *
-        * @param brokerList
-        *                      Comma separated addresses of the brokers
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting 
key/value messages
-        */
-       public FlinkKafkaProducer08(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
-        * the topic.
-        *
-        * @param topicId
-        *                      ID of the Kafka topic.
-        * @param serializationSchema
-        *                      User defined serialization schema supporting 
key/value messages
-        * @param producerConfig
-        *                      Properties with the producer configuration.
-        */
-       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<IN>());
-       }
-
-       /**
-        * The main constructor for creating a FlinkKafkaProducer.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-        */
-       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
-               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
-       }
-
-       @Override
-       protected void flush() {
-               // The Kafka 0.8 producer doesn't support flushing, we wait here
-               // until all pending records are confirmed
-               synchronized (pendingRecordsLock) {
-                       while (pendingRecords > 0) {
-                               try {
-                                       pendingRecordsLock.wait();
-                               } catch (InterruptedException e) {
-                                       // this can be interrupted when the 
Task has been cancelled.
-                                       // by throwing an exception, we ensure 
that this checkpoint doesn't get confirmed
-                                       throw new RuntimeException("Flushing 
got interrupted while checkpointing", e);
-                               }
-                       }
-               }
-       }
-
-}

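As a usage sketch for the removed producer (broker address, topic, and pipeline contents are placeholders, and SimpleStringSchema is assumed to be available): the keyless (brokerList, topicId, SerializationSchema) constructor shown above wraps the schema and installs a FixedPartitioner by default.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");
            // keyless constructor: serializes each element and writes it to the topic
            stream.addSink(new FlinkKafkaProducer08<>("localhost:9092", "my-topic", new SimpleStringSchema()));
            env.execute("producer sketch");
        }
    }
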
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
deleted file mode 100644
index b155576..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
- */
-public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
-       /**
-        * Creates {@link KafkaTableSink} for Kafka 0.8
-        *
-        * @param topic topic in Kafka to which table is written
-        * @param properties properties to connect to Kafka
-        * @param partitioner Kafka partitioner
-        */
-       public Kafka08JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
-               super(topic, properties, partitioner);
-       }
-
-       @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
KafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer08<>(topic, serializationSchema, 
properties, partitioner);
-       }
-
-       @Override
-       protected Kafka08JsonTableSink createCopy() {
-               return new Kafka08JsonTableSink(topic, properties, partitioner);
-       }
-}
-

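A brief construction sketch for the removed JSON table sink (the properties, topic, and choice of FixedPartitioner are placeholders): the overridden createKafkaProducer() above is what ties each serialized Row to a FlinkKafkaProducer08 instance.

    import java.util.Properties;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

    public class JsonTableSinkSketch {
        public static Kafka08JsonTableSink create() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            // rows are serialized to JSON and handed to FlinkKafkaProducer08 internally
            return new Kafka08JsonTableSink("output-topic", props, new FixedPartitioner<Row>());
        }
    }
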
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
deleted file mode 100644
index 63bb57e..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka 0.8 {@link StreamTableSource} that deserializes data in JSON format.
- */
-public class Kafka08JsonTableSource extends KafkaJsonTableSource {
-
-       /**
-        * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka08JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka08JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
-               return new FlinkKafkaConsumer08<>(topic, deserializationSchema, 
properties);
-       }
-}

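A construction sketch for the removed JSON table source; the field names and types are illustrative, and the Class<?>[] constructor above is an equivalent alternative to the TypeInformation[] variant used here:

    import java.util.Properties;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;

    public class JsonTableSourceSketch {
        public static Kafka08JsonTableSource create() {
            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181");   // required by the 0.8 consumer
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "table-source-group");
            return new Kafka08JsonTableSource(
                    "input-topic",
                    props,
                    new String[] { "user", "clicks" },
                    new TypeInformation<?>[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO });
        }
    }
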
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
deleted file mode 100644
index 8f51237..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka {@link StreamTableSource} for Kafka 0.8.
- */
-public class Kafka08TableSource extends KafkaTableSource {
-
-       /**
-        * Creates a Kafka 0.8 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka08TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.8 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka08TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
-               return new FlinkKafkaConsumer08<>(topic, deserializationSchema, 
properties);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
deleted file mode 100644
index 23ff276..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A special form of blocking queue with two additions:
- * <ol>
- *     <li>The queue can be closed atomically when empty. Adding elements 
after the queue
- *         is closed fails. This allows queue consumers to atomically discover 
that no elements
- *         are available and mark themselves as shut down.</li>
- *     <li>The queue allows polling batches of elements in one call.</li>
- * </ol>
- * 
- * The queue has no capacity restriction and is safe for multiple producers 
and consumers.
- * 
- * <p>Note: Null elements are prohibited.
- * 
- * @param <E> The type of elements in the queue.
- */
-public class ClosableBlockingQueue<E> {
-
-       /** The lock used to make queue accesses and open checks atomic */
-       private final ReentrantLock lock;
-       
-       /** The condition on which blocking get-calls wait if the queue is 
empty */
-       private final Condition nonEmpty;
-       
-       /** The deque of elements */
-       private final ArrayDeque<E> elements;
-       
-       /** Flag marking the status of the queue */
-       private volatile boolean open;
-       
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new empty queue.
-        */
-       public ClosableBlockingQueue() {
-               this(10);
-       }
-
-       /**
-        * Creates a new empty queue, reserving space for at least the 
specified number
-        * of elements. The queue can still grow if more elements are added than the
-        * reserved space.
-        * 
-        * @param initialSize The number of elements to reserve space for.
-        */
-       public ClosableBlockingQueue(int initialSize) {
-               this.lock = new ReentrantLock(true);
-               this.nonEmpty = this.lock.newCondition();
-               
-               this.elements = new ArrayDeque<>(initialSize);
-               this.open = true;
-               
-               
-       }
-
-       /**
-        * Creates a new queue that contains the given elements.
-        * 
-        * @param initialElements The elements to initially add to the queue.
-        */
-       public ClosableBlockingQueue(Collection<? extends E> initialElements) {
-               this(initialElements.size());
-               this.elements.addAll(initialElements);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Size and status
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the number of elements currently in the queue.
-        * @return The number of elements currently in the queue.
-        */
-       public int size() {
-               lock.lock();
-               try {
-                       return elements.size();
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Checks whether the queue is empty (has no elements).
-        * @return True, if the queue is empty; false, if it is non-empty.
-        */
-       public boolean isEmpty() {
-               return size() == 0;
-       }
-
-       /**
-        * Checks whether the queue is currently open, meaning elements can be 
added and polled.
-        * @return True, if the queue is open; false, if it is closed.
-        */
-       public boolean isOpen() {
-               return open;
-       }
-       
-       /**
-        * Tries to close the queue. Closing the queue only succeeds when no 
elements are
-        * in the queue when this method is called. Checking whether the queue 
is empty, and
-        * marking the queue as closed is one atomic operation.
-        *
-        * @return True, if the queue is closed, false if the queue remains 
open.
-        */
-       public boolean close() {
-               lock.lock();
-               try {
-                       if (open) {
-                               if (elements.isEmpty()) {
-                                       open = false;
-                                       nonEmpty.signalAll();
-                                       return true;
-                               } else {
-                                       return false;
-                               }
-                       }
-                       else {
-                               // already closed
-                               return true;
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Adding / Removing elements
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Tries to add an element to the queue, if the queue is still open. 
Checking whether the queue
-        * is open and adding the element is one atomic operation.
-        * 
-        * <p>Unlike the {@link #add(Object)} method, this method never throws 
an exception,
-        * but only indicates via the return code if the element was added or 
the
-        * queue was closed.
-        * 
-        * @param element The element to add.
-        * @return True, if the element was added, false if the queue was closed.
-        */
-       public boolean addIfOpen(E element) {
-               requireNonNull(element);
-               
-               lock.lock();
-               try {
-                       if (open) {
-                               elements.addLast(element);
-                               if (elements.size() == 1) {
-                                       nonEmpty.signalAll();
-                               }
-                       }
-                       return open;
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Adds the element to the queue, or fails with an exception, if the 
queue is closed.
-        * Checking whether the queue is open and adding the element is one 
atomic operation.
-        * 
-        * @param element The element to add.
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        */
-       public void add(E element) throws IllegalStateException {
-               requireNonNull(element);
-
-               lock.lock();
-               try {
-                       if (open) {
-                               elements.addLast(element);
-                               if (elements.size() == 1) {
-                                       nonEmpty.signalAll();
-                               }
-                       } else {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Returns the queue's next element without removing it, if the queue 
is non-empty.
-        * Otherwise, returns null. 
-        *
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and getting the next element is 
one atomic operation.
-        * 
-        * <p>This method never blocks.
-        * 
-        * @return The queue's next element, or null, if the queue is empty.
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        */
-       public E peek() {
-               lock.lock();
-               try {
-                       if (open) {
-                               if (elements.size() > 0) {
-                                       return elements.getFirst();
-                               } else {
-                                       return null;
-                               }
-                       } else {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Returns the queue's next element and removes it, if the queue is non-empty.
-        * Otherwise, this method returns null. 
-        *
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and removing the next element is 
one atomic operation.
-        *
-        * <p>This method never blocks.
-        *
-        * @return The queue's next element, or null, if the queue is empty.
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        */
-       public E poll() {
-               lock.lock();
-               try {
-                       if (open) {
-                               if (elements.size() > 0) {
-                                       return elements.removeFirst();
-                               } else {
-                                       return null;
-                               }
-                       } else {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Returns all of the queue's current elements in a list, if the queue 
is non-empty.
-        * Otherwise, this method returns null. 
-        *
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and removing the elements is one 
atomic operation.
-        *
-        * <p>This method never blocks.
-        *
-        * @return All of the queue's elements, or null, if the queue is empty.
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        */
-       public List<E> pollBatch() {
-               lock.lock();
-               try {
-                       if (open) {
-                               if (elements.size() > 0) {
-                                       ArrayList<E> result = new 
ArrayList<>(elements);
-                                       elements.clear();
-                                       return result;
-                               } else {
-                                       return null;
-                               }
-                       } else {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Returns the next element in the queue. If the queue is empty, this 
method
-        * waits until at least one element is added.
-        * 
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and removing the next element is 
one atomic operation.
-        * 
-        * @return The next element in the queue, never null.
-        * 
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-        *                              element to be added.
-        */
-       public E getElementBlocking() throws InterruptedException {
-               lock.lock();
-               try {
-                       while (open && elements.isEmpty()) {
-                               nonEmpty.await();
-                       }
-                       
-                       if (open) {
-                               return elements.removeFirst();
-                       } else {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Returns the next element in the queue. If the queue is empty, this 
method
-        * waits at most a certain time until an element becomes available. If 
no element
-        * is available after that time, the method returns null.
-        * 
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and removing the next element is 
one atomic operation.
-        * 
-        * @param timeoutMillis The number of milliseconds to block, at most.
-        * @return The next element in the queue, or null, if the timeout expires before an element is available.
-        * 
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-        *                              element to be added.
-        */
-       public E getElementBlocking(long timeoutMillis) throws 
InterruptedException {
-               if (timeoutMillis == 0L) {
-                       // wait forever case
-                       return getElementBlocking();
-               } else if (timeoutMillis < 0L) {
-                       throw new IllegalArgumentException("invalid timeout");
-               }
-               
-               final long deadline = System.currentTimeMillis() + 
timeoutMillis;
-               
-               lock.lock();
-               try {
-                       while (open && elements.isEmpty() && timeoutMillis > 0) 
{ 
-                               nonEmpty.await(timeoutMillis, 
TimeUnit.MILLISECONDS);
-                               timeoutMillis = deadline - 
System.currentTimeMillis();
-                       }
-                       
-                       if (!open) {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-                       else if (elements.isEmpty()) {
-                               return null;
-                       } else {
-                               return elements.removeFirst();
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Gets all of the queue's elements as a list, or blocks until at least one element
-        * was added. If the queue is empty when this method is called, it 
blocks until
-        * at least one element is added.
-        *
-        * <p>This method always returns a list with at least one element.
-        * 
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and removing the next element is 
one atomic operation.
-        * 
-        * @return A list with all elements in the queue, always at least one 
element.
-        * 
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-        *                              element to be added.
-        */
-       public List<E> getBatchBlocking() throws InterruptedException {
-               lock.lock();
-               try {
-                       while (open && elements.isEmpty()) {
-                               nonEmpty.await();
-                       }
-                       if (open) {
-                               ArrayList<E> result = new ArrayList<>(elements);
-                               elements.clear();
-                               return result;
-                       } else {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-
-       /**
-        * Gets all of the queue's elements as a list, or blocks until at least one element
-        * was added. This method is similar to {@link #getBatchBlocking()}, but takes
-        * the maximum number of milliseconds that the method will wait before returning.
-        * 
-        * <p>This method never returns null, but an empty list, if the queue 
is empty when
-        * the method is called and the request times out before an element was 
added.
-        * 
-        * <p>The method throws an {@code IllegalStateException} if the queue 
is closed.
-        * Checking whether the queue is open and removing the next element is 
one atomic operation.
-        * 
-        * @param timeoutMillis The number of milliseconds to wait, at most.
-        * @return A list with all elements in the queue, possibly an empty list.
-        *
-        * @throws IllegalStateException Thrown, if the queue is closed.
-        * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
-        *                              element to be added.
-        */
-       public List<E> getBatchBlocking(long timeoutMillis) throws 
InterruptedException {
-               if (timeoutMillis == 0L) {
-                       // wait forever case
-                       return getBatchBlocking();
-               } else if (timeoutMillis < 0L) {
-                       throw new IllegalArgumentException("invalid timeout");
-               }
-
-               final long deadline = System.currentTimeMillis() + 
timeoutMillis;
-
-               lock.lock();
-               try {
-                       while (open && elements.isEmpty() && timeoutMillis > 0) 
{
-                               nonEmpty.await(timeoutMillis, 
TimeUnit.MILLISECONDS);
-                               timeoutMillis = deadline - 
System.currentTimeMillis();
-                       }
-
-                       if (!open) {
-                               throw new IllegalStateException("queue is 
closed");
-                       }
-                       else if (elements.isEmpty()) {
-                               return Collections.emptyList();
-                       }
-                       else {
-                               ArrayList<E> result = new ArrayList<>(elements);
-                               elements.clear();
-                               return result;
-                       }
-               } finally {
-                       lock.unlock();
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Standard Utilities
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               int hashCode = 17;
-               for (E element : elements) {
-                       hashCode = 31 * hashCode + element.hashCode();
-               }
-               return hashCode;
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj == this) {
-                       return true;
-               } else if (obj != null && obj.getClass() == 
ClosableBlockingQueue.class) {
-                       @SuppressWarnings("unchecked")
-                       ClosableBlockingQueue<E> that = 
(ClosableBlockingQueue<E>) obj;
-                       
-                       if (this.elements.size() == that.elements.size()) {
-                               Iterator<E> thisElements = 
this.elements.iterator();
-                               for (E thatNext : that.elements) {
-                                       E thisNext = thisElements.next();
-                                       if (!(thisNext == null ? thatNext == 
null : thisNext.equals(thatNext))) {
-                                               return false;
-                                       }
-                               }
-                               return true;
-                       } else {
-                               return false;
-                       }
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public String toString() {
-               return elements.toString();
-       }
-}

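To illustrate the hand-off the removed queue is designed for (element type, timeout, and record values are arbitrary here): producers add while the queue is open, a consumer drains batches, and close() only succeeds once the queue is empty, which lets the consumer discover shutdown atomically.

    import java.util.List;
    import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;

    public class QueueHandOffSketch {
        public static void main(String[] args) throws InterruptedException {
            ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();

            // producer side: returns false once the queue has been closed
            if (!queue.addIfOpen("record-1")) {
                return; // consumer already shut down
            }

            // consumer side: waits up to one second, may return an empty list
            List<String> batch = queue.getBatchBlocking(1000);
            for (String record : batch) {
                System.out.println(record);
            }

            // shutdown: succeeds only when no elements are left in the queue
            boolean closed = queue.close();
            System.out.println("closed: " + closed);
        }
    }
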