[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656443#comment-16656443 ]

ASF GitHub Bot commented on FLINK-7964:
---------------------------------------

yanghua closed pull request #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors
URL: https://github.com/apache/flink/pull/6577

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index b4416e8a209..73a331e6194 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -193,7 +193,7 @@
        /**
         * User defined properties for the Producer.
         */
-       private final Properties producerConfig;
+       protected final Properties producerConfig;
 
        /**
         * The name of the default topic this producer is writing data to.
@@ -239,7 +239,7 @@
        /**
         * Semantic chosen for this instance.
         */
-       private Semantic semantic;
+       protected Semantic semantic;
 
        // -------------------------------- Runtime fields ------------------------------------------
 
@@ -893,6 +893,10 @@ protected void finishRecoveringContext() {
                LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
        }
 
+       protected FlinkKafkaProducer createProducer() {
+               return new FlinkKafkaProducer<>(this.producerConfig);
+       }
+
        /**
         * After initialization make sure that all previous transactions from the current user context have been completed.
         */
@@ -958,7 +962,7 @@ private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> pro
        }
 
        private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
-               FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
+               FlinkKafkaProducer<byte[], byte[]> producer = createProducer();
 
                RuntimeContext ctx = getRuntimeContext();
 
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 8faff38749f..2f47bf15a62 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -106,10 +106,10 @@
 public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
        private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
 
-       private final KafkaProducer<K, V> kafkaProducer;
+       protected final KafkaProducer<K, V> kafkaProducer;
 
        @Nullable
-       private final String transactionalId;
+       protected final String transactionalId;
 
        public FlinkKafkaProducer(Properties properties) {
                transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
@@ -257,7 +257,7 @@ private TransactionalRequestResult enqueueNewPartitions() {
                }
        }
 
-       private static Enum<?> getEnum(String enumFullName) {
+       protected static Enum<?> getEnum(String enumFullName) {
                String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
                if (x.length == 2) {
                        String enumClassName = x[0];
@@ -272,7 +272,7 @@ private TransactionalRequestResult enqueueNewPartitions() {
                return null;
        }
 
-       private static Object invoke(Object object, String methodName, Object... args) {
+       protected static Object invoke(Object object, String methodName, Object... args) {
                Class<?>[] argTypes = new Class[args.length];
                for (int i = 0; i < args.length; i++) {
                        argTypes[i] = args[i].getClass();
@@ -290,7 +290,7 @@ private static Object invoke(Object object, String methodName, Class<?>[] argTyp
                }
        }
 
-       private static Object getValue(Object object, String fieldName) {
+       protected static Object getValue(Object object, String fieldName) {
                return getValue(object, object.getClass(), fieldName);
        }
 
@@ -304,7 +304,7 @@ private static Object getValue(Object object, Class<?> clazz, String fieldName)
                }
        }
 
-       private static void setValue(Object object, String fieldName, Object value) {
+       protected static void setValue(Object object, String fieldName, Object value) {
                try {
                        Field field = object.getClass().getDeclaredField(fieldName);
                        field.setAccessible(true);
diff --git a/flink-connectors/flink-connector-kafka-1.0/pom.xml b/flink-connectors/flink-connector-kafka-1.0/pom.xml
new file mode 100644
index 00000000000..b82dfefcf38
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/pom.xml
@@ -0,0 +1,315 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <artifactId>flink-connectors</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.7-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kafka-1.0_${scala.binary.version}</artifactId>
+       <name>flink-connector-kafka-1.0</name>
+
+       <packaging>jar</packaging>
+
+       <properties>
+               <kafka.version>1.0.0</kafka.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- streaming-java dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Add Kafka 1.0.x as a dependency -->
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project, won't depend on flink-table. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <!-- Projects depending on this project, won't depend on flink-avro. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-json</artifactId>
+                       <version>${project.version}</version>
+                       <!-- Projects depending on this project, won't depend on flink-json. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-json</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- exclude Kafka dependencies -->
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- exclude Kafka dependencies -->
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <!-- include 1.0 server for tests  -->
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                       <version>${kafka.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <profiles>
+               <!-- Create SQL Client uber jars by default -->
+               <profile>
+                       <id>sql-jars</id>
+                       <activation>
+                               <property>
+                                       <name>!skipSqlJars</name>
+                               </property>
+                       </activation>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               <groupId>org.apache.maven.plugins</groupId>
+                                               <artifactId>maven-shade-plugin</artifactId>
+                                               <executions>
+                                                       <execution>
+                                                               <phase>package</phase>
+                                                               <goals>
+                                                                       <goal>shade</goal>
+                                                               </goals>
+                                                               <configuration>
+                                                                       <shadedArtifactAttached>true</shadedArtifactAttached>
+                                                                       <shadedClassifierName>sql-jar</shadedClassifierName>
+                                                                       <artifactSet>
+                                                                               <includes combine.children="append">
+                                                                                       <include>org.apache.kafka:*</include>
+                                                                                       <include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
+                                                                                       <include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
+                                                                                       <include>org.apache.flink:flink-connector-kafka-0.10_${scala.binary.version}</include>
+                                                                                       <include>org.apache.flink:flink-connector-kafka-0.11_${scala.binary.version}</include>
+                                                                               </includes>
+                                                                       </artifactSet>
+                                                                       <filters>
+                                                                               <filter>
+                                                                                       <artifact>*:*</artifact>
+                                                                                       <excludes>
+                                                                                               <exclude>kafka/kafka-version.properties</exclude>
+                                                                                       </excludes>
+                                                                               </filter>
+                                                                       </filters>
+                                                                       <relocations>
+                                                                               <relocation>
+                                                                                       <pattern>org.apache.kafka</pattern>
+                                                                                       <shadedPattern>org.apache.flink.kafka10.shaded.org.apache.kafka</shadedPattern>
+                                                                               </relocation>
+                                                                       </relocations>
+                                                               </configuration>
+                                                       </execution>
+                                               </executions>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
+       </profiles>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               <include>**/KafkaTestEnvironmentImpl*</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-source-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>attach-test-sources</id>
+                                               <goals>
+                                                       <goal>test-jar-no-fork</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <includes>
+                                                               <include>**/KafkaTestEnvironmentImpl*</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                                       <argLine>-Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java
new file mode 100644
index 00000000000..e45b97846c7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 1.0.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ */
+public class FlinkKafkaConsumer10<T> extends FlinkKafkaConsumer011<T> {
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 1.0.x.
+        *
+        * @param topic             The name of the topic that should be 
consumed.
+        * @param valueDeserializer The de-/serializer used to convert between 
Kafka's byte messages and Flink's objects.
+        * @param props
+        */
+       public FlinkKafkaConsumer10(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+               super(topic, valueDeserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 1.0.x
+        *
+        * <p>This constructor allows passing a {@see 
KeyedDeserializationSchema} for reading key/value
+        * pairs, offsets, and topic names from Kafka.
+        *
+        * @param topic        The name of the topic that should be consumed.
+        * @param deserializer The keyed de-/serializer used to convert between 
Kafka's byte messages and Flink's objects.
+        * @param props
+        */
+       public FlinkKafkaConsumer10(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(topic, deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 1.0.x
+        *
+        * <p>This constructor allows passing multiple topics to the consumer.
+        *
+        * @param topics       The Kafka topics to read from.
+        * @param deserializer The de-/serializer used to convert between 
Kafka's byte messages and Flink's objects.
+        * @param props
+        */
+       public FlinkKafkaConsumer10(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+               super(topics, deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 1.0.x
+        *
+        * <p>This constructor allows passing multiple topics and a key/value 
deserialization schema.
+        *
+        * @param topics       The Kafka topics to read from.
+        * @param deserializer The keyed de-/serializer used to convert between 
Kafka's byte messages and Flink's objects.
+        * @param props
+        */
+       public FlinkKafkaConsumer10(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(topics, deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 1.0.x. Use 
this constructor to
+        * subscribe to multiple topics based on a regular expression pattern.
+        *
+        * <p>If partition discovery is enabled (by setting a non-negative 
value for
+        * {@link FlinkKafkaConsumer10#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} 
in the properties), topics
+        * with names matching the pattern will also be subscribed to as they 
are created on the fly.
+        *
+        * @param subscriptionPattern The regular expression for a pattern of 
topic names to subscribe to.
+        * @param valueDeserializer   The de-/serializer used to convert 
between Kafka's byte messages and Flink's objects.
+        * @param props
+        */
+       public FlinkKafkaConsumer10(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+               super(subscriptionPattern, valueDeserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 1.0.x. Use 
this constructor to
+        * subscribe to multiple topics based on a regular expression pattern.
+        *
+        * <p>If partition discovery is enabled (by setting a non-negative 
value for
+        * {@link FlinkKafkaConsumer10#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} 
in the properties), topics
+        * with names matching the pattern will also be subscribed to as they 
are created on the fly.
+        *
+        * <p>This constructor allows passing a {@see 
KeyedDeserializationSchema} for reading key/value
+        * pairs, offsets, and topic names from Kafka.
+        *
+        * @param subscriptionPattern The regular expression for a pattern of 
topic names to subscribe to.
+        * @param deserializer        The keyed de-/serializer used to convert 
between Kafka's byte messages and Flink's objects.
+        * @param props
+        */
+       public FlinkKafkaConsumer10(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(subscriptionPattern, deserializer, props);
+       }
+}
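
For reference, a minimal usage sketch of the new consumer (not part of the diff), assuming a Flink 1.7-era DataStream job; the topic name, broker address, and group id below are placeholders, and SimpleStringSchema is used purely for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer10;

public class Kafka10ConsumerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "example-group");           // placeholder group id

        // FlinkKafkaConsumer10 only swaps the Kafka client version; all runtime
        // behavior is inherited from FlinkKafkaConsumer011.
        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer10<>("example-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka 1.0 consumer example");
    }
}
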
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java
new file mode 100644
index 00000000000..de7e0b7b7c4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafka10Producer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 1.0.x. By default producer
+ * will use {@link FlinkKafkaProducer10.Semantic#AT_LEAST_ONCE} semantic.
+ * Before using {@link FlinkKafkaProducer10.Semantic#EXACTLY_ONCE} please refer to Flink's
+ * Kafka connector documentation.
+ */
+public class FlinkKafkaProducer10<IN> extends FlinkKafkaProducer011<IN> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer10.class);
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
a DataStream to
+        * the topic.
+        *
+        * @param brokerList          Comma separated addresses of the brokers
+        * @param topicId             ID of the Kafka topic.
+        * @param serializationSchema
+        */
+       public FlinkKafkaProducer10(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+               super(brokerList, topicId, serializationSchema);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
a DataStream to
+        * the topic.
+        *
+        * <p>Using this constructor, the default
+        * {@link 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer10(String, SerializationSchema, 
Properties, Optional)} instead.
+        *
+        * @param topicId             ID of the Kafka topic.
+        * @param serializationSchema User defined key-less serialization 
schema.
+        * @param producerConfig
+        */
+       public FlinkKafkaProducer10(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+               super(topicId, serializationSchema, producerConfig);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
its input to
+        * the topic. It accepts a key-less {@link SerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>Since a key-less {@link SerializationSchema} is used, all records 
sent to Kafka will not have an
+        * attached key. Therefore, if a partitioner is also not provided, 
records will be distributed to Kafka
+        * partitions in a round-robin fashion.
+        *
+        * @param topicId             The topic to write data to
+        * @param serializationSchema A key-less serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig      Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner   A serializable partitioner for assigning 
messages to Kafka partitions.
+        *                            If a partitioner is not provided, records 
will be distributed to Kafka partitions
+        */
+       public FlinkKafkaProducer10(
+               String topicId,
+               SerializationSchema<IN> serializationSchema,
+               Properties producerConfig,
+               Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, customPartitioner);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
a DataStream to
+        * the topic.
+        *
+        * <p>Using this constructor, the default
+        * {@link 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema, 
Properties, Optional)} instead.
+        *
+        * @param brokerList          Comma separated addresses of the brokers
+        * @param topicId             ID of the Kafka topic.
+        * @param serializationSchema
+        */
+       public FlinkKafkaProducer10(
+               String brokerList,
+               String topicId,
+               KeyedSerializationSchema<IN> serializationSchema) {
+               super(brokerList, topicId, serializationSchema);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
a DataStream to
+        * the topic.
+        *
+        * <p>Using this constructor, the default
+        * {@link 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema, 
Properties, Optional)} instead.
+        *
+        * @param topicId             ID of the Kafka topic.
+        * @param serializationSchema User defined serialization schema 
supporting key/value messages
+        * @param producerConfig
+        */
+       public FlinkKafkaProducer10(
+               String topicId,
+               KeyedSerializationSchema<IN> serializationSchema,
+               Properties producerConfig) {
+               super(topicId, serializationSchema, producerConfig);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
a DataStream to
+        * the topic.
+        *
+        * <p>Using this constructor, the default
+        * {@link 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema, 
Properties, Optional, Semantic, int)} instead.
+        *
+        * @param topicId             ID of the Kafka topic.
+        * @param serializationSchema User defined serialization schema 
supporting key/value messages
+        * @param producerConfig      Properties with the producer 
configuration.
+        * @param semantic            Defines semantic that will be used by 
this producer (see {@link Semantic}).
+        */
+       public FlinkKafkaProducer10(
+               String topicId,
+               KeyedSerializationSchema<IN> serializationSchema,
+               Properties producerConfig,
+               Semantic semantic) {
+               super(topicId, serializationSchema, producerConfig, semantic);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be 
partitioned by the attached key of each
+        * record (as determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link 
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
+        *
+        * @param defaultTopicId      The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig      Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner   A serializable partitioner for assigning 
messages to Kafka partitions.
+        *                            If a partitioner is not provided, records 
will be partitioned by the key of each record
+        *                            (determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                            are {@code null}, then records will be 
distributed to Kafka partitions in a
+        */
+       public FlinkKafkaProducer10(
+               String defaultTopicId,
+               KeyedSerializationSchema<IN> serializationSchema,
+               Properties producerConfig,
+               Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+               super(defaultTopicId, serializationSchema, producerConfig, customPartitioner);
+       }
+
+       /**
+        * Creates a FlinkKafka10Producer for a given topic. The sink produces 
its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be 
partitioned by the attached key of each
+        * record (as determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link 
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
+        *
+        * @param defaultTopicId         The default topic to write data to
+        * @param serializationSchema    A serializable serialization schema 
for turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig         Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner      A serializable partitioner for 
assigning messages to Kafka partitions.
+        *                               If a partitioner is not provided, 
records will be partitioned by the key of each record
+        *                               (determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                               are {@code null}, then records will be 
distributed to Kafka partitions in a
+        *                               round-robin fashion.
+        * @param semantic               Defines semantic that will be used by 
this producer (see {@link Semantic}).
+        * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool 
size (see {@link Semantic#EXACTLY_ONCE}).
+        */
+       public FlinkKafkaProducer10(
+               String defaultTopicId,
+               KeyedSerializationSchema<IN> serializationSchema,
+               Properties producerConfig,
+               Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+               Semantic semantic,
+               int kafkaProducersPoolSize) {
+               super(defaultTopicId, serializationSchema, producerConfig, customPartitioner, semantic, kafkaProducersPoolSize);
+       }
+
+       @Override
+       protected FlinkKafka10Producer createProducer() {
+               return new FlinkKafka10Producer(this.producerConfig);
+       }
+
+}
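
And a matching sink-side sketch (again not part of the diff), using the (topicId, SerializationSchema, Properties) constructor added above with the default AT_LEAST_ONCE semantic; the broker address and topic name are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer10;

public class Kafka10ProducerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        // Delegates to the FlinkKafkaProducer011 constructor; only the Kafka client
        // version and the internal producer wrapper (FlinkKafka10Producer) differ.
        stream.addSink(new FlinkKafkaProducer10<>("example-topic", new SimpleStringSchema(), props));

        env.execute("Kafka 1.0 producer example");
    }
}
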
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java
new file mode 100644
index 00000000000..4f61b33bba1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka 1.0 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka10TableSink extends KafkaTableSink {
+
+       public Kafka10TableSink(
+               TableSchema schema,
+               String topic,
+               Properties properties,
+               Optional<FlinkKafkaPartitioner<Row>> partitioner,
+               SerializationSchema<Row> serializationSchema) {
+
+               super(schema, topic, properties, partitioner, serializationSchema);
+       }
+
+       @Override
+       protected SinkFunction<Row> createKafkaProducer(
+               String topic,
+               Properties properties,
+               SerializationSchema<Row> serializationSchema,
+               Optional<FlinkKafkaPartitioner<Row>> partitioner) {
+               return new FlinkKafkaProducer10<>(
+                       topic,
+                       new KeyedSerializationSchemaWrapper<>(serializationSchema),
+                       properties,
+                       partitioner);
+       }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java
new file mode 100644
index 00000000000..2421ac764a5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 1.0.
+ */
+@Internal
+public class Kafka10TableSource extends KafkaTableSource {
+
+       /**
+        * Creates a generic Kafka {@link StreamTableSource}.
+        *
+        * @param schema                      Schema of the produced table.
+        * @param proctimeAttribute           Field name of the processing time 
attribute.
+        * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+        * @param fieldMapping                Mapping for the fields of the 
table schema to
+        *                                    fields of the physical returned 
type.
+        * @param topic                       Kafka topic to consume.
+        * @param properties                  Properties for the Kafka consumer.
+        * @param deserializationSchema       Deserialization schema for 
decoding records from Kafka.
+        * @param startupMode                 Startup mode for the contained 
consumer.
+        * @param specificStartupOffsets      Specific startup offsets; only 
relevant when startup
+        *                                    mode is {@link 
StartupMode#SPECIFIC_OFFSETS}.
+        */
+       public Kafka10TableSource(
+               TableSchema schema,
+               Optional<String> proctimeAttribute,
+               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+               Optional<Map<String, String>> fieldMapping,
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema,
+               StartupMode startupMode,
+               Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               super(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       fieldMapping,
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       /**
+        * Creates a generic Kafka {@link StreamTableSource}.
+        *
+        * @param schema                Schema of the produced table.
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema for decoding 
records from Kafka.
+        */
+       public Kafka10TableSource(
+               TableSchema schema,
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema) {
+
+               super(schema, topic, properties, deserializationSchema);
+       }
+
+       @Override
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema) {
+
+               return new FlinkKafkaConsumer10<Row>(topic, deserializationSchema, properties);
+       }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java
new file mode 100644
index 00000000000..5bfe479ccbe
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka10TableSource}.
+ */
+public class Kafka10TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+       @Override
+       protected String kafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_10;
+       }
+
+       @Override
+       protected boolean supportsKafkaTimestamps() {
+               return true;
+       }
+
+       @Override
+       protected KafkaTableSource createKafkaTableSource(
+               TableSchema schema,
+               Optional<String> proctimeAttribute,
+               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+               Map<String, String> fieldMapping,
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema,
+               StartupMode startupMode,
+               Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka10TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       @Override
+       protected KafkaTableSink createKafkaTableSink(
+               TableSchema schema,
+               String topic,
+               Properties properties,
+               Optional<FlinkKafkaPartitioner<Row>> partitioner,
+               SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka10TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java
new file mode 100644
index 00000000000..5eca36e57a4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Inner flink kafka producer.
+ */
+@PublicEvolving
+public class FlinkKafka10Producer<K, V> extends FlinkKafkaProducer<K, V> {
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafka10Producer.class);
+
+       public FlinkKafka10Producer(Properties properties) {
+               super(properties);
+       }
+
+       /**
+        * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
+        * so that we can resume transaction after a restart. Implementation of this method is based on
+        * {@link KafkaProducer#initTransactions}.
+        * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
+        */
+       public void resumeTransaction(long producerId, short epoch) {
+               Preconditions.checkState(producerId >= 0 && epoch >= 0, 
"Incorrect values for producerId {} and epoch {}", producerId, epoch);
+               LOG.info("Attempting to resume transaction {} with producerId 
{} and epoch {}", transactionalId, producerId, epoch);
+
+               Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
+               synchronized (transactionManager) {
+                       Object nextSequence = getValue(transactionManager, 
"nextSequence");
+
+                       invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+                       invoke(nextSequence, "clear");
+
+                       Object producerIdAndEpoch = 
getValue(transactionManager, "producerIdAndEpoch");
+                       setValue(producerIdAndEpoch, "producerId", producerId);
+                       setValue(producerIdAndEpoch, "epoch", epoch);
+
+                       invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+                       invoke(transactionManager, "transitionTo", 
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+                       setValue(transactionManager, "transactionStarted", 
true);
+               }
+       }
+
+}
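Note on the reflective calls above: getValue, setValue, invoke and getEnum are helpers inherited from the base FlinkKafkaProducer wrapper and are not shown in this hunk. As a rough sketch only (plain java.lang.reflect, not the exact Flink implementation), a field accessor of that shape could look like this:

    import java.lang.reflect.Field;

    final class ReflectionSketch {
            // Hypothetical stand-in for the inherited getValue(Object, String) helper.
            // It looks the field up on the object's concrete class and reads it.
            static Object getValue(Object object, String fieldName) {
                    try {
                            Field field = object.getClass().getDeclaredField(fieldName);
                            field.setAccessible(true);
                            return field.get(object);
                    } catch (NoSuchFieldException | IllegalAccessException e) {
                            throw new RuntimeException("Incompatible KafkaProducer version", e);
                    }
            }
    }

Because resumeTransaction pokes into Kafka's private TransactionManager state, helpers like these are tightly coupled to the Kafka client version they were written against.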
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 00000000000..b8951b9c3ed
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka10TableSourceSinkFactory
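The single line above registers Kafka10TableSourceSinkFactory with Java's service-provider mechanism, which is how Flink's table API discovers connector factories on the classpath. As a minimal illustration only (using the standard java.util.ServiceLoader API rather than Flink's own factory lookup utilities), discovery over this file boils down to:

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.TableFactory;

    public class FactoryDiscoverySketch {
            public static void main(String[] args) {
                    // Iterates every implementation registered under
                    // META-INF/services/org.apache.flink.table.factories.TableFactory,
                    // which would include Kafka10TableSourceSinkFactory from this module.
                    for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
                            System.out.println(factory.getClass().getName());
                    }
            }
    }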
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties
new file mode 100644
index 00000000000..6eef1747ddf
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java
new file mode 100644
index 00000000000..cd137f7c7bc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafka10Producer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for our own {@link FlinkKafka10Producer}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafka10ProducerITCase extends KafkaTestBase {
+       protected String transactionalId;
+       protected Properties extraProperties;
+
+       @Before
+       public void before() {
+               transactionalId = UUID.randomUUID().toString();
+               extraProperties = new Properties();
+               extraProperties.putAll(standardProps);
+               extraProperties.put("transactional.id", transactionalId);
+               extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("isolation.level", "read_committed");
+       }
+
+       @Test(timeout = 30000L)
+       public void testHappyPath() throws IOException {
+               String topicName = "flink-kafka-producer-happy-path";
+               try (Producer<String, String> kafkaProducer = new FlinkKafka10Producer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+                       kafkaProducer.commitTransaction();
+               }
+               assertRecord(topicName, "42", "42");
+               deleteTestTopic(topicName);
+       }
+
+       @Test(timeout = 30000L)
+       public void testResumeTransaction() throws IOException {
+               String topicName = "flink-kafka-producer-resume-transaction";
+               try (FlinkKafka10Producer<String, String> kafkaProducer = new FlinkKafka10Producer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+                       kafkaProducer.flush();
+                       long producerId = kafkaProducer.getProducerId();
+                       short epoch = kafkaProducer.getEpoch();
+
+                       try (FlinkKafka10Producer<String, String> resumeProducer = new FlinkKafka10Producer<>(extraProperties)) {
+                               resumeProducer.resumeTransaction(producerId, epoch);
+                               resumeProducer.commitTransaction();
+                       }
+
+                       assertRecord(topicName, "42", "42");
+
+                       // this shouldn't throw - in case of a network split, the old producer might attempt to commit its transaction
+                       kafkaProducer.commitTransaction();
+
+                       // this shouldn't fail either, for the same reason as above
+                       try (FlinkKafka10Producer<String, String> resumeProducer = new FlinkKafka10Producer<>(extraProperties)) {
+                               resumeProducer.resumeTransaction(producerId, epoch);
+                               resumeProducer.commitTransaction();
+                       }
+               }
+               deleteTestTopic(topicName);
+       }
+
+       private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+               try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+                       kafkaConsumer.subscribe(Collections.singletonList(topicName));
+                       ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+
+                       ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+                       assertEquals(expectedKey, record.key());
+                       assertEquals(expectedValue, record.value());
+               }
+       }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java
new file mode 100644
index 00000000000..b178890773f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+import kafka.server.KafkaServer;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer10}.
+ */
+public class FlinkKafkaProducer10ITCase extends KafkaTestBase {
+
+       protected String transactionalId;
+       protected Properties extraProperties;
+
+       protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
+               new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+       protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
+               new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
+
+       @Before
+       public void before() {
+               transactionalId = UUID.randomUUID().toString();
+               extraProperties = new Properties();
+               extraProperties.putAll(standardProps);
+               extraProperties.put("transactional.id", transactionalId);
+               extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("isolation.level", "read_committed");
+       }
+
+       @Test
+       public void resourceCleanUpNone() throws Exception {
+               resourceCleanUp(FlinkKafkaProducer10.Semantic.NONE);
+       }
+
+       @Test
+       public void resourceCleanUpAtLeastOnce() throws Exception {
+               resourceCleanUp(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE);
+       }
+
+       /**
+        * This test checks whether there is a resource leak in the form of a growing thread count.
+        */
+       public void resourceCleanUp(FlinkKafkaProducer10.Semantic semantic) throws Exception {
+               String topic = "flink-kafka-producer-resource-cleanup-" + semantic;
+
+               final int allowedEpsilonThreadCountGrow = 50;
+
+               Optional<Integer> initialActiveThreads = Optional.empty();
+               for (int i = 0; i < allowedEpsilonThreadCountGrow * 2; i++) {
+                       try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 =
+                               createTestHarness(topic, 1, 1, 0, semantic)) {
+                               testHarness1.setup();
+                               testHarness1.open();
+                       }
+
+                       if (initialActiveThreads.isPresent()) {
+                               assertThat("active threads count",
+                                       Thread.activeCount(),
+                                       lessThan(initialActiveThreads.get() + allowedEpsilonThreadCountGrow));
+                       }
+                       else {
+                               initialActiveThreads = Optional.of(Thread.activeCount());
+                       }
+               }
+       }
+
+       /**
+        * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
+        * with previous transactions using the same transactional.ids.
+        */
+       @Test
+       public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exception {
+               String topic = "flink-kafka-producer-fail-before-notify";
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic)) {
+                       testHarness1.setup();
+                       testHarness1.open();
+                       testHarness1.processElement(42, 0);
+                       OperatorSubtaskState snapshot = testHarness1.snapshot(0, 0);
+                       testHarness1.processElement(43, 0);
+                       testHarness1.notifyOfCompletedCheckpoint(0);
+                       try {
+                               for (int i = 0; i < FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) {
+                                       testHarness1.snapshot(i + 1, 0);
+                                       testHarness1.processElement(i, 0);
+                               }
+                               throw new IllegalStateException("This should not be reached.");
+                       }
+                       catch (Exception ex) {
+                               if (!isCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex)) {
+                                       throw ex;
+                               }
+                       }
+
+                       // Resume transactions before testHarness1 is closed (in case of failure, close() might not be called)
+                       try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+                               testHarness2.setup();
+                               // restore from snapshot1, transactions with records 43 and 44 should be aborted
+                               testHarness2.initializeState(snapshot);
+                               testHarness2.open();
+                       }
+
+                       assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+                       deleteTestTopic(topic);
+               }
+               catch (Exception ex) {
+                       // testHarness1 will be fenced off after creating and closing testHarness2
+                       if (!findThrowable(ex, ProducerFencedException.class).isPresent()) {
+                               throw ex;
+                       }
+               }
+       }
+
+       @Test
+       public void testFlinkKafkaProducer10FailBeforeNotify() throws Exception {
+               String topic = "flink-kafka-producer-fail-before-notify";
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+               testHarness.setup();
+               testHarness.open();
+               testHarness.processElement(42, 0);
+               testHarness.snapshot(0, 1);
+               testHarness.processElement(43, 2);
+               OperatorSubtaskState snapshot = testHarness.snapshot(1, 3);
+
+               int leaderId = kafkaServer.getLeaderToShutDown(topic);
+               failBroker(leaderId);
+
+               try {
+                       testHarness.processElement(44, 4);
+                       testHarness.snapshot(2, 5);
+                       assertFalse(true);
+               }
+               catch (Exception ex) {
+                       // expected
+               }
+               try {
+                       testHarness.close();
+               }
+               catch (Exception ex) {
+                       // expected: the leader broker was shut down above, so close() may fail
+               }
+
+               kafkaServer.restartBroker(leaderId);
+
+               testHarness = createTestHarness(topic);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.close();
+
+               assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+
+               deleteTestTopic(topic);
+       }
+
+       @Test
+       public void testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify() throws Exception {
+               String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify";
+
+               Properties properties = createProperties();
+
+               FlinkKafkaProducer10<Integer> kafkaProducer = new FlinkKafkaProducer10<>(
+                       topic,
+                       integerKeyedSerializationSchema,
+                       properties,
+                       FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(kafkaProducer),
+                       IntSerializer.INSTANCE);
+
+               testHarness1.setup();
+               testHarness1.open();
+               testHarness1.processElement(42, 0);
+               testHarness1.snapshot(0, 1);
+               testHarness1.processElement(43, 2);
+               int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId();
+               OperatorSubtaskState snapshot = testHarness1.snapshot(1, 3);
+
+               failBroker(transactionCoordinatorId);
+
+               try {
+                       testHarness1.processElement(44, 4);
+                       testHarness1.notifyOfCompletedCheckpoint(1);
+                       testHarness1.close();
+               }
+               catch (Exception ex) {
+                       // Expected... some random exception could be thrown by any of the above operations.
+               }
+               finally {
+                       kafkaServer.restartBroker(transactionCoordinatorId);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+                       testHarness2.setup();
+                       testHarness2.initializeState(snapshot);
+                       testHarness2.open();
+               }
+
+               assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+
+               deleteTestTopic(topic);
+       }
+
+       /**
+        * This test checks whether FlinkKafkaProducer10 correctly aborts lingering transactions after a failure.
+        * If such transactions were left lingering, consumers would be unable to read committed records
+        * that were created after the lingering transaction.
+        */
+       @Test
+       public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
+               String topic = "flink-kafka-producer-fail-before-notify";
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+               testHarness.setup();
+               testHarness.open();
+               testHarness.processElement(42, 0);
+               testHarness.snapshot(0, 1);
+               testHarness.processElement(43, 2);
+               OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
+
+               testHarness.processElement(44, 4);
+               testHarness.snapshot(2, 5);
+               testHarness.processElement(45, 6);
+
+               // do not close the previous testHarness to make sure that closing does not clean anything up (in case of failure
+               // there might not be any close)
+               testHarness = createTestHarness(topic);
+               testHarness.setup();
+               // restore from snapshot1, transactions with records 44 and 45 should be aborted
+               testHarness.initializeState(snapshot1);
+               testHarness.open();
+
+               // write and commit more records, after potentially lingering transactions
+               testHarness.processElement(46, 7);
+               testHarness.snapshot(4, 8);
+               testHarness.processElement(47, 9);
+               testHarness.notifyOfCompletedCheckpoint(4);
+
+               //now we should have:
+               // - records 42 and 43 in committed transactions
+               // - aborted transactions with records 44 and 45
+               // - committed transaction with record 46
+               // - pending transaction with record 47
+               assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L);
+
+               testHarness.close();
+               deleteTestTopic(topic);
+       }
+
+       @Test
+       public void testFailAndRecoverSameCheckpointTwice() throws Exception {
+               String topic = "flink-kafka-producer-fail-and-recover-same-checkpoint-twice";
+
+               OperatorSubtaskState snapshot1;
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+                       testHarness.setup();
+                       testHarness.open();
+                       testHarness.processElement(42, 0);
+                       testHarness.snapshot(0, 1);
+                       testHarness.processElement(43, 2);
+                       snapshot1 = testHarness.snapshot(1, 3);
+
+                       testHarness.processElement(44, 4);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+                       testHarness.setup();
+                       // restore from snapshot1, transactions with records 44 and 45 should be aborted
+                       testHarness.initializeState(snapshot1);
+                       testHarness.open();
+
+                       // write and commit more records, after potentially lingering transactions
+                       testHarness.processElement(44, 7);
+                       testHarness.snapshot(2, 8);
+                       testHarness.processElement(45, 9);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+                       testHarness.setup();
+                       // restore from snapshot1, transactions with records 44 and 45 should be aborted
+                       testHarness.initializeState(snapshot1);
+                       testHarness.open();
+
+                       // write and commit more records, after potentially lingering transactions
+                       testHarness.processElement(44, 7);
+                       testHarness.snapshot(3, 8);
+                       testHarness.processElement(45, 9);
+               }
+
+               //now we should have:
+               // - records 42 and 43 in committed transactions
+               // - aborted transactions with records 44 and 45
+               assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+               deleteTestTopic(topic);
+       }
+
+       /**
+        * This test checks whether FlinkKafkaProducer10 correctly aborts lingering transactions after a failure
+        * that happened before the first checkpoint and was followed by reducing the parallelism.
+        * If such transactions were left lingering, consumers would be unable to read committed records
+        * that were created after the lingering transaction.
+        */
+       @Test
+       public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+               String topic = "scale-down-before-first-checkpoint";
+
+               List<AutoCloseable> operatorsToClose = new ArrayList<>();
+               int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer10.SAFE_SCALE_DOWN_FACTOR);
+               for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
+                       OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
+                               topic,
+                               preScaleDownParallelism,
+                               preScaleDownParallelism,
+                               subtaskIndex,
+                               FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+
+                       preScaleDownOperator.setup();
+                       preScaleDownOperator.open();
+                       preScaleDownOperator.processElement(subtaskIndex * 2, 0);
+                       preScaleDownOperator.snapshot(0, 1);
+                       preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
+
+                       operatorsToClose.add(preScaleDownOperator);
+               }
+
+               // do not close the previous testHarnesses to make sure that closing does not clean anything up (in case of failure
+               // there might not be any close)
+
+               // After the previous failure, simulate restarting the application with smaller parallelism
+               OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+
+               postScaleDownOperator1.setup();
+               postScaleDownOperator1.open();
+
+               // write and commit more records, after potentially lingering transactions
+               postScaleDownOperator1.processElement(46, 7);
+               postScaleDownOperator1.snapshot(4, 8);
+               postScaleDownOperator1.processElement(47, 9);
+               postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+               //now we should have:
+               // - records 42, 43, 44 and 45 in aborted transactions
+               // - committed transaction with record 46
+               // - pending transaction with record 47
+               assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
+
+               postScaleDownOperator1.close();
+               // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
+               for (AutoCloseable operatorToClose : operatorsToClose) {
+                       closeIgnoringProducerFenced(operatorToClose);
+               }
+               deleteTestTopic(topic);
+       }
+
+       /**
+        * Each instance of FlinkKafkaProducer10 uses its own pool of transactional ids. After the restore from a checkpoint,
+        * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
+        * are dropped. In case of scale up, new ones are generated (for the new subtasks). This test makes sure that the sequence
+        * of scaling down and up again works fine. In particular it checks whether the ids newly generated while scaling up
+        * do not overlap with ids that were used before scaling down. For example, we start with 4 ids and parallelism 4:
+        * [1], [2], [3], [4] - one assigned per subtask
+        * we scale down to parallelism 2:
+        * [1, 2], [3, 4] - first subtask got ids 1 and 2, second got ids 3 and 4
+        * surplus ids are dropped from the pools and we scale up to parallelism 3:
+        * [1 or 2], [3 or 4], [???]
+        * the new subtask has to generate new id(s), but it cannot use ids that are potentially in use, so it has to generate
+        * new ones that are greater than 4.
+        */
+       @Test
+       public void testScaleUpAfterScalingDown() throws Exception {
+               String topic = "scale-down-before-first-checkpoint";
+
+               final int parallelism1 = 4;
+               final int parallelism2 = 2;
+               final int parallelism3 = 3;
+               final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));
+
+               List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
+                       topic,
+                       Collections.emptyList(),
+                       parallelism1,
+                       maxParallelism,
+                       IntStream.range(0, parallelism1).boxed().iterator());
+
+               operatorSubtaskState = repartitionAndExecute(
+                       topic,
+                       operatorSubtaskState,
+                       parallelism2,
+                       maxParallelism,
+                       IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator());
+
+               operatorSubtaskState = repartitionAndExecute(
+                       topic,
+                       operatorSubtaskState,
+                       parallelism3,
+                       maxParallelism,
+                       IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator());
+
+               // After each previous repartitionAndExecute call, we are left with some lingering transactions that would
+               // not allow us to read all committed messages from the topic. Thus we initialize operators from
+               // OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions.
+
+               operatorSubtaskState = repartitionAndExecute(
+                       topic,
+                       operatorSubtaskState,
+                       1,
+                       maxParallelism,
+                       Collections.emptyIterator());
+
+               assertExactlyOnceForTopic(
+                       createProperties(),
+                       topic,
+                       0,
+                       IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
+                       30_000L);
+               deleteTestTopic(topic);
+       }
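The invariant spelled out in the javadoc above (freshly generated transactional ids must not collide with any id handed out before scaling down) can be reduced to a toy sketch; this is purely illustrative and is not how Flink actually assigns transactional ids:

    import java.util.stream.IntStream;

    public class TransactionalIdPoolSketch {
            public static void main(String[] args) {
                    // ids used by the 4 original subtasks before scaling down: 1, 2, 3, 4
                    int maxUsedId = IntStream.rangeClosed(1, 4).max().orElse(0);
                    // after scaling 4 -> 2 -> 3, the new third subtask must start above the old maximum,
                    // otherwise it could clash with an id another subtask may still be holding
                    int firstSafeFreshId = maxUsedId + 1;
                    System.out.println("first safe fresh id = " + firstSafeFreshId); // prints 5
            }
    }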
+
+       private List<OperatorStateHandle> repartitionAndExecute(
+               String topic,
+               List<OperatorStateHandle> inputStates,
+               int parallelism,
+               int maxParallelism,
+               Iterator<Integer> inputData) throws Exception {
+
+               List<OperatorStateHandle> outputStates = new ArrayList<>();
+               List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();
+
+               for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+                       OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+                               createTestHarness(topic, maxParallelism, parallelism, subtaskIndex, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+                       testHarnesses.add(testHarness);
+
+                       testHarness.setup();
+
+                       testHarness.initializeState(new OperatorSubtaskState(
+                               new StateObjectCollection<>(inputStates),
+                               StateObjectCollection.empty(),
+                               StateObjectCollection.empty(),
+                               StateObjectCollection.empty()));
+                       testHarness.open();
+
+                       if (inputData.hasNext()) {
+                               int nextValue = inputData.next();
+                               testHarness.processElement(nextValue, 0);
+                               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
+
+                               outputStates.addAll(snapshot.getManagedOperatorState());
+                               checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator state");
+                               checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed state");
+                               checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state");
+
+                               for (int i = 1; i < FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
+                                       testHarness.processElement(-nextValue, 0);
+                                       testHarness.snapshot(i, 0);
+                               }
+                       }
+               }
+
+               for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) {
+                       testHarness.close();
+               }
+
+               return outputStates;
+       }
+
+       @Test
+       public void testRecoverCommittedTransaction() throws Exception {
+               String topic = "flink-kafka-producer-recover-committed-transaction";
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+               testHarness.setup();
+               testHarness.open(); // producerA - start transaction (txn) 0
+               testHarness.processElement(42, 0); // producerA - write 42 in txn 0
+               OperatorSubtaskState checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1
+               testHarness.processElement(43, 2); // producerB - write 43 in txn 1
+               testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool
+               testHarness.snapshot(1, 3); // producerB - pre commit txn 1, producerA - start txn 2
+               testHarness.processElement(44, 4); // producerA - write 44 in txn 2
+               testHarness.close(); // producerA - abort txn 2
+
+               testHarness = createTestHarness(topic);
+               testHarness.initializeState(checkpoint0); // recover state 0 - producerA recovers and commits txn 0
+               testHarness.close();
+
+               assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+
+               deleteTestTopic(topic);
+       }
+
+       @Test
+       public void testRunOutOfProducersInThePool() throws Exception {
+               String topic = "flink-kafka-run-out-of-producers";
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       for (int i = 0; i < FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
+                               testHarness.processElement(i, i * 2);
+                               testHarness.snapshot(i, i * 2 + 1);
+                       }
+               }
+               catch (Exception ex) {
+                       if (!ex.getCause().getMessage().startsWith("Too many ongoing")) {
+                               throw ex;
+                       }
+               }
+               deleteTestTopic(topic);
+       }
+
+       // shut down a Kafka broker
+       private void failBroker(int brokerId) {
+               KafkaServer toShutDown = null;
+               for (KafkaServer server : kafkaServer.getBrokers()) {
+
+                       if (kafkaServer.getBrokerId(server) == brokerId) {
+                               toShutDown = server;
+                               break;
+                       }
+               }
+
+               if (toShutDown == null) {
+                       StringBuilder listOfBrokers = new StringBuilder();
+                       for (KafkaServer server : kafkaServer.getBrokers()) {
+                               listOfBrokers.append(kafkaServer.getBrokerId(server));
+                               listOfBrokers.append(" ; ");
+                       }
+
+                       throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId
+                               + " ; available brokers: " + listOfBrokers.toString());
+               } else {
+                       toShutDown.shutdown();
+                       toShutDown.awaitShutdown();
+               }
+       }
+
+       private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
+               try {
+                       autoCloseable.close();
+               }
+               catch (Exception ex) {
+                       if (!(ex.getCause() instanceof ProducerFencedException)) {
+                               throw ex;
+                       }
+               }
+       }
+
+       private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+               return createTestHarness(topic, 1, 1, 0, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+       }
+
+       private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+               String topic,
+               int maxParallelism,
+               int parallelism,
+               int subtaskIndex,
+               FlinkKafkaProducer10.Semantic semantic) throws Exception {
+               Properties properties = createProperties();
+
+               FlinkKafkaProducer10<Integer> kafkaProducer = new FlinkKafkaProducer10<>(
+                       topic,
+                       integerKeyedSerializationSchema,
+                       properties,
+                       semantic);
+
+               return new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(kafkaProducer),
+                       maxParallelism,
+                       parallelism,
+                       subtaskIndex,
+                       IntSerializer.INSTANCE,
+                       new OperatorID(42, 44));
+       }
+
+       private Properties createProperties() {
+               Properties properties = new Properties();
+               properties.putAll(standardProps);
+               properties.putAll(secureProps);
+               properties.put(FlinkKafkaProducer10.KEY_DISABLE_METRICS, "true");
+               return properties;
+       }
+
+       private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) {
+               Optional<FlinkKafka011Exception> cause = findThrowable(ex, FlinkKafka011Exception.class);
+               if (cause.isPresent()) {
+                       return cause.get().getErrorCode().equals(expectedErrorCode);
+               }
+               return false;
+       }
+
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java
new file mode 100644
index 00000000000..47cc2ea972e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder;
+
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * A test for the {@link TypeSerializer TypeSerializers} used for the Kafka producer state.
+ */
+public class FlinkKafkaProducer10StateSerializerTest
+       extends SerializerTestBase<
+               TwoPhaseCommitSinkFunction.State<
+                       FlinkKafkaProducer10.KafkaTransactionState,
+                       FlinkKafkaProducer10.KafkaTransactionContext>> {
+
+       @Override
+       protected TypeSerializer<
+               TwoPhaseCommitSinkFunction.State<
+                       FlinkKafkaProducer10.KafkaTransactionState,
+                       FlinkKafkaProducer10.KafkaTransactionContext>> 
createSerializer() {
+               return new TwoPhaseCommitSinkFunction.StateSerializer<>(
+                       new FlinkKafkaProducer10.TransactionStateSerializer(),
+                       new FlinkKafkaProducer10.ContextStateSerializer());
+       }
+
+       @Override
+       protected Class<TwoPhaseCommitSinkFunction.State<
+                       FlinkKafkaProducer10.KafkaTransactionState,
+                       FlinkKafkaProducer10.KafkaTransactionContext>> 
getTypeClass() {
+               return (Class) TwoPhaseCommitSinkFunction.State.class;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected TwoPhaseCommitSinkFunction.State<
+               FlinkKafkaProducer10.KafkaTransactionState,
+               FlinkKafkaProducer10.KafkaTransactionContext>[] getTestData() {
+               //noinspection unchecked
+               return new TwoPhaseCommitSinkFunction.State[] {
+                       new TwoPhaseCommitSinkFunction.State<
+                               FlinkKafkaProducer10.KafkaTransactionState,
+                               FlinkKafkaProducer10.KafkaTransactionContext>(
+                                       new TransactionHolder(new 
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                                       Collections.emptyList(),
+                                       Optional.empty()),
+                       new TwoPhaseCommitSinkFunction.State<
+                               FlinkKafkaProducer10.KafkaTransactionState,
+                               FlinkKafkaProducer10.KafkaTransactionContext>(
+                               new TransactionHolder(new 
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 2711),
+                               Collections.singletonList(new 
TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, 
(short) 42, null), 42)),
+                               Optional.empty()),
+                       new TwoPhaseCommitSinkFunction.State<
+                               FlinkKafkaProducer10.KafkaTransactionState,
+                               FlinkKafkaProducer10.KafkaTransactionContext>(
+                               new TransactionHolder(new 
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                               Collections.emptyList(),
+                               Optional.of(new 
FlinkKafkaProducer10.KafkaTransactionContext(Collections.emptySet()))),
+                       new TwoPhaseCommitSinkFunction.State<
+                               FlinkKafkaProducer10.KafkaTransactionState,
+                               FlinkKafkaProducer10.KafkaTransactionContext>(
+                               new TransactionHolder(new 
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                               Collections.emptyList(),
+                               Optional.of(new 
FlinkKafkaProducer10.KafkaTransactionContext(Collections.singleton("hello")))),
+                       new TwoPhaseCommitSinkFunction.State<
+                               FlinkKafkaProducer10.KafkaTransactionState,
+                               FlinkKafkaProducer10.KafkaTransactionContext>(
+                               new TransactionHolder(new 
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                               Collections.singletonList(new 
TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, 
(short) 42, null), 0)),
+                               Optional.of(new 
FlinkKafkaProducer10.KafkaTransactionContext(Collections.emptySet()))),
+                       new TwoPhaseCommitSinkFunction.State<
+                               FlinkKafkaProducer10.KafkaTransactionState,
+                               FlinkKafkaProducer10.KafkaTransactionContext>(
+                               new TransactionHolder(new 
FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0),
+                               Collections.singletonList(new 
TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, 
(short) 42, null), 0)),
+                               Optional.of(new 
FlinkKafkaProducer10.KafkaTransactionContext(Collections.singleton("hello"))))
+               };
+       }
+
+       @Override
+       public void testInstantiate() {
+               // this serializer does not support instantiation
+       }
+}
diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
new file mode 100644
index 00000000000..d5521728061
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 1.0.
+ */
+public class Kafka10ITCase extends KafkaConsumerTestBase {
+
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               KafkaProducerTestBase.prepare();
+               ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Suite of Tests
+       // ------------------------------------------------------------------------
+
+       @Test(timeout = 60000)
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testConcurrentProducerConsumerTopology() throws Exception {
+               runSimpleConcurrentProducerConsumerTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testKeyValueSupport() throws Exception {
+               runKeyValueTest();
+       }
+
+       // --- canceling / failures ---
+
+       @Test(timeout = 60000)
+       public void testCancelingEmptyTopic() throws Exception {
+               runCancelingOnEmptyInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testCancelingFullTopic() throws Exception {
+               runCancelingOnFullInputTest();
+       }
+
+       // --- source to partition mappings and exactly once ---
+
+       @Test(timeout = 60000)
+       public void testOneToOneSources() throws Exception {
+               runOneToOneExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testOneSourceMultiplePartitions() throws Exception {
+               runOneSourceMultiplePartitionsExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleSourcesOnePartition() throws Exception {
+               runMultipleSourcesOnePartitionExactlyOnceTest();
+       }
+
+       // --- broker failure ---
+
+       @Test(timeout = 60000)
+       public void testBrokerFailure() throws Exception {
+               runBrokerFailureTest();
+       }
+
+       // --- special executions ---
+
+       @Test(timeout = 60000)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+       @Test(timeout = 60000)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMetricsAndEndOfStream() throws Exception {
+               runEndOfStreamTest();
+       }
+
+       // --- startup mode ---
+
+       @Test(timeout = 60000)
+       public void testStartFromEarliestOffsets() throws Exception {
+               runStartFromEarliestOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromLatestOffsets() throws Exception {
+               runStartFromLatestOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromGroupOffsets() throws Exception {
+               runStartFromGroupOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromSpecificOffsets() throws Exception {
+               runStartFromSpecificOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromTimestamp() throws Exception {
+               runStartFromTimestamp();
+       }
+
+       // --- offset committing ---
+
+       @Test(timeout = 60000)
+       public void testCommitOffsetsToKafka() throws Exception {
+               runCommitOffsetsToKafka();
+       }
+
+       @Test(timeout = 60000)
+       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
+       }
+
+       /**
+        * Kafka 1.0 specific test, ensuring timestamps are properly written to and read from Kafka.
+        */
+       @Test(timeout = 60000)
+       public void testTimestamps() throws Exception {
+
+               final String topic = "tstopic";
+               createTestTopic(topic, 3, 1);
+
+               // ---------- Produce an event time stream into Kafka 
-------------------
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Long> streamWithTimestamps = env.addSource(new 
SourceFunction<Long>() {
+                       private static final long serialVersionUID = 
-2255115836471289626L;
+                       boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<Long> ctx) throws 
Exception {
+                               long i = 0;
+                               while (running) {
+                                       ctx.collectWithTimestamp(i, i * 2);
+                                       if (i++ == 1110L) {
+                                               running = false;
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               final TypeInformationSerializationSchema<Long> longSer = new 
TypeInformationSerializationSchema<>(Types.LONG, env.getConfig());
+               FlinkKafkaProducer10<Long> prod = new 
FlinkKafkaProducer10<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), 
standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
+                       private static final long serialVersionUID = 
-6730989584364230617L;
+
+                       @Override
+                       public int partition(Long next, byte[] key, byte[] 
value, String targetTopic, int[] partitions) {
+                               return (int) (next % 3);
+                       }
+               }));
+               prod.setWriteTimestampToKafka(true);
+
+               streamWithTimestamps.addSink(prod).setParallelism(3);
+
+               env.execute("Produce some");
+
+               // ---------- Consume stream from Kafka -------------------
+
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               FlinkKafkaConsumer10<Long> kafkaSource = new 
FlinkKafkaConsumer10<>(topic, new Kafka10ITCase.LimitedLongDeserializer(), 
standardProps);
+               kafkaSource.assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<Long>() {
+                       private static final long serialVersionUID = 
-4834111173247835189L;
+
+                       @Nullable
+                       @Override
+                       public Watermark checkAndGetNextWatermark(Long 
lastElement, long extractedTimestamp) {
+                               if (lastElement % 11 == 0) {
+                                       return new Watermark(lastElement);
+                               }
+                               return null;
+                       }
+
+                       @Override
+                       public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                               return previousElementTimestamp;
+                       }
+               });
+
+               DataStream<Long> stream = env.addSource(kafkaSource);
+               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
+               stream.transform("timestamp validating operator", 
objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+               env.execute("Consume again");
+
+               deleteTestTopic(topic);
+       }
+
+       private static class TimestampValidatingOperator extends 
StreamSink<Long> {
+
+               private static final long serialVersionUID = 
1353168781235526806L;
+
+               public TimestampValidatingOperator() {
+                       super(new SinkFunction<Long>() {
+                               private static final long serialVersionUID = 
-6676565693361786524L;
+
+                               @Override
+                               public void invoke(Long value) throws Exception 
{
+                                       throw new 
RuntimeException("Unexpected");
+                               }
+                       });
+               }
+
+               long elCount = 0;
+               long wmCount = 0;
+               long lastWM = Long.MIN_VALUE;
+
+               @Override
+               public void processElement(StreamRecord<Long> element) throws 
Exception {
+                       elCount++;
+                       if (element.getValue() * 2 != element.getTimestamp()) {
+                               throw new RuntimeException("Invalid timestamp: 
" + element);
+                       }
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       wmCount++;
+
+                       if (lastWM <= mark.getTimestamp()) {
+                               lastWM = mark.getTimestamp();
+                       } else {
+                               throw new RuntimeException("Received watermark lower than the last one");
+                       }
+
+                       if (mark.getTimestamp() % 11 != 0 && 
mark.getTimestamp() != Long.MAX_VALUE) {
+                               throw new RuntimeException("Invalid watermark: 
" + mark.getTimestamp());
+                       }
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       if (elCount != 1110L) {
+                               throw new RuntimeException("Wrong final element 
count " + elCount);
+                       }
+
+                       if (wmCount <= 2) {
+                               throw new RuntimeException("Almost no 
watermarks have been sent " + wmCount);
+                       }
+               }
+       }
+
+       private static class LimitedLongDeserializer implements 
KeyedDeserializationSchema<Long> {
+
+               private static final long serialVersionUID = 
6966177118923713521L;
+               private final TypeInformation<Long> ti;
+               private final TypeSerializer<Long> ser;
+               long cnt = 0;
+
+               public LimitedLongDeserializer() {
+                       this.ti = Types.LONG;
+                       this.ser = ti.createSerializer(new ExecutionConfig());
+               }
+
+               @Override
+               public TypeInformation<Long> getProducedType() {
+                       return ti;
+               }
+
+               @Override
+               public Long deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
+                       cnt++;
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       Long e = ser.deserialize(in);
+                       return e;
+               }
+
+               @Override
+               public boolean isEndOfStream(Long nextElement) {
+                       return cnt > 1110L;
+               }
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java
 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java
new file mode 100644
index 00000000000..b064d9fa6b6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer10}.
+ */
+@SuppressWarnings("serial")
+public class Kafka10ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
+
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               KafkaProducerTestBase.prepare();
+               ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE);
+       }
+
+       @Override
+       public void testExactlyOnceRegularSink() throws Exception {
+               // disable test for at least once semantic
+       }
+
+       @Override
+       public void testExactlyOnceCustomOperator() throws Exception {
+               // disable test for at least once semantic
+       }
+}
diff --git 
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java
 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java
new file mode 100644
index 00000000000..8365e25e6f4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer10}.
+ */
+@SuppressWarnings("serial")
+public class Kafka10ProducerExactlyOnceITCase extends KafkaProducerTestBase {
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               KafkaProducerTestBase.prepare();
+               ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.EXACTLY_ONCE);
+       }
+
+       @Override
+       public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+               // TODO: fix this test
+               // Currently, very often (~50% of cases) the KafkaProducer livelocks itself on the commitTransaction call.
+               // Somehow Kafka 1.0 does not play along with NetworkFailureProxy. This can mean either a bug in Kafka
+               // (it does not cope well with certain unusual network failures), or that NetworkFailureProxy is a broken
+               // design and this test should be reimplemented in a completely different way...
+       }
+
+       @Override
+       public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+               // TODO: fix this test
+               // Currently, very often (~50% of cases) the KafkaProducer livelocks itself on the commitTransaction call.
+               // Somehow Kafka 1.0 does not play along with NetworkFailureProxy. This can mean either a bug in Kafka
+               // (it does not cope well with certain unusual network failures), or that NetworkFailureProxy is a broken
+               // design and this test should be reimplemented in a completely different way...
+       }
+
+       @Test
+       public void testMultipleSinkOperators() throws Exception {
+               testExactlyOnce(false, 2);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java
new file mode 100644
index 00000000000..e557e519570
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka10TableSource} and {@link Kafka10TableSink} created
+ * by {@link Kafka10TableSourceSinkFactory}.
+ */
+public class Kafka10TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFactoryTestBase {
+
+       @Override
+       protected String getKafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_10;
+       }
+
+       @Override
+       protected Class<FlinkKafkaConsumerBase<Row>> 
getExpectedFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer10.class;
+       }
+
+       @Override
+       protected Class<?> getExpectedFlinkKafkaProducer() {
+               return FlinkKafkaProducer10.class;
+       }
+
+       @Override
+       protected KafkaTableSource getExpectedKafkaTableSource(
+               TableSchema schema,
+               Optional<String> proctimeAttribute,
+               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+               Map<String, String> fieldMapping,
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema,
+               StartupMode startupMode,
+               Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka10TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       @Override
+       protected KafkaTableSink getExpectedKafkaTableSink(
+               TableSchema schema,
+               String topic,
+               Properties properties,
+               Optional<FlinkKafkaPartitioner<Row>> partitioner,
+               SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka10TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 00000000000..02d5d0a3a81
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.networking.NetworkFailuresProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.commons.collections.list.UnmodifiableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+import scala.collection.mutable.ArraySeq;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 1.0.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+       private File tmpZkDir;
+       private File tmpKafkaParent;
+       private List<File> tmpKafkaDirs;
+       private List<KafkaServer> brokers;
+       private TestingServer zookeeper;
+       private String zookeeperConnectionString;
+       private String brokerConnectionString = "";
+       private Properties standardProps;
+       private FlinkKafkaProducer10.Semantic producerSemantic = 
FlinkKafkaProducer10.Semantic.EXACTLY_ONCE;
+       // 6 seconds is the default; that seems to be too small for Travis, so use 30 seconds.
+       private int zkTimeout = 30000;
+       private Config config;
+
+       public void setProducerSemantic(FlinkKafkaProducer10.Semantic 
producerSemantic) {
+               this.producerSemantic = producerSemantic;
+       }
+
+       @Override
+       public void prepare(Config config) {
+               // increase the timeout since, on Travis, the ZK connection takes a long time for secure connections.
+               if (config.isSecureMode()) {
+                       // run only one Kafka server to avoid multiple ZK connections from many instances (Travis timeout)
+                       config.setKafkaServersNumber(1);
+                       zkTimeout = zkTimeout * 15;
+               }
+               this.config = config;
+
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
+               assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
+
+               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + 
(UUID.randomUUID().toString()));
+               assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
+
+               tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+               for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
+                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
+                       tmpKafkaDirs.add(tmpDir);
+               }
+
+               zookeeper = null;
+               brokers = null;
+
+               try {
+                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeperConnectionString = 
zookeeper.getConnectString();
+                       LOG.info("Starting Zookeeper with 
zookeeperConnectionString: {}", zookeeperConnectionString);
+
+                       LOG.info("Starting KafkaServer");
+                       brokers = new 
ArrayList<>(config.getKafkaServersNumber());
+
+                       ListenerName listenerName = 
ListenerName.forSecurityProtocol(config.isSecureMode() ? 
SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+                       for (int i = 0; i < config.getKafkaServersNumber(); 
i++) {
+                               KafkaServer kafkaServer = getKafkaServer(i, 
tmpKafkaDirs.get(i));
+                               brokers.add(kafkaServer);
+                               brokerConnectionString += 
hostAndPortToUrlString(KAFKA_HOST, 
kafkaServer.socketServer().boundPort(listenerName));
+                               brokerConnectionString +=  ",";
+                       }
+
+                       LOG.info("ZK and KafkaServer started.");
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       fail("Test setup failed: " + t.getMessage());
+               }
+
+               standardProps = new Properties();
+               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
+               standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
+               standardProps.setProperty("group.id", "flink-tests");
+               standardProps.setProperty("enable.auto.commit", "false");
+               standardProps.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
+               standardProps.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
+               standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning ("earliest" is the Kafka 1.0 value)
+               standardProps.setProperty("max.partition.fetch.bytes", "256"); 
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
+       }
+
+       @Override
+       public void deleteTestTopic(String topic) {
+               LOG.info("Deleting topic {}", topic);
+               try (AdminClient adminClient = 
AdminClient.create(getStandardProperties())) {
+                       
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Deleting test topic " + topic + " failed: " + e.getMessage());
+               }
+       }
+
+       @Override
+       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor, Properties properties) {
+               LOG.info("Creating topic {}", topic);
+               try (AdminClient adminClient = 
AdminClient.create(getStandardProperties())) {
+                       NewTopic topicObj = new NewTopic(topic, 
numberOfPartitions, (short) replicationFactor);
+                       
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Creating test topic " + topic + " failed: " + e.getMessage());
+               }
+       }
+
+       @Override
+       public Properties getStandardProperties() {
+               return standardProps;
+       }
+
+       @Override
+       public Properties getSecureProperties() {
+               Properties prop = new Properties();
+               if (config.isSecureMode()) {
+                       prop.put("security.inter.broker.protocol", 
"SASL_PLAINTEXT");
+                       prop.put("security.protocol", "SASL_PLAINTEXT");
+                       prop.put("sasl.kerberos.service.name", "kafka");
+
+                       // add special timeouts for Travis
+                       prop.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
+                       prop.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
+                       prop.setProperty("metadata.fetch.timeout.ms", "120000");
+               }
+               return prop;
+       }
+
+       @Override
+       public String getBrokerConnectionString() {
+               return brokerConnectionString;
+       }
+
+       @Override
+       public String getVersion() {
+               return "1.0";
+       }
+
+       @Override
+       public List<KafkaServer> getBrokers() {
+               return brokers;
+       }
+
+       @Override
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return new FlinkKafkaConsumer10<T>(topics, readSchema, props);
+       }
+
+       @Override
+       public <K, V> Collection<ConsumerRecord<K, V>> 
getAllRecordsFromTopic(Properties properties, String topic, int partition, long 
timeout) {
+               List<ConsumerRecord<K, V>> result = new ArrayList<>();
+
+               try (KafkaConsumer<K, V> consumer = new 
KafkaConsumer<>(properties)) {
+                       consumer.assign(Arrays.asList(new TopicPartition(topic, 
partition)));
+
+                       while (true) {
+                               boolean processedAtLeastOneRecord = false;
+
+                               // wait for new records with timeout and break 
the loop if we didn't get any
+                               Iterator<ConsumerRecord<K, V>> iterator = 
consumer.poll(timeout).iterator();
+                               while (iterator.hasNext()) {
+                                       ConsumerRecord<K, V> record = 
iterator.next();
+                                       result.add(record);
+                                       processedAtLeastOneRecord = true;
+                               }
+
+                               if (!processedAtLeastOneRecord) {
+                                       break;
+                               }
+                       }
+                       consumer.commitSync();
+               }
+
+               return UnmodifiableList.decorate(result);
+       }
+
+       @Override
+       public <T> StreamSink<T> getProducerSink(String topic, 
KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
+               return new StreamSink<>(new FlinkKafkaProducer10<T>(
+                       topic,
+                       serSchema,
+                       props,
+                       Optional.ofNullable(partitioner),
+                       producerSemantic,
+                       
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+       }
+
+       @Override
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
+               return stream.addSink(new FlinkKafkaProducer10<T>(
+                       topic,
+                       serSchema,
+                       props,
+                       Optional.ofNullable(partitioner),
+                       producerSemantic,
+                       
FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+       }
+
+       @Override
+       public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> 
stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+               FlinkKafkaProducer10<T> prod = new FlinkKafkaProducer10<T>(
+                       topic,
+                       serSchema,
+                       props,
+                       Optional.of(new FlinkFixedPartitioner<>()),
+                       producerSemantic,
+                       FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+               prod.setWriteTimestampToKafka(true);
+
+               return stream.addSink(prod);
+       }
+
+       @Override
+       public KafkaOffsetHandler createOffsetHandler() {
+               return new KafkaOffsetHandlerImpl();
+       }
+
+       @Override
+       public void restartBroker(int leaderId) throws Exception {
+               brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
+       }
+
+       @Override
+       public int getLeaderToShutDown(String topic) throws Exception {
+               AdminClient client = 
AdminClient.create(getStandardProperties());
+               TopicDescription result = 
client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
+               return result.partitions().get(0).leader().id();
+       }
+
+       @Override
+       public int getBrokerId(KafkaServer server) {
+               return server.config().brokerId();
+       }
+
+       @Override
+       public boolean isSecureRunSupported() {
+               return true;
+       }
+
+       @Override
+       public void shutdown() throws Exception {
+               for (KafkaServer broker : brokers) {
+                       if (broker != null) {
+                               broker.shutdown();
+                       }
+               }
+               brokers.clear();
+
+               if (zookeeper != null) {
+                       try {
+                               zookeeper.stop();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("ZK.stop() failed", e);
+                       }
+                       zookeeper = null;
+               }
+
+               // clean up the temp spaces
+
+               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpKafkaParent);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+               if (tmpZkDir != null && tmpZkDir.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpZkDir);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+       }
+
+       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) 
throws Exception {
+               Properties kafkaProperties = new Properties();
+
+               // properties have to be Strings
+               kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+               kafkaProperties.put("broker.id", Integer.toString(brokerId));
+               kafkaProperties.put("log.dir", tmpFolder.toString());
+               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
+               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
+               kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
+               kafkaProperties.put("transaction.max.timeout.ms", 
Integer.toString(1000 * 60 * 60 * 2)); // 2 hours
+
+               // for CI stability, increase zookeeper session timeout
+               kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+               kafkaProperties.put("zookeeper.connection.timeout.ms", 
zkTimeout);
+               if (config.getKafkaServerProperties() != null) {
+                       
kafkaProperties.putAll(config.getKafkaServerProperties());
+               }
+
+               final int numTries = 5;
+
+               for (int i = 1; i <= numTries; i++) {
+                       int kafkaPort = NetUtils.getAvailablePort();
+                       kafkaProperties.put("port", 
Integer.toString(kafkaPort));
+
+                       if (config.isHideKafkaBehindProxy()) {
+                               NetworkFailuresProxy proxy = 
createProxy(KAFKA_HOST, kafkaPort);
+                               kafkaProperties.put("advertised.port", 
proxy.getLocalPort());
+                       }
+
+                       // to support a secure Kafka cluster
+                       if (config.isSecureMode()) {
+                               LOG.info("Adding Kafka secure configurations");
+                               kafkaProperties.put("listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.put("advertised.listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.putAll(getSecureProperties());
+                       }
+
+                       KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
+
+                       try {
+                               scala.Option<String> stringNone = 
scala.Option.apply(null);
+                               KafkaServer server = new 
KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new 
ArraySeq<KafkaMetricsReporter>(0));
+                               server.startup();
+                               return server;
+                       }
+                       catch (KafkaException e) {
+                               if (e.getCause() instanceof BindException) {
+                                       // port conflict, retry...
+                                       LOG.info("Port conflict when starting 
Kafka Broker. Retrying...");
+                               }
+                               else {
+                                       throw e;
+                               }
+                       }
+               }
+
+               throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
+       }
+
+       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+               private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+               public KafkaOffsetHandlerImpl() {
+                       Properties props = new Properties();
+                       props.putAll(standardProps);
+                       props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                       props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+                       offsetClient = new KafkaConsumer<>(props);
+               }
+
+               @Override
+               public Long getCommittedOffset(String topicName, int partition) 
{
+                       OffsetAndMetadata committed = 
offsetClient.committed(new TopicPartition(topicName, partition));
+                       return (committed != null) ? committed.offset() : null;
+               }
+
+               @Override
+               public void setCommittedOffset(String topicName, int partition, 
long offset) {
+                       Map<TopicPartition, OffsetAndMetadata> 
partitionAndOffset = new HashMap<>();
+                       partitionAndOffset.put(new TopicPartition(topicName, 
partition), new OffsetAndMetadata(offset));
+                       offsetClient.commitSync(partitionAndOffset);
+               }
+
+               @Override
+               public void close() {
+                       offsetClient.close();
+               }
+       }
+}
diff --git 
a/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..fbeb110350f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index cad37f8f8cd..ca308d13ea9 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -36,6 +36,7 @@
        public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
        public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
        public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+       public static final String CONNECTOR_VERSION_VALUE_10 = "1.0";
        public static final String CONNECTOR_TOPIC = "connector.topic";
        public static final String CONNECTOR_STARTUP_MODE = 
"connector.startup-mode";
        public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = 
"earliest-offset";
@@ -73,7 +74,8 @@ private void validateVersion(DescriptorProperties properties) 
{
                        CONNECTOR_VERSION_VALUE_08,
                        CONNECTOR_VERSION_VALUE_09,
                        CONNECTOR_VERSION_VALUE_010,
-                       CONNECTOR_VERSION_VALUE_011);
+                       CONNECTOR_VERSION_VALUE_011,
+                       CONNECTOR_VERSION_VALUE_10);
                properties.validateEnumValues(CONNECTOR_VERSION(), false, 
versions);
                properties.validateString(CONNECTOR_TOPIC, false, 1, 
Integer.MAX_VALUE);
        }
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d94ea..c0df351c625 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -184,7 +184,10 @@ public void runFailOnNoBrokerTest() throws Exception {
                        stream.print();
                        see.execute("No broker test");
                } catch (JobExecutionException jee) {
-                       if (kafkaServer.getVersion().equals("0.9") || 
kafkaServer.getVersion().equals("0.10") || 
kafkaServer.getVersion().equals("0.11")) {
+                       if (kafkaServer.getVersion().equals("0.9") ||
+                               kafkaServer.getVersion().equals("0.10") ||
+                               kafkaServer.getVersion().equals("0.11") ||
+                               kafkaServer.getVersion().equals("1.0")) {
                                assertTrue(jee.getCause() instanceof 
TimeoutException);
 
                                TimeoutException te = (TimeoutException) 
jee.getCause();
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index cacea91578e..87a90683988 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -56,6 +56,7 @@ under the License.
                <module>flink-connector-nifi</module>
                <module>flink-connector-cassandra</module>
                <module>flink-connector-filesystem</module>
+               <module>flink-connector-kafka-1.0</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end 
up
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
index 211d374de58..8b69e750a9c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
@@ -46,7 +46,7 @@ object ConnectorDescriptorValidator {
 
   /**
     * Key for describing the version of the connector. This property can be 
used for different
-    * connector versions (e.g. Kafka 0.8 or Kafka 0.11).
+    * connector versions (e.g. Kafka 0.8 or Kafka 1.0).
     */
   val CONNECTOR_VERSION = "connector.version"
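
A quick orientation for readers of the diff above: the sketch below shows how the new connector classes could be wired into a streaming job. It is only an illustration inferred from the constructor shapes visible in KafkaTestEnvironmentImpl#getConsumer and #produceIntoKafka; the class name Kafka10ConnectorSketch, the topic names, and the broker address are placeholders, and the exact public API of FlinkKafkaConsumer10/FlinkKafkaProducer10 may differ in the merged code.

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer10;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer10;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class Kafka10ConnectorSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "kafka-10-sketch");

        // Consumer wired the same way as KafkaTestEnvironmentImpl#getConsumer in the diff:
        // (List<String> topics, KeyedDeserializationSchema, Properties).
        DataStream<String> input = env.addSource(new FlinkKafkaConsumer10<>(
            Collections.singletonList("input-topic"),
            new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()),
            props));

        // Producer wired the same way as KafkaTestEnvironmentImpl#produceIntoKafka in the diff:
        // (topic, KeyedSerializationSchema, Properties, Optional partitioner, Semantic, pool size).
        input.addSink(new FlinkKafkaProducer10<>(
            "output-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            Optional.of(new FlinkFixedPartitioner<>()),
            FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE,
            FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));

        env.execute("Kafka 1.0 connector sketch");
    }
}

As in the tests above, AT_LEAST_ONCE avoids the transactional setup that the EXACTLY_ONCE semantic requires.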
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Apache Kafka 1.0/1.1 connectors
> -----------------------------------
>
>                 Key: FLINK-7964
>                 URL: https://issues.apache.org/jira/browse/FLINK-7964
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Hai Zhou
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project 
> Management Committee has packed a number of valuable enhancements into the 
> release. Here is a summary of a few of them:
> * Since its introduction in version 0.10, the Streams API has become hugely 
> popular among Kafka users, including the likes of Pinterest, Rabobank, 
> Zalando, and The New York Times. In 1.0, the API continues to evolve at a 
> healthy pace. To begin with, the builder API has been improved (KIP-120). A 
> new API has been added to expose the state of active tasks at runtime 
> (KIP-130). The new cogroup API makes it much easier to deal with partitioned 
> aggregates with fewer StateStores and fewer moving parts in your code 
> (KIP-150). Debuggability gets easier with enhancements to the print() and 
> writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 
> and KIP-161 too. For more on streams, check out the Apache Kafka Streams 
> documentation, including some helpful new tutorial videos.
> * Operating Kafka at scale requires that the system remain observable, and to 
> make that easier, we’ve made a number of improvements to metrics. These are 
> too many to summarize without becoming tedious, but Connect metrics have been 
> significantly improved (KIP-196), a litany of new health check metrics are 
> now exposed (KIP-188), and we now have a global topic and partition count 
> (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster 
> TLS and CRC32C implementations. Over-the-wire encryption will be faster now, 
> which will keep Kafka fast and compute costs low when encryption is enabled.
> * In keeping with the security theme, KIP-152 cleans up the error handling on 
> Simple Authentication Security Layer (SASL) authentication attempts. 
> Previously, some authentication error conditions were indistinguishable from 
> broker failures and were not logged in a clear way. This is cleaner now.
> * Kafka can now tolerate disk failures better. Historically, JBOD storage 
> configurations have not been recommended, but the architecture has 
> nevertheless been tempting: after all, why not rely on Kafka’s own 
> replication mechanism to protect against storage failure rather than using 
> RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single 
> disk failure in a JBOD broker will not bring the entire broker down; rather, 
> the broker will continue serving any log files that remain on functioning 
> disks.
> * Since release 0.11.0, the idempotent producer (which is the producer used 
> in the presence of a transaction, which of course is the producer we use for 
> exactly-once processing) required max.in.flight.requests.per.connection to be 
> equal to one. As anyone who has written or tested a wire protocol can attest, 
> this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be 
> as large as five, relaxing the throughput constraint quite a bit.
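
A minimal sketch of the plain Kafka producer configuration described in the last point above, assuming a Kafka 1.0 (or newer) client on the classpath; the broker address, topic, and class name are placeholders and are not part of this change:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // enable.idempotence implies acks=all; thanks to KAFKA-5949 (Kafka 1.0),
        // up to 5 in-flight requests per connection are now allowed with idempotence enabled.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.flush();
        }
    }
}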



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
