http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
new file mode 100644
index 0000000..6d259fa
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 0.11 .
+ */
+public class Kafka011ITCase extends KafkaConsumerTestBase {
+
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               KafkaProducerTestBase.prepare();
+               ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Suite of Tests
+       // 
------------------------------------------------------------------------
+
+       @Test(timeout = 60000)
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testConcurrentProducerConsumerTopology() throws Exception {
+               runSimpleConcurrentProducerConsumerTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testKeyValueSupport() throws Exception {
+               runKeyValueTest();
+       }
+
+       // --- canceling / failures ---
+
+       @Test(timeout = 60000)
+       public void testCancelingEmptyTopic() throws Exception {
+               runCancelingOnEmptyInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testCancelingFullTopic() throws Exception {
+               runCancelingOnFullInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testFailOnDeploy() throws Exception {
+               runFailOnDeployTest();
+       }
+
+       // --- source to partition mappings and exactly once ---
+
+       @Test(timeout = 60000)
+       public void testOneToOneSources() throws Exception {
+               runOneToOneExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testOneSourceMultiplePartitions() throws Exception {
+               runOneSourceMultiplePartitionsExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleSourcesOnePartition() throws Exception {
+               runMultipleSourcesOnePartitionExactlyOnceTest();
+       }
+
+       // --- broker failure ---
+
+       @Test(timeout = 60000)
+       public void testBrokerFailure() throws Exception {
+               runBrokerFailureTest();
+       }
+
+       // --- special executions ---
+
+       @Test(timeout = 60000)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+       @Test(timeout = 60000)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMetricsAndEndOfStream() throws Exception {
+               runEndOfStreamTest();
+       }
+
+       // --- startup mode ---
+
+       @Test(timeout = 60000)
+       public void testStartFromEarliestOffsets() throws Exception {
+               runStartFromEarliestOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromLatestOffsets() throws Exception {
+               runStartFromLatestOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromGroupOffsets() throws Exception {
+               runStartFromGroupOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromSpecificOffsets() throws Exception {
+               runStartFromSpecificOffsets();
+       }
+
+       // --- offset committing ---
+
+       @Test(timeout = 60000)
+       public void testCommitOffsetsToKafka() throws Exception {
+               runCommitOffsetsToKafka();
+       }
+
+       @Test(timeout = 60000)
+       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
+       }
+
+       /**
+        * Kafka 0.11 specific test, ensuring Timestamps are properly written 
to and read from Kafka.
+        */
+       @Test(timeout = 60000)
+       public void testTimestamps() throws Exception {
+
+               final String topic = "tstopic";
+               createTestTopic(topic, 3, 1);
+
+               // ---------- Produce an event time stream into Kafka 
-------------------
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Long> streamWithTimestamps = env.addSource(new 
SourceFunction<Long>() {
+                       private static final long serialVersionUID = 
-2255115836471289626L;
+                       boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<Long> ctx) throws 
Exception {
+                               long i = 0;
+                               while (running) {
+                                       ctx.collectWithTimestamp(i, i * 2);
+                                       if (i++ == 1110L) {
+                                               running = false;
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               });
+
+               final TypeInformationSerializationSchema<Long> longSer = new 
TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), 
env.getConfig());
+               FlinkKafkaProducer011<Long> prod = new 
FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), 
standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
+                       private static final long serialVersionUID = 
-6730989584364230617L;
+
+                       @Override
+                       public int partition(Long next, byte[] key, byte[] 
value, String targetTopic, int[] partitions) {
+                               return (int) (next % 3);
+                       }
+               }));
+               prod.setWriteTimestampToKafka(true);
+
+               streamWithTimestamps.addSink(prod).setParallelism(3);
+
+               env.execute("Produce some");
+
+               // ---------- Consume stream from Kafka -------------------
+
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               FlinkKafkaConsumer011<Long> kafkaSource = new 
FlinkKafkaConsumer011<>(topic, new LimitedLongDeserializer(), standardProps);
+               kafkaSource.assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<Long>() {
+                       private static final long serialVersionUID = 
-4834111173247835189L;
+
+                       @Nullable
+                       @Override
+                       public Watermark checkAndGetNextWatermark(Long 
lastElement, long extractedTimestamp) {
+                               if (lastElement % 11 == 0) {
+                                       return new Watermark(lastElement);
+                               }
+                               return null;
+                       }
+
+                       @Override
+                       public long extractTimestamp(Long element, long 
previousElementTimestamp) {
+                               return previousElementTimestamp;
+                       }
+               });
+
+               DataStream<Long> stream = env.addSource(kafkaSource);
+               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
+               stream.transform("timestamp validating operator", 
objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+               env.execute("Consume again");
+
+               deleteTestTopic(topic);
+       }
+
+       private static class TimestampValidatingOperator extends 
StreamSink<Long> {
+
+               private static final long serialVersionUID = 
1353168781235526806L;
+
+               public TimestampValidatingOperator() {
+                       super(new SinkFunction<Long>() {
+                               private static final long serialVersionUID = 
-6676565693361786524L;
+
+                               @Override
+                               public void invoke(Long value) throws Exception 
{
+                                       throw new 
RuntimeException("Unexpected");
+                               }
+                       });
+               }
+
+               long elCount = 0;
+               long wmCount = 0;
+               long lastWM = Long.MIN_VALUE;
+
+               @Override
+               public void processElement(StreamRecord<Long> element) throws 
Exception {
+                       elCount++;
+                       if (element.getValue() * 2 != element.getTimestamp()) {
+                               throw new RuntimeException("Invalid timestamp: 
" + element);
+                       }
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       wmCount++;
+
+                       if (lastWM <= mark.getTimestamp()) {
+                               lastWM = mark.getTimestamp();
+                       } else {
+                               throw new RuntimeException("Received watermark 
higher than the last one");
+                       }
+
+                       if (mark.getTimestamp() % 11 != 0 && 
mark.getTimestamp() != Long.MAX_VALUE) {
+                               throw new RuntimeException("Invalid watermark: 
" + mark.getTimestamp());
+                       }
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       if (elCount != 1110L) {
+                               throw new RuntimeException("Wrong final element 
count " + elCount);
+                       }
+
+                       if (wmCount <= 2) {
+                               throw new RuntimeException("Almost no 
watermarks have been sent " + wmCount);
+                       }
+               }
+       }
+
+       private static class LimitedLongDeserializer implements 
KeyedDeserializationSchema<Long> {
+
+               private static final long serialVersionUID = 
6966177118923713521L;
+               private final TypeInformation<Long> ti;
+               private final TypeSerializer<Long> ser;
+               long cnt = 0;
+
+               public LimitedLongDeserializer() {
+                       this.ti = TypeInfoParser.parse("Long");
+                       this.ser = ti.createSerializer(new ExecutionConfig());
+               }
+
+               @Override
+               public TypeInformation<Long> getProducedType() {
+                       return ti;
+               }
+
+               @Override
+               public Long deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
+                       cnt++;
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       Long e = ser.deserialize(in);
+                       return e;
+               }
+
+               @Override
+               public boolean isEndOfStream(Long nextElement) {
+                       return cnt > 1110L;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
new file mode 100644
index 0000000..c2e256c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka011JsonTableSource}.
+ */
+public class Kafka011JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, TypeInformation<Row> typeInfo) {
+               return new Kafka011JsonTableSource(topic, properties, typeInfo);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) JsonRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer011.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
new file mode 100644
index 0000000..ad63662
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
+
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               KafkaProducerTestBase.prepare();
+               ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+       }
+
+       @Override
+       public void testExactlyOnceRegularSink() throws Exception {
+               // disable test for at least once semantic
+       }
+
+       @Override
+       public void testExactlyOnceCustomOperator() throws Exception {
+               // disable test for at least once semantic
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
new file mode 100644
index 0000000..1167238
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
+       @BeforeClass
+       public static void prepare() throws ClassNotFoundException {
+               KafkaProducerTestBase.prepare();
+               ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+       }
+
+       @Override
+       public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+               // TODO: fix this test
+               // currently very often (~50% cases) KafkaProducer live locks 
itself on commitTransaction call.
+               // Somehow Kafka 0.11 doesn't play along with 
NetworkFailureProxy. This can either mean a bug in Kafka
+               // that it doesn't work well with some weird network failures, 
or the NetworkFailureProxy is a broken design
+               // and this test should be reimplemented in completely 
different way...
+       }
+
+       @Override
+       public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+               // TODO: fix this test
+               // currently very often (~50% cases) KafkaProducer live locks 
itself on commitTransaction call.
+               // Somehow Kafka 0.11 doesn't play along with 
NetworkFailureProxy. This can either mean a bug in Kafka
+               // that it doesn't work well with some weird network failures, 
or the NetworkFailureProxy is a broken design
+               // and this test should be reimplemented in completely 
different way...
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..e81148b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.networking.NetworkFailuresProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+import scala.collection.mutable.ArraySeq;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.11 .
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+       private File tmpZkDir;
+       private File tmpKafkaParent;
+       private List<File> tmpKafkaDirs;
+       private List<KafkaServer> brokers;
+       private TestingServer zookeeper;
+       private String zookeeperConnectionString;
+       private String brokerConnectionString = "";
+       private Properties standardProps;
+       private FlinkKafkaProducer011.Semantic producerSemantic = 
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE;
+       // 6 seconds is default. Seems to be too small for travis. 30 seconds
+       private int zkTimeout = 30000;
+       private Config config;
+
+       public String getBrokerConnectionString() {
+               return brokerConnectionString;
+       }
+
+       public void setProducerSemantic(FlinkKafkaProducer011.Semantic 
producerSemantic) {
+               this.producerSemantic = producerSemantic;
+       }
+
+       @Override
+       public Properties getStandardProperties() {
+               return standardProps;
+       }
+
+       @Override
+       public Properties getSecureProperties() {
+               Properties prop = new Properties();
+               if (config.isSecureMode()) {
+                       prop.put("security.inter.broker.protocol", 
"SASL_PLAINTEXT");
+                       prop.put("security.protocol", "SASL_PLAINTEXT");
+                       prop.put("sasl.kerberos.service.name", "kafka");
+
+                       //add special timeout for Travis
+                       prop.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
+                       prop.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
+                       prop.setProperty("metadata.fetch.timeout.ms", "120000");
+               }
+               return prop;
+       }
+
+       @Override
+       public String getVersion() {
+               return "0.11";
+       }
+
+       @Override
+       public List<KafkaServer> getBrokers() {
+               return brokers;
+       }
+
+       @Override
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return new FlinkKafkaConsumer011<>(topics, readSchema, props);
+       }
+
+       @Override
+       public <K, V> Collection<ConsumerRecord<K, V>> 
getAllRecordsFromTopic(Properties properties, String topic, int partition, long 
timeout) {
+               List<ConsumerRecord<K, V>> result = new ArrayList<>();
+
+               try (KafkaConsumer<K, V> consumer = new 
KafkaConsumer<>(properties)) {
+                       consumer.assign(Arrays.asList(new TopicPartition(topic, 
partition)));
+
+                       while (true) {
+                               boolean processedAtLeastOneRecord = false;
+
+                               // wait for new records with timeout and break 
the loop if we didn't get any
+                               Iterator<ConsumerRecord<K, V>> iterator = 
consumer.poll(timeout).iterator();
+                               while (iterator.hasNext()) {
+                                       ConsumerRecord<K, V> record = 
iterator.next();
+                                       result.add(record);
+                                       processedAtLeastOneRecord = true;
+                               }
+
+                               if (!processedAtLeastOneRecord) {
+                                       break;
+                               }
+                       }
+                       consumer.commitSync();
+               }
+
+               return UnmodifiableList.decorate(result);
+       }
+
+       @Override
+       public <T> StreamSink<T> getProducerSink(String topic, 
KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
+               return new StreamSink<>(new FlinkKafkaProducer011<>(
+                       topic,
+                       serSchema,
+                       props,
+                       Optional.ofNullable(partitioner),
+                       producerSemantic,
+                       
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+       }
+
+       @Override
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
+               return stream.addSink(new FlinkKafkaProducer011<>(
+                       topic,
+                       serSchema,
+                       props,
+                       Optional.ofNullable(partitioner),
+                       producerSemantic,
+                       
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+       }
+
+       @Override
+       public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> 
stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+               FlinkKafkaProducer011<T> prod = new FlinkKafkaProducer011<>(
+                       topic, serSchema, props, Optional.of(new 
FlinkFixedPartitioner<>()), producerSemantic, 
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+               prod.setWriteTimestampToKafka(true);
+
+               return stream.addSink(prod);
+       }
+
+       @Override
+       public KafkaOffsetHandler createOffsetHandler() {
+               return new KafkaOffsetHandlerImpl();
+       }
+
+       @Override
+       public void restartBroker(int leaderId) throws Exception {
+               brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId)));
+       }
+
+       @Override
+       public int getLeaderToShutDown(String topic) throws Exception {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       MetadataResponse.PartitionMetadata firstPart = null;
+                       do {
+                               if (firstPart != null) {
+                                       LOG.info("Unable to find leader. error 
code {}", firstPart.error().code());
+                                       // not the first try. Sleep a bit
+                                       Thread.sleep(150);
+                               }
+
+                               List<MetadataResponse.PartitionMetadata> 
partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, 
zkUtils).partitionMetadata();
+                               firstPart = partitionMetadata.get(0);
+                       }
+                       while (firstPart.error().code() != 0);
+
+                       return firstPart.leader().id();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       @Override
+       public int getBrokerId(KafkaServer server) {
+               return server.config().brokerId();
+       }
+
+       @Override
+       public boolean isSecureRunSupported() {
+               return true;
+       }
+
+       @Override
+       public void prepare(Config config) {
+               //increase the timeout since in Travis ZK connection takes long 
time for secure connection.
+               if (config.isSecureMode()) {
+                       //run only one kafka server to avoid multiple ZK 
connections from many instances - Travis timeout
+                       config.setKafkaServersNumber(1);
+                       zkTimeout = zkTimeout * 15;
+               }
+               this.config = config;
+
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
+               assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
+
+               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + 
(UUID.randomUUID().toString()));
+               assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
+
+               tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+               for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
+                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
+                       tmpKafkaDirs.add(tmpDir);
+               }
+
+               zookeeper = null;
+               brokers = null;
+
+               try {
+                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeperConnectionString = 
zookeeper.getConnectString();
+                       LOG.info("Starting Zookeeper with 
zookeeperConnectionString: {}", zookeeperConnectionString);
+
+                       LOG.info("Starting KafkaServer");
+                       brokers = new 
ArrayList<>(config.getKafkaServersNumber());
+
+                       ListenerName listenerName = 
ListenerName.forSecurityProtocol(config.isSecureMode() ? 
SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+                       for (int i = 0; i < config.getKafkaServersNumber(); 
i++) {
+                               KafkaServer kafkaServer = getKafkaServer(i, 
tmpKafkaDirs.get(i));
+                               brokers.add(kafkaServer);
+                               brokerConnectionString += 
hostAndPortToUrlString(KAFKA_HOST, 
kafkaServer.socketServer().boundPort(listenerName));
+                               brokerConnectionString +=  ",";
+                       }
+
+                       LOG.info("ZK and KafkaServer started.");
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       fail("Test setup failed: " + t.getMessage());
+               }
+
+               standardProps = new Properties();
+               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
+               standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
+               standardProps.setProperty("group.id", "flink-tests");
+               standardProps.setProperty("enable.auto.commit", "false");
+               standardProps.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
+               standardProps.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
+               standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.11 value)
+               standardProps.setProperty("max.partition.fetch.bytes", "256"); 
// make a lot of fetches (MESSAGES MUST BE SMALLER!)
+       }
+
+       @Override
+       public void shutdown() {
+               for (KafkaServer broker : brokers) {
+                       if (broker != null) {
+                               broker.shutdown();
+                       }
+               }
+               brokers.clear();
+
+               if (zookeeper != null) {
+                       try {
+                               zookeeper.stop();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("ZK.stop() failed", e);
+                       }
+                       zookeeper = null;
+               }
+
+               // clean up the temp spaces
+
+               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpKafkaParent);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+               if (tmpZkDir != null && tmpZkDir.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpZkDir);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+       }
+
+       public ZkUtils getZkUtils() {
+               ZkClient creator = new ZkClient(zookeeperConnectionString, 
Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), 
new ZooKeeperStringSerializer());
+               return ZkUtils.apply(creator, false);
+       }
+
+       @Override
+       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor, Properties topicConfig) {
+               // create topic with one client
+               LOG.info("Creating topic {}", topic);
+
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       AdminUtils.createTopic(zkUtils, topic, 
numberOfPartitions, replicationFactor, topicConfig, 
kafka.admin.RackAwareMode.Enforced$.MODULE$);
+               } finally {
+                       zkUtils.close();
+               }
+
+               // validate that the topic has been created
+               final long deadline = System.nanoTime() + 30_000_000_000L;
+               do {
+                       try {
+                               if (config.isSecureMode()) {
+                                       //increase wait time since in Travis ZK 
timeout occurs frequently
+                                       int wait = zkTimeout / 100;
+                                       LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
+                                       Thread.sleep(wait);
+                               } else {
+                                       Thread.sleep(100);
+                               }
+                       } catch (InterruptedException e) {
+                               // restore interrupted state
+                       }
+                       // we could use AdminUtils.topicExists(zkUtils, topic) 
here, but it's results are
+                       // not always correct.
+
+                       // create a new ZK utils connection
+                       ZkUtils checkZKConn = getZkUtils();
+                       if (AdminUtils.topicExists(checkZKConn, topic)) {
+                               checkZKConn.close();
+                               return;
+                       }
+                       checkZKConn.close();
+               }
+               while (System.nanoTime() < deadline);
+               fail("Test topic could not be created");
+       }
+
+       @Override
+       public void deleteTestTopic(String topic) {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       LOG.info("Deleting topic {}", topic);
+
+                       ZkClient zk = new ZkClient(zookeeperConnectionString, 
Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                               
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), 
new ZooKeeperStringSerializer());
+
+                       AdminUtils.deleteTopic(zkUtils, topic);
+
+                       zk.close();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       /**
+        * Copied from 
com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
+        */
+       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) 
throws Exception {
+               Properties kafkaProperties = new Properties();
+
+               // properties have to be Strings
+               kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+               kafkaProperties.put("broker.id", Integer.toString(brokerId));
+               kafkaProperties.put("log.dir", tmpFolder.toString());
+               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
+               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
+               kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
+               kafkaProperties.put("transaction.max.timeout.ms", 
Integer.toString(1000 * 60 * 60 * 2)); // 2hours
+
+               // for CI stability, increase zookeeper session timeout
+               kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+               kafkaProperties.put("zookeeper.connection.timeout.ms", 
zkTimeout);
+               if (config.getKafkaServerProperties() != null) {
+                       
kafkaProperties.putAll(config.getKafkaServerProperties());
+               }
+
+               final int numTries = 5;
+
+               for (int i = 1; i <= numTries; i++) {
+                       int kafkaPort = NetUtils.getAvailablePort();
+                       kafkaProperties.put("port", 
Integer.toString(kafkaPort));
+
+                       if (config.isHideKafkaBehindProxy()) {
+                               NetworkFailuresProxy proxy = 
createProxy(KAFKA_HOST, kafkaPort);
+                               kafkaProperties.put("advertised.port", 
proxy.getLocalPort());
+                       }
+
+                       //to support secure kafka cluster
+                       if (config.isSecureMode()) {
+                               LOG.info("Adding Kafka secure configurations");
+                               kafkaProperties.put("listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.put("advertised.listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.putAll(getSecureProperties());
+                       }
+
+                       KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
+
+                       try {
+                               scala.Option<String> stringNone = 
scala.Option.apply(null);
+                               KafkaServer server = new 
KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new 
ArraySeq<KafkaMetricsReporter>(0));
+                               server.startup();
+                               return server;
+                       }
+                       catch (KafkaException e) {
+                               if (e.getCause() instanceof BindException) {
+                                       // port conflict, retry...
+                                       LOG.info("Port conflict when starting 
Kafka Broker. Retrying...");
+                               }
+                               else {
+                                       throw e;
+                               }
+                       }
+               }
+
+               throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
+       }
+
+       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+               private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+               public KafkaOffsetHandlerImpl() {
+                       Properties props = new Properties();
+                       props.putAll(standardProps);
+                       props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                       props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+                       offsetClient = new KafkaConsumer<>(props);
+               }
+
+               @Override
+               public Long getCommittedOffset(String topicName, int partition) 
{
+                       OffsetAndMetadata committed = 
offsetClient.committed(new TopicPartition(topicName, partition));
+                       return (committed != null) ? committed.offset() : null;
+               }
+
+               @Override
+               public void setCommittedOffset(String topicName, int partition, 
long offset) {
+                       Map<TopicPartition, OffsetAndMetadata> 
partitionAndOffset = new HashMap<>();
+                       partitionAndOffset.put(new TopicPartition(topicName, 
partition), new OffsetAndMetadata(offset));
+                       offsetClient.commitSync(partitionAndOffset);
+               }
+
+               @Override
+               public void close() {
+                       offsetClient.close();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 681fe02..c3c9c07 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -23,6 +23,15 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+       @Override
+       public void testExactlyOnceRegularSink() throws Exception {
+               // Kafka08 does not support exactly once semantic
+       }
+
+       @Override
+       public void testExactlyOnceCustomOperator() throws Exception {
+               // Kafka08 does not support exactly once semantic
+       }
 
        @Override
        public void testOneToOneAtLeastOnceRegularSink() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 847f818..b34132f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -24,6 +24,16 @@ package org.apache.flink.streaming.connectors.kafka;
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
        @Override
+       public void testExactlyOnceRegularSink() throws Exception {
+               // Kafka08 does not support exactly once semantic
+       }
+
+       @Override
+       public void testExactlyOnceCustomOperator() throws Exception {
+               // Kafka08 does not support exactly once semantic
+       }
+
+       @Override
        public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
                // Disable this test since FlinkKafka09Producer doesn't support 
custom operator mode
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index fda6832..e9a0331 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -174,7 +174,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        stream.print();
                        see.execute("No broker test");
                } catch (JobExecutionException jee) {
-                       if (kafkaServer.getVersion().equals("0.9") || 
kafkaServer.getVersion().equals("0.10")) {
+                       if (kafkaServer.getVersion().equals("0.9") || 
kafkaServer.getVersion().equals("0.10") || 
kafkaServer.getVersion().equals("0.11")) {
                                assertTrue(jee.getCause() instanceof 
TimeoutException);
 
                                TimeoutException te = (TimeoutException) 
jee.getCause();

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 35607dd..e1ba074 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -38,26 +38,25 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -295,38 +294,79 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
        }
 
        /**
-        * We manually handle the timeout instead of using JUnit's timeout to 
return failure instead of timeout error.
-        * After timeout we assume that there are missing records and there is 
a bug, not that the test has run out of time.
+        * Tests the exactly-once semantic for the simple writes into Kafka.
         */
-       private void assertAtLeastOnceForTopic(
-                       Properties properties,
-                       String topic,
-                       int partition,
-                       Set<Integer> expectedElements,
-                       long timeoutMillis) throws Exception {
-
-               long startMillis = System.currentTimeMillis();
-               Set<Integer> actualElements = new HashSet<>();
-
-               // until we timeout...
-               while (System.currentTimeMillis() < startMillis + 
timeoutMillis) {
-                       properties.put("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-                       properties.put("value.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-
-                       // query kafka for new records ...
-                       Collection<ConsumerRecord<Integer, Integer>> records = 
kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
-
-                       for (ConsumerRecord<Integer, Integer> record : records) 
{
-                               actualElements.add(record.value());
-                       }
+       @Test
+       public void testExactlyOnceRegularSink() throws Exception {
+               testExactlyOnce(true);
+       }
+
+       /**
+        * Tests the exactly-once semantic for the simple writes into Kafka.
+        */
+       @Test
+       public void testExactlyOnceCustomOperator() throws Exception {
+               testExactlyOnce(false);
+       }
+
+       /**
+        * This test sets KafkaProducer so that it will  automatically flush 
the data and
+        * and fails the broker to check whether flushed records since last 
checkpoint were not duplicated.
+        */
+       protected void testExactlyOnce(boolean regularSink) throws Exception {
+               final String topic = regularSink ? 
"exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+               final int partition = 0;
+               final int numElements = 1000;
+               final int failAfterElements = 333;
+
+               createTestTopic(topic, 1, 1);
+
+               TypeInformationSerializationSchema<Integer> schema = new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
+               KeyedSerializationSchema<Integer> keyedSerializationSchema = 
new KeyedSerializationSchemaWrapper(schema);
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(500);
+               env.setParallelism(1);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0));
+               env.getConfig().disableSysoutLogging();
+
+               Properties properties = new Properties();
+               properties.putAll(standardProps);
+               properties.putAll(secureProps);
 
-                       // succeed if we got all expectedElements
-                       if (actualElements.containsAll(expectedElements)) {
-                               return;
+               // process exactly failAfterElements number of elements and 
then shutdown Kafka broker and fail application
+               List<Integer> expectedElements = 
getIntegersSequence(numElements);
+
+               DataStream<Integer> inputStream = env
+                       .addSource(new IntegerSource(numElements))
+                       .map(new 
FailingIdentityMapper<Integer>(failAfterElements));
+
+               FlinkKafkaPartitioner<Integer> partitioner = new 
FlinkKafkaPartitioner<Integer>() {
+                       @Override
+                       public int partition(Integer record, byte[] key, byte[] 
value, String targetTopic, int[] partitions) {
+                               return partition;
                        }
+               };
+               if (regularSink) {
+                       StreamSink<Integer> kafkaSink = 
kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, 
partitioner);
+                       inputStream.addSink(kafkaSink.getUserFunction());
+               }
+               else {
+                       kafkaServer.produceIntoKafka(inputStream, topic, 
keyedSerializationSchema, properties, partitioner);
                }
 
-               fail(String.format("Expected to contain all of: <%s>, but was: 
<%s>", expectedElements, actualElements));
+               FailingIdentityMapper.failedBefore = false;
+               TestUtils.tryExecute(env, "Exactly once test");
+
+               // assert that before failure we successfully snapshot/flushed 
all expected elements
+               assertExactlyOnceForTopic(
+                       properties,
+                       topic,
+                       partition,
+                       expectedElements,
+                       30000L);
+
+               deleteTestTopic(topic);
        }
 
        private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f8792e5..fcdb59b 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -39,11 +40,18 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.fail;
+
 /**
  * The base for the Kafka tests. It brings up:
  * <ul>
@@ -209,4 +217,80 @@ public abstract class KafkaTestBase extends TestLogger {
                kafkaServer.deleteTestTopic(topic);
        }
 
+       /**
+        * We manually handle the timeout instead of using JUnit's timeout to 
return failure instead of timeout error.
+        * After timeout we assume that there are missing records and there is 
a bug, not that the test has run out of time.
+        */
+       protected void assertAtLeastOnceForTopic(
+                       Properties properties,
+                       String topic,
+                       int partition,
+                       Set<Integer> expectedElements,
+                       long timeoutMillis) throws Exception {
+
+               long startMillis = System.currentTimeMillis();
+               Set<Integer> actualElements = new HashSet<>();
+
+               // until we timeout...
+               while (System.currentTimeMillis() < startMillis + 
timeoutMillis) {
+                       properties.put("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
+                       properties.put("value.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
+
+                       // query kafka for new records ...
+                       Collection<ConsumerRecord<Integer, Integer>> records = 
kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+                       for (ConsumerRecord<Integer, Integer> record : records) 
{
+                               actualElements.add(record.value());
+                       }
+
+                       // succeed if we got all expectedElements
+                       if (actualElements.containsAll(expectedElements)) {
+                               return;
+                       }
+               }
+
+               fail(String.format("Expected to contain all of: <%s>, but was: 
<%s>", expectedElements, actualElements));
+       }
+
+       /**
+        * We manually handle the timeout instead of using JUnit's timeout to 
return failure instead of timeout error.
+        * After timeout we assume that there are missing records and there is 
a bug, not that the test has run out of time.
+        */
+       protected void assertExactlyOnceForTopic(
+                       Properties properties,
+                       String topic,
+                       int partition,
+                       List<Integer> expectedElements,
+                       long timeoutMillis) throws Exception {
+
+               long startMillis = System.currentTimeMillis();
+               List<Integer> actualElements = new ArrayList<>();
+
+               Properties consumerProperties = new Properties();
+               consumerProperties.putAll(properties);
+               consumerProperties.put("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
+               consumerProperties.put("value.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
+               consumerProperties.put("isolation.level", "read_committed");
+
+               // until we timeout...
+               while (System.currentTimeMillis() < startMillis + 
timeoutMillis) {
+                       // query kafka for new records ...
+                       Collection<ConsumerRecord<Integer, Integer>> records = 
kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);
+
+                       for (ConsumerRecord<Integer, Integer> record : records) 
{
+                               actualElements.add(record.value());
+                       }
+
+                       // succeed if we got all expectedElements
+                       if (actualElements.equals(expectedElements)) {
+                               return;
+                       }
+                       // fail early if we already have too many elements
+                       if (actualElements.size() > expectedElements.size()) {
+                               break;
+                       }
+               }
+
+               fail(String.format("Expected number of elements: <%s>, but was: 
<%s>", expectedElements.size(), actualElements.size()));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
new file mode 100644
index 0000000..ef50766
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Flink source that servers integers, but it completes only after a 
completed checkpoint after serving
+ * all of the elements.
+ */
+public class IntegerSource
+       extends RichParallelSourceFunction<Integer>
+       implements ListCheckpointed<Integer>, CheckpointListener {
+
+       /**
+        * Blocker when the generator needs to wait for the checkpoint to 
happen.
+        * Eager initialization means it must be serializable (pick any 
serializable type).
+        */
+       private final Object blocker = new SerializableObject();
+
+       /**
+        * The total number of events to generate.
+        */
+       private final int numEventsTotal;
+
+       /**
+        * The current position in the sequence of numbers.
+        */
+       private int currentPosition = -1;
+
+       private long lastCheckpointTriggered;
+
+       private long lastCheckpointConfirmed;
+
+       private boolean restored;
+
+       private volatile boolean running = true;
+
+       public IntegerSource(int numEventsTotal) {
+               this.numEventsTotal = numEventsTotal;
+       }
+
+       @Override
+       public void run(SourceContext<Integer> ctx) throws Exception {
+
+               // each source subtask emits only the numbers where (num % 
parallelism == subtask_index)
+               final int stepSize = 
getRuntimeContext().getNumberOfParallelSubtasks();
+               int current = this.currentPosition >= 0 ? this.currentPosition 
: getRuntimeContext().getIndexOfThisSubtask();
+
+               while (this.running && current < this.numEventsTotal) {
+                       // emit the next element
+                       synchronized (ctx.getCheckpointLock()) {
+                               ctx.collect(current);
+                               current += stepSize;
+                               this.currentPosition = current;
+                       }
+                       // give some time to trigger checkpoint while we are 
not holding the lock (to prevent starvation)
+                       if (!restored && current % 10 == 0) {
+                               Thread.sleep(1);
+                       }
+               }
+
+               // after we are done, we need to wait for two more checkpoint 
to complete
+               // before finishing the program - that is to be on the safe 
side that
+               // the sink also got the "commit" notification for all relevant 
checkpoints
+               // and committed the data
+               final long lastCheckpoint;
+               synchronized (ctx.getCheckpointLock()) {
+                       lastCheckpoint = this.lastCheckpointTriggered;
+               }
+
+               synchronized (this.blocker) {
+                       while (this.lastCheckpointConfirmed <= lastCheckpoint + 
1) {
+                               this.blocker.wait();
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               this.running = false;
+       }
+
+       @Override
+       public List<Integer> snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+               this.lastCheckpointTriggered = checkpointId;
+
+               return Collections.singletonList(this.currentPosition);
+       }
+
+       @Override
+       public void restoreState(List<Integer> state) throws Exception {
+               this.currentPosition = state.get(0);
+
+               // at least one checkpoint must have happened so far
+               this.lastCheckpointTriggered = 1L;
+               this.lastCheckpointConfirmed = 1L;
+               this.restored = true;
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               synchronized (blocker) {
+                       this.lastCheckpointConfirmed = checkpointId;
+                       blocker.notifyAll();
+               }
+       }
+}

Reply via email to