[FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f651e9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f651e9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f651e9a

Branch: refs/heads/master
Commit: 2f651e9a69a9929ef154e7bf6fcba624b0e8b9a1
Parents: d20728b
Author: Piotr Nowojski <[email protected]>
Authored: Fri Jun 23 09:14:28 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |   82 ++
 .../kafka/Kafka010ProducerITCase.java           |    9 +
 .../connectors/kafka/FlinkKafkaConsumer011.java |  113 ++
 .../connectors/kafka/FlinkKafkaProducer011.java | 1039 ++++++++++++++++++
 .../kafka/Kafka011AvroTableSource.java          |   58 +
 .../kafka/Kafka011JsonTableSource.java          |   53 +
 .../connectors/kafka/Kafka011TableSource.java   |   55 +
 .../metrics/KafkaMetricMuttableWrapper.java     |   43 +
 .../kafka/FlinkKafkaProducer011Tests.java       |  366 ++++++
 .../kafka/Kafka011AvroTableSourceTest.java      |   54 +
 .../connectors/kafka/Kafka011ITCase.java        |  353 ++++++
 .../kafka/Kafka011JsonTableSourceTest.java      |   49 +
 .../Kafka011ProducerAtLeastOnceITCase.java      |   44 +
 .../Kafka011ProducerExactlyOnceITCase.java      |   51 +
 .../kafka/KafkaTestEnvironmentImpl.java         |  497 +++++++++
 .../connectors/kafka/Kafka08ProducerITCase.java |    9 +
 .../connectors/kafka/Kafka09ProducerITCase.java |   10 +
 .../connectors/kafka/KafkaConsumerTestBase.java |    2 +-
 .../connectors/kafka/KafkaProducerTestBase.java |  100 +-
 .../connectors/kafka/KafkaTestBase.java         |   84 ++
 .../kafka/testutils/IntegerSource.java          |  130 +++
 21 files changed, 3170 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index f95c8c0..aabb1ba 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -72,6 +72,14 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
         <td>0.10.x</td>
         <td>This connector supports <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message";>Kafka
 messages with timestamps</a> both for producing and consuming.</td>
     </tr>
+    <tr>
+        <td>flink-connector-kafka-0.11_2.11</td>
+        <td>1.4.0</td>
+        <td>FlinkKafkaConsumer011<br>
+        FlinkKafkaProducer011</td>
+        <td>0.11.x</td>
+        <td>Since 0.11.x Kafka does not support scala 2.10. This connector 
supports <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging";>Kafka
 transactional messaging</a> to provide exactly once semantic for the 
producer.</td>
+    </tr>
   </tbody>
 </table>
 
@@ -518,6 +526,80 @@ into a Kafka topic.
   for more explanation.
 </div>
 
+#### Kafka 0.11
+
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` can provide
+exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different 
modes of operating
+chosen by passing appropriate `semantic` parameter to the 
`FlinkKafkaProducer011`:
+
+ * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be 
lost or they can
+ be duplicated.
+ * `Semantic.AT_LEAST_ONCE` (default setting): similar to 
`setFlushOnCheckpoint(true)` in
+ `FlinkKafkaProducer010`. his guarantees that no records will be lost 
(although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once 
semantic.
+
+<div class="alert alert-warning">
+  <strong>Attention:</strong> Depending on your Kafka configuration, even 
after Kafka acknowledges
+  writes you can still experience data losses. In particular keep in mind 
about following properties
+  in Kafka config:
+  <ul>
+    <li><tt>acks</tt></li>
+    <li><tt>log.flush.interval.messages</tt></li>
+    <li><tt>log.flush.interval.ms</tt></li>
+    <li><tt>log.flush.*</tt></li>
+  </ul>
+  Default values for the above options can easily lead to data loss. Please 
refer to the Kafka documentation
+  for more explanation.
+</div>
+
+
+##### Caveats
+
+`Semantic.EXACTLY_ONCE` mode relies on the ability to commit transactions
+that were started before taking a checkpoint, after recovering from the said 
checkpoint. If the time
+between Flink application crash and completed restart is larger then Kafka's 
transaction timeout
+there will be data loss (Kafka will automatically abort transactions that 
exceeded timeout time).
+Having this in mind, please configure your transaction timeout appropriately 
to your expected down
+times.
+
+Kafka brokers by default have `transaction.max.timeout.ms` set to 15 minutes. 
This property will
+not allow to set transaction timeouts for the producers larger then it's value.
+`FlinkKafkaProducer011` by default sets the `transaction.timeout.ms` property 
in producer config to
+1 hour, thus `transaction.max.timeout.ms` should be increased before using the
+`Semantic.EXACTLY_ONCE` mode.
+
+In `read_committed` mode of `KafkaConsumer`, any transactions that were not 
finished
+(neither aborted nor completed) will block all reads from the given Kafka 
topic past any
+un-finished transaction. In other words after following sequence of events:
+
+1. User started `transaction1` and written some records using it
+2. User started `transaction2` and written some further records using it
+3. User committed `transaction2`
+
+Even if records from `transaction2` are already committed, they will not be 
visible to
+the consumers until `transaction1` is committed or aborted. This hastwo 
implications:
+
+ * First of all, during normal working of Flink applications, user can expect 
a delay in visibility
+ of the records produced into Kafka topics, equal to average time between 
completed checkpoints.
+ * Secondly in case of Flink application failure, topics into which this 
application was writting, 
+ will be blocked for the readers until the application restarts or the 
configured transaction 
+ timeout time will pass. This remark only applies for the cases when there are 
multiple
+ agents/applications writing to the same Kafka topic.
+
+**Note**:  `Semantic.EXACTLY_ONCE` mode uses a fixed size pool of 
KafkaProducers
+per each `FlinkKafkaProducer011` instance. One of each of those producers is 
used per one
+checkpoint. If the number of concurrent checkpoints exceeds the pool size, 
`FlinkKafkaProducer011`
+will throw an exception and will fail the whole application. Please configure 
max pool size and max
+number of concurrent checkpoints accordingly.
+
+**Note**: `Semantic.EXACTLY_ONCE` takes all possible measures to not leave any 
lingering transactions
+that would block the consumers from reading from Kafka topic more then it is 
necessary. However in the
+event of failure of Flink application before first checkpoint, after 
restarting such application there
+is no information in the system about previous pool sizes. Thus it is unsafe 
to scale down Flink
+application before first checkpoint completes, by factor larger then 
`FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
+
 ## Using Kafka timestamps and Flink event time in Kafka 0.10
 
 Since Apache Kafka 0.10+, Kafka's messages can carry 
[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message),
 indicating

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index f811893..cf35a59 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -23,4 +23,13 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
+       @Override
+       public void testExactlyOnceRegularSink() throws Exception {
+               // Kafka010 does not support exactly once semantic
+       }
+
+       @Override
+       public void testExactlyOnceCustomOperator() throws Exception {
+               // Kafka010 does not support exactly once semantic
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
new file mode 100644
index 0000000..8d165c3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
+ * Apache Kafka 0.11.x. The consumer can run in multiple parallel instances, 
each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees 
that no data is lost
+ * during a failure, and that the computation processes elements "exactly 
once".
+ * (Note: These guarantees naturally assume that Kafka itself does not loose 
any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how 
far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration 
properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ */
+public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
+
+       private static final long serialVersionUID = 2324564345203409112L;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.11.x.
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param valueDeserializer
+        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
+               this(Collections.singletonList(topic), valueDeserializer, 
props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.11.x
+        *
+        * <p>This constructor allows passing a {@see 
KeyedDeserializationSchema} for reading key/value
+        * pairs, offsets, and topic names from Kafka.
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer011(String topic, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
+               this(Collections.singletonList(topic), deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.11.x
+        *
+        * <p>This constructor allows passing multiple topics to the consumer.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher 
and the offset handler.
+        */
+       public FlinkKafkaConsumer011(List<String> topics, 
DeserializationSchema<T> deserializer, Properties props) {
+               this(topics, new 
KeyedDeserializationSchemaWrapper<>(deserializer), props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.11.x
+        *
+        * <p>This constructor allows passing multiple topics and a key/value 
deserialization schema.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher 
and the offset handler.
+        */
+       public FlinkKafkaConsumer011(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(topics, deserializer, props);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
new file mode 100644
index 0000000..67e237d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible 
with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#AT_LEAST_ONCE} semantic. Before using {@link 
Semantic#EXACTLY_ONCE} please refer to Flink's
+ * Kafka connector documentation.
+ */
+public class FlinkKafkaProducer011<IN>
+               extends TwoPhaseCommitSinkFunction<IN, 
FlinkKafkaProducer011.KafkaTransactionState, 
FlinkKafkaProducer011.KafkaTransactionContext> {
+
+       /**
+        *  Semantics that can be chosen.
+        *  <li>{@link #EXACTLY_ONCE}</li>
+        *  <li>{@link #AT_LEAST_ONCE}</li>
+        *  <li>{@link #NONE}</li>
+        */
+       public enum Semantic {
+
+               /**
+                * Semantic.EXACTLY_ONCE the Flink producer will write all 
messages in a Kafka transaction that will be
+                * committed to the Kafka on a checkpoint.
+                *
+                * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool 
of {@link FlinkKafkaProducer}. Between each
+                * checkpoint there is created new Kafka transaction, which is 
being committed on
+                * {@link 
FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete 
notifications are
+                * running late, {@link FlinkKafkaProducer011} can run out of 
{@link FlinkKafkaProducer}s in the pool. In that
+                * case any subsequent {@link 
FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
+                * and {@link FlinkKafkaProducer011} will keep using the {@link 
FlinkKafkaProducer} from previous checkpoint.
+                * To decrease chances of failing checkpoints there are three 
options:
+                * <li>decrease number of max concurrent checkpoints</li>
+                * <li>make checkpoints more reliable (so that they complete 
faster)</li>
+                * <li>increase delay between checkpoints</li>
+                * <li>increase size of {@link FlinkKafkaProducer}s pool</li>
+                */
+               EXACTLY_ONCE,
+
+               /**
+                * Semantic.AT_LEAST_ONCE the Flink producer will wait for all 
outstanding messages in the Kafka buffers
+                * to be acknowledged by the Kafka producer on a checkpoint.
+                */
+               AT_LEAST_ONCE,
+
+               /**
+                * Semantic.NONE means that nothing will be guaranteed. 
Messages can be lost and/or duplicated in case
+                * of failure.
+                */
+               NONE
+       }
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * This coefficient determines what is the safe scale down factor.
+        *
+        * <p>If the Flink application previously failed before first 
checkpoint completed or we are starting new batch
+        * of {@link FlinkKafkaProducer011} from scratch without clean shutdown 
of the previous one,
+        * {@link FlinkKafkaProducer011} doesn't know what was the set of 
previously used Kafka's transactionalId's. In
+        * that case, it will try to play safe and abort all of the possible 
transactionalIds from the range of:
+        * {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize * 
SAFE_SCALE_DOWN_FACTOR) }
+        *
+        * <p>The range of available to use transactional ids is:
+        * {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize) }
+        *
+        * <p>This means that if we decrease {@code 
getNumberOfParallelSubtasks()} by a factor larger then
+        * {@code SAFE_SCALE_DOWN_FACTOR} we can have a left some lingering 
transaction.
+        */
+       public static final int SAFE_SCALE_DOWN_FACTOR = 5;
+
+       /**
+        * Default number of KafkaProducers in the pool. See {@link 
Semantic#EXACTLY_ONCE}.
+        */
+       public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+
+       /**
+        * Default value for kafka transaction timeout.
+        */
+       public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = 
Time.hours(1);
+
+       /**
+        * Configuration key for disabling the metrics reporting.
+        */
+       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
+
+       /**
+        * Descriptor of the transacionalIds list.
+        */
+       private static final ListStateDescriptor<NextTransactionalIdHint> 
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
+               new ListStateDescriptor<>("next-transactional-id-hint", 
TypeInformation.of(NextTransactionalIdHint.class));
+
+       /**
+        * State for nextTransactionalIdHint.
+        */
+       private transient ListState<NextTransactionalIdHint> 
nextTransactionalIdHintState;
+
+       /**
+        * Hint for picking next transactional id.
+        */
+       private NextTransactionalIdHint nextTransactionalIdHint;
+
+       /**
+        * User defined properties for the Producer.
+        */
+       private final Properties producerConfig;
+
+       /**
+        * The name of the default topic this producer is writing data to.
+        */
+       private final String defaultTopicId;
+
+       /**
+        * (Serializable) SerializationSchema for turning objects used with 
Flink into.
+        * byte[] for Kafka.
+        */
+       private final KeyedSerializationSchema<IN> schema;
+
+       /**
+        * User-provided partitioner for assigning an object to a Kafka 
partition for each topic.
+        */
+       private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+       /**
+        * Partitions of each topic.
+        */
+       private final Map<String, int[]> topicPartitionsMap;
+
+       /**
+        * Max number of producers in the pool. If all producers are in use, 
snapshoting state will throw an exception.
+        */
+       private final int kafkaProducersPoolSize;
+
+       /**
+        * Available transactional ids.
+        */
+       private final BlockingDeque<String> availableTransactionalIds = new 
LinkedBlockingDeque<>();
+
+       /**
+        * Pool of KafkaProducers objects.
+        */
+       private transient ProducersPool producersPool = new ProducersPool();
+
+       /**
+        * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+        */
+       private boolean writeTimestampToKafka = false;
+
+       /**
+        * Flag indicating whether to accept failures (and log them), or to 
fail on failures.
+        */
+       private boolean logFailuresOnly;
+
+       /**
+        * Semantic chosen for this instance.
+        */
+       private Semantic semantic;
+
+       // -------------------------------- Runtime fields 
------------------------------------------
+
+       /** The callback than handles error propagation or logging callbacks. */
+       @Nullable
+       private transient Callback callback;
+
+       /** Errors encountered in the async producer are stored here. */
+       @Nullable
+       private transient volatile Exception asyncException;
+
+       /** Lock for accessing the pending records. */
+       private final SerializableObject pendingRecordsLock = new 
SerializableObject();
+
+       /** Number of unacknowledged records. */
+       private final AtomicLong pendingRecords = new AtomicLong();
+
+       /** Cache of metrics to replace already registered metrics instead of 
overwriting existing ones. */
+       private final Map<String, KafkaMetricMuttableWrapper> 
previouslyCreatedMetrics = new HashMap<>();
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param brokerList
+        *                      Comma separated addresses of the brokers
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined (keyless) serialization schema.
+        */
+       public FlinkKafkaProducer011(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
+               this(
+                       topicId,
+                       new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+                       getPropertiesFromBrokerList(brokerList),
+                       Optional.of(new FlinkFixedPartitioner<IN>()));
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined (keyless) serialization schema.
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
+               this(
+                       topicId,
+                       new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+                       producerConfig,
+                       Optional.of(new FlinkFixedPartitioner<IN>()));
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        */
+       public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, 
Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+       }
+
+       // ------------------- Key/Value serialization schema constructors 
----------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param brokerList
+        *                      Comma separated addresses of the brokers
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting 
key/value messages
+        */
+       public FlinkKafkaProducer011(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
+               this(
+                       topicId,
+                       serializationSchema,
+                       getPropertiesFromBrokerList(brokerList),
+                       Optional.of(new FlinkFixedPartitioner<IN>()));
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting 
key/value messages
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer011(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+               this(
+                       topicId,
+                       serializationSchema,
+                       producerConfig,
+                       Optional.of(new FlinkFixedPartitioner<IN>()));
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId
+        *                      ID of the Kafka topic.
+        * @param serializationSchema
+        *                      User defined serialization schema supporting 
key/value messages
+        * @param producerConfig
+        *                      Properties with the producer configuration.
+        */
+       public FlinkKafkaProducer011(
+                       String topicId,
+                       KeyedSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       Semantic semantic) {
+               this(topicId,
+                       serializationSchema,
+                       producerConfig,
+                       Optional.of(new FlinkFixedPartitioner<IN>()),
+                       semantic,
+                       DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+       }
+
+
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param defaultTopicId The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
+        */
+       public FlinkKafkaProducer011(
+                       String defaultTopicId,
+                       KeyedSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+               this(
+                       defaultTopicId,
+                       serializationSchema,
+                       producerConfig,
+                       customPartitioner,
+                       Semantic.AT_LEAST_ONCE,
+                       DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+       }
+
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param defaultTopicId The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
+        * @param semantic Defines semantic that will be used by this producer 
(see {@link Semantic}).
+        * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool 
size (see {@link Semantic#EXACTLY_ONCE}).
+        */
+       public FlinkKafkaProducer011(
+                       String defaultTopicId,
+                       KeyedSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+                       Semantic semantic,
+                       int kafkaProducersPoolSize) {
+               super(TypeInformation.of(new 
TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {}));
+
+               this.defaultTopicId = checkNotNull(defaultTopicId, 
"defaultTopicId is null");
+               this.schema = checkNotNull(serializationSchema, 
"serializationSchema is null");
+               this.producerConfig = checkNotNull(producerConfig, 
"producerConfig is null");
+               this.flinkKafkaPartitioner = checkNotNull(customPartitioner, 
"customPartitioner is null").orElse(null);
+               this.semantic = checkNotNull(semantic, "semantic is null");
+               this.kafkaProducersPoolSize = kafkaProducersPoolSize;
+               checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize 
must be non empty");
+
+               ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
+               ClosureCleaner.ensureSerializable(serializationSchema);
+
+               // set the producer configuration properties for kafka record 
key value serializers.
+               if 
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+                       
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+               }
+
+               if 
(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+                       
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+               }
+
+               // eagerly ensure that bootstrap servers are set.
+               if 
(!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+                       throw new 
IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be 
supplied in the producer config properties.");
+               }
+
+               if 
(!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+                       long timeout = 
DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
+                       checkState(timeout < Integer.MAX_VALUE && timeout > 0, 
"timeout does not fit into 32 bit integer");
+                       
this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) 
timeout);
+                       LOG.warn("Property [%s] not specified. Setting it to 
%s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
+               }
+
+               this.topicPartitionsMap = new HashMap<>();
+       }
+
+       // ---------------------------------- Properties 
--------------------------
+
+       /**
+        * If set to true, Flink will write the (event time) timestamp attached 
to each record into Kafka.
+        * Timestamps must be positive for Kafka to accept them.
+        *
+        * @param writeTimestampToKafka Flag indicating if Flink's internal 
timestamps are written to Kafka.
+        */
+       public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+               this.writeTimestampToKafka = writeTimestampToKafka;
+       }
+
+       /**
+        * Defines whether the producer should fail on errors, or only log them.
+        * If this is set to true, then exceptions will be only logged, if set 
to false,
+        * exceptions will be eventually thrown and cause the streaming program 
to
+        * fail (and enter recovery).
+        *
+        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+        */
+       public void setLogFailuresOnly(boolean logFailuresOnly) {
+               this.logFailuresOnly = logFailuresOnly;
+       }
+
+       // ----------------------------------- Utilities 
--------------------------
+
+       /**
+        * Initializes the connection to Kafka.
+        */
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               if (semantic != Semantic.NONE && !((StreamingRuntimeContext) 
this.getRuntimeContext()).isCheckpointingEnabled()) {
+                       LOG.warn("Using {} semantic, but checkpointing is not 
enabled. Switching to {} semantic.", semantic, Semantic.NONE);
+                       semantic = Semantic.NONE;
+               }
+
+               if (logFailuresOnly) {
+                       callback = new Callback() {
+                               @Override
+                               public void onCompletion(RecordMetadata 
metadata, Exception e) {
+                                       if (e != null) {
+                                               LOG.error("Error while sending 
record to Kafka: " + e.getMessage(), e);
+                                       }
+                                       acknowledgeMessage();
+                               }
+                       };
+               }
+               else {
+                       callback = new Callback() {
+                               @Override
+                               public void onCompletion(RecordMetadata 
metadata, Exception exception) {
+                                       if (exception != null && asyncException 
== null) {
+                                               asyncException = exception;
+                                       }
+                                       acknowledgeMessage();
+                               }
+                       };
+               }
+
+               super.open(configuration);
+       }
+
+       @Override
+       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws Exception {
+               checkErroneous();
+
+               byte[] serializedKey = schema.serializeKey(next);
+               byte[] serializedValue = schema.serializeValue(next);
+               String targetTopic = schema.getTargetTopic(next);
+               if (targetTopic == null) {
+                       targetTopic = defaultTopicId;
+               }
+
+               Long timestamp = null;
+               if (this.writeTimestampToKafka) {
+                       timestamp = context.timestamp();
+               }
+
+               ProducerRecord<byte[], byte[]> record;
+               int[] partitions = topicPartitionsMap.get(targetTopic);
+               if (null == partitions) {
+                       partitions = getPartitionsByTopic(targetTopic, 
transaction.producer);
+                       topicPartitionsMap.put(targetTopic, partitions);
+               }
+               if (flinkKafkaPartitioner != null) {
+                       record = new ProducerRecord<>(
+                               targetTopic,
+                               flinkKafkaPartitioner.partition(next, 
serializedKey, serializedValue, targetTopic, partitions),
+                               timestamp,
+                               serializedKey,
+                               serializedValue);
+               } else {
+                       record = new ProducerRecord<>(targetTopic, null, 
timestamp, serializedKey, serializedValue);
+               }
+               pendingRecords.incrementAndGet();
+               transaction.producer.send(record, callback);
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (currentTransaction != null) {
+                       // to avoid exceptions on aborting transactions with 
some pending records
+                       flush(currentTransaction);
+               }
+               try {
+                       super.close();
+               }
+               catch (Exception e) {
+                       asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
+               }
+               try {
+                       producersPool.close();
+               }
+               catch (Exception e) {
+                       asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
+               }
+               // make sure we propagate pending errors
+               checkErroneous();
+       }
+
+       // ------------------- Logic for handling checkpoint flushing 
-------------------------- //
+
+       @Override
+       protected KafkaTransactionState beginTransaction() throws Exception {
+               switch (semantic) {
+                       case EXACTLY_ONCE:
+                               FlinkKafkaProducer<byte[], byte[]> producer = 
createOrGetProducerFromPool();
+                               producer.beginTransaction();
+                               return new 
KafkaTransactionState(producer.getTransactionalId(), producer);
+                       case AT_LEAST_ONCE:
+                       case NONE:
+                               // Do not create new producer on each 
beginTransaction() if it is not necessary
+                               if (currentTransaction != null && 
currentTransaction.producer != null) {
+                                       return new 
KafkaTransactionState(currentTransaction.producer);
+                               }
+                               return new 
KafkaTransactionState(initProducer(true));
+                       default:
+                               throw new UnsupportedOperationException("Not 
implemented semantic");
+               }
+       }
+
+       private FlinkKafkaProducer<byte[], byte[]> 
createOrGetProducerFromPool() throws Exception {
+               FlinkKafkaProducer<byte[], byte[]> producer = 
producersPool.poll();
+               if (producer == null) {
+                       String transactionalId = 
availableTransactionalIds.poll();
+                       if (transactionalId == null) {
+                               throw new Exception(
+                                       "Too many ongoing snapshots. Increase 
kafka producers pool size or decrease number of concurrent checktpoins.");
+                       }
+                       producer = initTransactionalProducer(transactionalId, 
true);
+                       producer.initTransactions();
+               }
+               return producer;
+       }
+
+       @Override
+       protected void preCommit(KafkaTransactionState transaction) throws 
Exception {
+               switch (semantic) {
+                       case EXACTLY_ONCE:
+                       case AT_LEAST_ONCE:
+                               flush(transaction);
+                               break;
+                       case NONE:
+                               break;
+                       default:
+                               throw new UnsupportedOperationException("Not 
implemented semantic");
+               }
+               checkErroneous();
+       }
+
+       @Override
+       protected void commit(KafkaTransactionState transaction) {
+               switch (semantic) {
+                       case EXACTLY_ONCE:
+                               transaction.producer.commitTransaction();
+                               producersPool.add(transaction.producer);
+                               break;
+                       case AT_LEAST_ONCE:
+                       case NONE:
+                               break;
+                       default:
+                               throw new UnsupportedOperationException("Not 
implemented semantic");
+               }
+       }
+
+       @Override
+       protected void recoverAndCommit(KafkaTransactionState transaction) {
+               switch (semantic) {
+                       case EXACTLY_ONCE:
+                               KafkaTransactionState kafkaTransaction = 
transaction;
+                               FlinkKafkaProducer<byte[], byte[]> producer =
+                                       
initTransactionalProducer(kafkaTransaction.transactionalId, false);
+                               
producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
+                               try {
+                                       producer.commitTransaction();
+                                       producer.close();
+                               }
+                               catch (InvalidTxnStateException ex) {
+                                       // That means we have committed this 
transaction before.
+                                       LOG.warn("Encountered error {} while 
recovering transaction {}. " +
+                                               "Presumably this transaction 
has been already committed before",
+                                               ex,
+                                               transaction);
+                               }
+                               break;
+                       case AT_LEAST_ONCE:
+                       case NONE:
+                               break;
+                       default:
+                               throw new UnsupportedOperationException("Not 
implemented semantic");
+               }
+       }
+
+       @Override
+       protected void abort(KafkaTransactionState transaction) {
+               switch (semantic) {
+                       case EXACTLY_ONCE:
+                               transaction.producer.abortTransaction();
+                               producersPool.add(transaction.producer);
+                               break;
+                       case AT_LEAST_ONCE:
+                       case NONE:
+                               producersPool.add(transaction.producer);
+                               break;
+                       default:
+                               throw new UnsupportedOperationException("Not 
implemented semantic");
+               }
+       }
+
+       @Override
+       protected void recoverAndAbort(KafkaTransactionState transaction) {
+               switch (semantic) {
+                       case EXACTLY_ONCE:
+                               FlinkKafkaProducer<byte[], byte[]> producer =
+                                       
initTransactionalProducer(transaction.transactionalId, false);
+                               
producer.resumeTransaction(transaction.producerId, transaction.epoch);
+                               producer.abortTransaction();
+                               producer.close();
+                               break;
+                       case AT_LEAST_ONCE:
+                       case NONE:
+                               break;
+                       default:
+                               throw new UnsupportedOperationException("Not 
implemented semantic");
+               }
+       }
+
+       private void acknowledgeMessage() {
+               pendingRecords.decrementAndGet();
+       }
+
+       /**
+        * Flush pending records.
+        * @param transaction
+        */
+       private void flush(KafkaTransactionState transaction) throws Exception {
+               if (transaction.producer != null) {
+                       transaction.producer.flush();
+               }
+               long pendingRecordsCount = pendingRecords.get();
+               if (pendingRecordsCount != 0) {
+                       throw new IllegalStateException("Pending record count 
must be zero at this point: " + pendingRecordsCount);
+               }
+
+               // if the flushed requests has errors, we should propagate it 
also and fail the checkpoint
+               checkErroneous();
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               super.snapshotState(context);
+
+               nextTransactionalIdHintState.clear();
+               // To avoid duplication only first subtask keeps track of next 
transactional id hint. Otherwise all of the
+               // subtasks would write exactly same information.
+               if (getRuntimeContext().getIndexOfThisSubtask() == 0 && 
nextTransactionalIdHint != null) {
+                       long nextFreeTransactionalId = 
nextTransactionalIdHint.nextFreeTransactionalId;
+
+                       // If we scaled up, some (unknown) subtask must have 
created new transactional ids from scratch. In that
+                       // case we adjust nextFreeTransactionalId by the range 
of transactionalIds that could be used for this
+                       // scaling up.
+                       if (getRuntimeContext().getNumberOfParallelSubtasks() > 
nextTransactionalIdHint.lastParallelism) {
+                               nextFreeTransactionalId += 
getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
+                       }
+
+                       nextTransactionalIdHintState.add(new 
NextTransactionalIdHint(
+                               
getRuntimeContext().getNumberOfParallelSubtasks(),
+                               nextFreeTransactionalId));
+               }
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               nextTransactionalIdHintState = 
context.getOperatorStateStore().getUnionListState(
+                       NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+
+               if (semantic != Semantic.EXACTLY_ONCE) {
+                       nextTransactionalIdHint = null;
+               } else {
+                       ArrayList<NextTransactionalIdHint> transactionalIdHints 
= Lists.newArrayList(nextTransactionalIdHintState.get());
+                       if (transactionalIdHints.size() > 1) {
+                               throw new IllegalStateException(
+                                       "There should be at most one next 
transactional id hint written by the first subtask");
+                       } else if (transactionalIdHints.size() == 0) {
+                               nextTransactionalIdHint = new 
NextTransactionalIdHint(0, 0);
+
+                               // this means that this is either:
+                               // (1) the first execution of this application
+                               // (2) previous execution has failed before 
first checkpoint completed
+                               //
+                               // in case of (2) we have to abort all previous 
transactions, but we don't know was the parallelism used
+                               // then, so we must guess using current 
configured pool size, current parallelism and
+                               // SAFE_SCALE_DOWN_FACTOR
+                               long abortTransactionalIdStart = 
getRuntimeContext().getIndexOfThisSubtask();
+                               long abortTransactionalIdEnd = 
abortTransactionalIdStart + 1;
+
+                               abortTransactionalIdStart *= 
kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR;
+                               abortTransactionalIdEnd *= 
kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR;
+                               
abortTransactions(LongStream.range(abortTransactionalIdStart, 
abortTransactionalIdEnd));
+                       } else {
+                               nextTransactionalIdHint = 
transactionalIdHints.get(0);
+                       }
+               }
+
+               super.initializeState(context);
+       }
+
+       @Override
+       protected Optional<KafkaTransactionContext> initializeUserContext() {
+               if (semantic != Semantic.EXACTLY_ONCE) {
+                       return Optional.empty();
+               }
+
+               Set<String> transactionalIds = generateNewTransactionalIds();
+               resetAvailableTransactionalIdsPool(transactionalIds);
+               return Optional.of(new 
KafkaTransactionContext(transactionalIds));
+       }
+
+       private Set<String> generateNewTransactionalIds() {
+               Preconditions.checkState(nextTransactionalIdHint != null,
+                       "nextTransactionalIdHint must be present for 
EXACTLY_ONCE");
+
+               // range of available transactional ids is:
+               // [nextFreeTransactionalId, nextFreeTransactionalId + 
parallelism * kafkaProducersPoolSize)
+               // loop below picks in a deterministic way a subrange of those 
available transactional ids based on index of
+               // this subtask
+               int subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+               Set<String> transactionalIds = new HashSet<>();
+               for (int i = 0; i < kafkaProducersPoolSize; i++) {
+                       long transactionalId = 
nextTransactionalIdHint.nextFreeTransactionalId + subtaskId * 
kafkaProducersPoolSize + i;
+                       
transactionalIds.add(generateTransactionalId(transactionalId));
+               }
+               LOG.info("Generated new transactionalIds {}", transactionalIds);
+               return transactionalIds;
+       }
+
+       @Override
+       protected void finishRecoveringContext() {
+               cleanUpUserContext();
+               
resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
+               LOG.info("Recovered transactionalIds {}", 
getUserContext().get().transactionalIds);
+       }
+
+       /**
+        * After initialization make sure that all previous transactions from 
the current user context have been completed.
+        */
+       private void cleanUpUserContext() {
+               if (!getUserContext().isPresent()) {
+                       return;
+               }
+               
abortTransactions(getUserContext().get().transactionalIds.stream());
+       }
+
+       private void resetAvailableTransactionalIdsPool(Collection<String> 
transactionalIds) {
+               availableTransactionalIds.clear();
+               for (String transactionalId : transactionalIds) {
+                       availableTransactionalIds.add(transactionalId);
+               }
+       }
+
+       // ----------------------------------- Utilities 
--------------------------
+
+       private void abortTransactions(LongStream transactionalIds) {
+               
abortTransactions(transactionalIds.mapToObj(this::generateTransactionalId));
+       }
+
+       private void abortTransactions(Stream<String> transactionalIds) {
+               transactionalIds.forEach(transactionalId -> {
+                       try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
+                                       
initTransactionalProducer(transactionalId, false)) {
+                               kafkaProducer.initTransactions();
+                       }
+               });
+       }
+
+       private String generateTransactionalId(long transactionalId) {
+               String transactionalIdFormat = 
getRuntimeContext().getTaskName() + "-%d";
+               return String.format(transactionalIdFormat, transactionalId);
+       }
+
+       int getTransactionCoordinatorId() {
+               if (currentTransaction == null || currentTransaction.producer 
== null) {
+                       throw new IllegalArgumentException();
+               }
+               return 
currentTransaction.producer.getTransactionCoordinatorId();
+       }
+
+       private FlinkKafkaProducer<byte[], byte[]> 
initTransactionalProducer(String transactionalId, boolean registerMetrics) {
+               producerConfig.put("transactional.id", transactionalId);
+               return initProducer(registerMetrics);
+       }
+
+       private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean 
registerMetrics) {
+               FlinkKafkaProducer<byte[], byte[]> producer = new 
FlinkKafkaProducer<>(this.producerConfig);
+
+               RuntimeContext ctx = getRuntimeContext();
+
+               if (flinkKafkaPartitioner != null) {
+                       if (flinkKafkaPartitioner instanceof 
FlinkKafkaDelegatePartitioner) {
+                               ((FlinkKafkaDelegatePartitioner) 
flinkKafkaPartitioner).setPartitions(
+                                       
getPartitionsByTopic(this.defaultTopicId, producer));
+                       }
+                       flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
+               }
+
+               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
default topic {}",
+                       ctx.getIndexOfThisSubtask() + 1, 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+
+               // register Kafka metrics to Flink accumulators
+               if (registerMetrics && 
!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, 
"false"))) {
+                       Map<MetricName, ? extends Metric> metrics = 
producer.metrics();
+
+                       if (metrics == null) {
+                               // MapR's Kafka implementation returns null 
here.
+                               LOG.info("Producer implementation does not 
support metrics");
+                       } else {
+                               final MetricGroup kafkaMetricGroup = 
getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+                               for (Map.Entry<MetricName, ? extends Metric> 
entry: metrics.entrySet()) {
+                                       String name = entry.getKey().name();
+                                       Metric metric = entry.getValue();
+
+                                       KafkaMetricMuttableWrapper wrapper = 
previouslyCreatedMetrics.get(name);
+                                       if (wrapper != null) {
+                                               wrapper.setKafkaMetric(metric);
+                                       } else {
+                                               // TODO: somehow merge metrics 
from all active producers?
+                                               wrapper = new 
KafkaMetricMuttableWrapper(metric);
+                                               
previouslyCreatedMetrics.put(name, wrapper);
+                                               kafkaMetricGroup.gauge(name, 
wrapper);
+                                       }
+                               }
+                       }
+               }
+               return producer;
+       }
+
+       private void checkErroneous() throws Exception {
+               Exception e = asyncException;
+               if (e != null) {
+                       // prevent double throwing
+                       asyncException = null;
+                       throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
+               }
+       }
+
+       private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
+               in.defaultReadObject();
+               producersPool = new ProducersPool();
+       }
+
+       private static Properties getPropertiesFromBrokerList(String 
brokerList) {
+               String[] elements = brokerList.split(",");
+
+               // validate the broker addresses
+               for (String broker: elements) {
+                       NetUtils.getCorrectHostnamePort(broker);
+               }
+
+               Properties props = new Properties();
+               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
+               return props;
+       }
+
+       private static int[] getPartitionsByTopic(String topic, 
Producer<byte[], byte[]> producer) {
+               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
+               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(topic));
+
+               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
+               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
+                       @Override
+                       public int compare(PartitionInfo o1, PartitionInfo o2) {
+                               return Integer.compare(o1.partition(), 
o2.partition());
+                       }
+               });
+
+               int[] partitions = new int[partitionsList.size()];
+               for (int i = 0; i < partitions.length; i++) {
+                       partitions[i] = partitionsList.get(i).partition();
+               }
+
+               return partitions;
+       }
+
+       /**
+        * State for handling transactions.
+        */
+       public static class KafkaTransactionState {
+
+               private final transient FlinkKafkaProducer<byte[], byte[]> 
producer;
+
+               @Nullable
+               public final String transactionalId;
+
+               public final long producerId;
+
+               public final short epoch;
+
+               public KafkaTransactionState(String transactionalId, 
FlinkKafkaProducer<byte[], byte[]> producer) {
+                       this.producer = producer;
+                       this.transactionalId = transactionalId;
+                       this.producerId = producer.getProducerId();
+                       this.epoch = producer.getEpoch();
+               }
+
+               public KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> 
producer) {
+                       this.producer = producer;
+                       this.transactionalId = null;
+                       this.producerId = -1;
+                       this.epoch = -1;
+               }
+
+               @Override
+               public String toString() {
+                       return String.format("%s [transactionalId=%s]", 
this.getClass().getSimpleName(), transactionalId);
+               }
+       }
+
+       /**
+        * Context associated to this instance of the {@link 
FlinkKafkaProducer011}. User for keeping track of the
+        * transactionalIds.
+        */
+       public static class KafkaTransactionContext {
+               public final Set<String> transactionalIds;
+
+               public KafkaTransactionContext(Set<String> transactionalIds) {
+                       this.transactionalIds = transactionalIds;
+               }
+       }
+
+       static class ProducersPool implements Closeable {
+               private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], 
byte[]>> pool = new LinkedBlockingDeque<>();
+
+               public void add(FlinkKafkaProducer<byte[], byte[]> producer) {
+                       pool.add(producer);
+               }
+
+               public FlinkKafkaProducer<byte[], byte[]> poll() {
+                       return pool.poll();
+               }
+
+               @Override
+               public void close() {
+                       while (!pool.isEmpty()) {
+                               pool.poll().close();
+                       }
+               }
+       }
+
+       /**
+        * Keep information required to deduce next safe to use transactional 
id.
+        */
+       public static class NextTransactionalIdHint {
+               public int lastParallelism = 0;
+               public long nextFreeTransactionalId = 0;
+
+               public NextTransactionalIdHint() {
+                       this(0, 0);
+               }
+
+               public NextTransactionalIdHint(int parallelism, long 
nextFreeTransactionalId) {
+                       this.lastParallelism = parallelism;
+                       this.nextFreeTransactionalId = nextFreeTransactionalId;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
new file mode 100644
index 0000000..edc37cb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.11.
+ */
+public class Kafka011AvroTableSource extends KafkaAvroTableSource {
+
+       /**
+        * Creates a Kafka 0.11 Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param record     Avro specific record.
+        */
+       public Kafka011AvroTableSource(
+               String topic,
+               Properties properties,
+               Class<? extends SpecificRecordBase> record) {
+
+               super(
+                       topic,
+                       properties,
+                       record);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer011<>(topic, 
deserializationSchema, properties);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
new file mode 100644
index 0000000..471c2d2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.11.
+ */
+public class Kafka011JsonTableSource extends KafkaJsonTableSource {
+
+       /**
+        * Creates a Kafka 0.11 JSON {@link StreamTableSource}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param typeInfo   Type information describing the result type. The 
field names are used
+        *                   to parse the JSON file and so are the types.
+        */
+       public Kafka011JsonTableSource(
+                       String topic,
+                       Properties properties,
+                       TypeInformation<Row> typeInfo) {
+
+               super(topic, properties, typeInfo);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer011<>(topic, 
deserializationSchema, properties);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
new file mode 100644
index 0000000..5eaea97
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.11.
+ */
+public class Kafka011TableSource extends Kafka09TableSource {
+
+       /**
+        * Creates a Kafka 0.11 {@link StreamTableSource}.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
+        * @param typeInfo              Type information describing the result 
type. The field names are used
+        *                              to parse the JSON file and so are the 
types.
+        */
+       public Kafka011TableSource(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       TypeInformation<Row> typeInfo) {
+
+               super(topic, properties, deserializationSchema, typeInfo);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer011<>(topic, 
deserializationSchema, properties);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
new file mode 100644
index 0000000..a22ff5c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+import org.apache.kafka.common.Metric;
+
+/**
+ * Gauge for getting the current value of a Kafka metric.
+ */
+public class KafkaMetricMuttableWrapper implements Gauge<Double> {
+       private org.apache.kafka.common.Metric kafkaMetric;
+
+       public KafkaMetricMuttableWrapper(org.apache.kafka.common.Metric 
metric) {
+               this.kafkaMetric = metric;
+       }
+
+       @Override
+       public Double getValue() {
+               return kafkaMetric.value();
+       }
+
+       public void setKafkaMetric(Metric kafkaMetric) {
+               this.kafkaMetric = kafkaMetric;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
new file mode 100644
index 0000000..51410da
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+       protected String transactionalId;
+       protected Properties extraProperties;
+
+       protected TypeInformationSerializationSchema<Integer> 
integerSerializationSchema =
+                       new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
+       protected KeyedSerializationSchema<Integer> 
integerKeyedSerializationSchema =
+                       new 
KeyedSerializationSchemaWrapper(integerSerializationSchema);
+
+       @Before
+       public void before() {
+               transactionalId = UUID.randomUUID().toString();
+               extraProperties = new Properties();
+               extraProperties.putAll(standardProps);
+               extraProperties.put("transactional.id", transactionalId);
+               extraProperties.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+               extraProperties.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+               extraProperties.put("isolation.level", "read_committed");
+       }
+
+       @Test(timeout = 30000L)
+       public void testHappyPath() throws IOException {
+               String topicName = "flink-kafka-producer-happy-path";
+               try (Producer<String, String> kafkaProducer = new 
FlinkKafkaProducer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       kafkaProducer.send(new ProducerRecord<>(topicName, 
"42", "42"));
+                       kafkaProducer.commitTransaction();
+               }
+               assertRecord(topicName, "42", "42");
+               deleteTestTopic(topicName);
+       }
+
+       @Test(timeout = 30000L)
+       public void testResumeTransaction() throws IOException {
+               String topicName = "flink-kafka-producer-resume-transaction";
+               try (FlinkKafkaProducer<String, String> kafkaProducer = new 
FlinkKafkaProducer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       kafkaProducer.send(new ProducerRecord<>(topicName, 
"42", "42"));
+                       kafkaProducer.flush();
+                       long producerId = kafkaProducer.getProducerId();
+                       short epoch = kafkaProducer.getEpoch();
+
+                       try (FlinkKafkaProducer<String, String> resumeProducer 
= new FlinkKafkaProducer<>(extraProperties)) {
+                               resumeProducer.resumeTransaction(producerId, 
epoch);
+                               resumeProducer.commitTransaction();
+                       }
+
+                       assertRecord(topicName, "42", "42");
+
+                       // this shouldn't throw - in case of network split, old 
producer might attempt to commit it's transaction
+                       kafkaProducer.commitTransaction();
+
+                       // this shouldn't fail also, for same reason as above
+                       try (FlinkKafkaProducer<String, String> resumeProducer 
= new FlinkKafkaProducer<>(extraProperties)) {
+                               resumeProducer.resumeTransaction(producerId, 
epoch);
+                               resumeProducer.commitTransaction();
+                       }
+               }
+               deleteTestTopic(topicName);
+       }
+
+       @Test(timeout = 120_000L)
+       public void testFlinkKafkaProducer011FailBeforeNotify() throws 
Exception {
+               String topic = "flink-kafka-producer-fail-before-notify";
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness 
= createTestHarness(topic);
+
+               testHarness.setup();
+               testHarness.open();
+               testHarness.initializeState(null);
+               testHarness.processElement(42, 0);
+               testHarness.snapshot(0, 1);
+               testHarness.processElement(43, 2);
+               OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
+
+               int leaderId = kafkaServer.getLeaderToShutDown(topic);
+               failBroker(leaderId);
+
+               try {
+                       testHarness.processElement(44, 4);
+                       testHarness.snapshot(2, 5);
+                       assertFalse(true);
+               }
+               catch (Exception ex) {
+                       // expected
+               }
+               try {
+                       testHarness.close();
+               }
+               catch (Exception ex) {
+               }
+
+               kafkaServer.restartBroker(leaderId);
+
+               testHarness = createTestHarness(topic);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.close();
+
+               assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42, 43), 30_000L);
+
+               deleteTestTopic(topic);
+       }
+
+       @Test(timeout = 120_000L)
+       public void 
testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws 
Exception {
+               String topic = 
"flink-kafka-producer-fail-transaction-coordinator-before-notify";
+
+               Properties properties = createProperties();
+
+               FlinkKafkaProducer011<Integer> kafkaProducer = new 
FlinkKafkaProducer011<>(
+                       topic,
+                       integerKeyedSerializationSchema,
+                       properties,
+                       FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 
= new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(kafkaProducer),
+                       IntSerializer.INSTANCE);
+
+               testHarness1.setup();
+               testHarness1.open();
+               testHarness1.initializeState(null);
+               testHarness1.processElement(42, 0);
+               testHarness1.snapshot(0, 1);
+               testHarness1.processElement(43, 2);
+               int transactionCoordinatorId = 
kafkaProducer.getTransactionCoordinatorId();
+               OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
+
+               failBroker(transactionCoordinatorId);
+
+               try {
+                       testHarness1.processElement(44, 4);
+                       testHarness1.notifyOfCompletedCheckpoint(1);
+                       testHarness1.close();
+               }
+               catch (Exception ex) {
+                       // Expected... some random exception could be thrown by 
any of the above operations.
+               }
+               finally {
+                       kafkaServer.restartBroker(transactionCoordinatorId);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness2 = createTestHarness(topic)) {
+                       testHarness2.setup();
+                       testHarness2.initializeState(snapshot);
+                       testHarness2.open();
+               }
+
+               assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42, 43), 30_000L);
+
+               deleteTestTopic(topic);
+       }
+
+       /**
+        * This tests checks whether FlinkKafkaProducer011 correctly aborts 
lingering transactions after a failure.
+        * If such transactions were left alone lingering it consumers would be 
unable to read committed records
+        * that were created after this lingering transaction.
+        */
+       @Test(timeout = 120_000L)
+       public void testFailBeforeNotifyAndResumeWorkAfterwards() throws 
Exception {
+               String topic = "flink-kafka-producer-fail-before-notify";
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness 
= createTestHarness(topic);
+
+               testHarness.setup();
+               testHarness.open();
+               testHarness.processElement(42, 0);
+               testHarness.snapshot(0, 1);
+               testHarness.processElement(43, 2);
+               OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
+
+               testHarness.processElement(44, 4);
+               testHarness.snapshot(2, 5);
+               testHarness.processElement(45, 6);
+
+               // do not close previous testHarness to make sure that closing 
do not clean up something (in case of failure
+               // there might not be any close)
+               testHarness = createTestHarness(topic);
+               testHarness.setup();
+               // restore from snapshot1, transactions with records 44 and 45 
should be aborted
+               testHarness.initializeState(snapshot1);
+               testHarness.open();
+
+               // write and commit more records, after potentially lingering 
transactions
+               testHarness.processElement(46, 7);
+               testHarness.snapshot(4, 8);
+               testHarness.processElement(47, 9);
+               testHarness.notifyOfCompletedCheckpoint(4);
+
+               //now we should have:
+               // - records 42 and 43 in committed transactions
+               // - aborted transactions with records 44 and 45
+               // - committed transaction with record 46
+               // - pending transaction with record 47
+               assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42, 43, 46), 30_000L);
+
+               testHarness.close();
+               deleteTestTopic(topic);
+       }
+
+       private OneInputStreamOperatorTestHarness<Integer, Object> 
createTestHarness(String topic) throws Exception {
+               Properties properties = createProperties();
+
+               FlinkKafkaProducer011<Integer> kafkaProducer = new 
FlinkKafkaProducer011<>(
+                       topic,
+                       integerKeyedSerializationSchema,
+                       properties,
+                       FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+               return new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(kafkaProducer),
+                       IntSerializer.INSTANCE);
+       }
+
+       private Properties createProperties() {
+               Properties properties = new Properties();
+               properties.putAll(standardProps);
+               properties.putAll(secureProps);
+               properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, 
"true");
+               return properties;
+       }
+
+       @Test
+       public void testRecoverCommittedTransaction() throws Exception {
+               String topic = 
"flink-kafka-producer-recover-committed-transaction";
+
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness 
= createTestHarness(topic);
+
+               testHarness.setup();
+               testHarness.open(); // producerA - start transaction (txn) 0
+               testHarness.processElement(42, 0); // producerA - write 42 in 
txn 0
+               OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); 
// producerA - pre commit txn 0, producerB - start txn 1
+               testHarness.processElement(43, 2); // producerB - write 43 in 
txn 1
+               testHarness.notifyOfCompletedCheckpoint(0); // producerA - 
commit txn 0 and return to the pool
+               testHarness.snapshot(1, 3); // producerB - pre txn 1,  
producerA - start txn 2
+               testHarness.processElement(44, 4); // producerA - write 44 in 
txn 2
+               testHarness.close(); // producerA - abort txn 2
+
+               testHarness = createTestHarness(topic);
+               testHarness.initializeState(checkpoint0); // recover state 0 - 
producerA recover and commit txn 0
+               testHarness.close();
+
+               assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42), 30_000L);
+
+               deleteTestTopic(topic);
+       }
+
+       @Test
+       public void testRunOutOfProducersInThePool() throws Exception {
+               String topic = "flink-kafka-run-out-of-producers";
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness = createTestHarness(topic)) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       for (int i = 0; i < 
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
+                               testHarness.processElement(i, i * 2);
+                               testHarness.snapshot(i, i * 2 + 1);
+                       }
+               }
+               catch (Exception ex) {
+                       if (!ex.getCause().getMessage().startsWith("Too many 
ongoing")) {
+                               throw ex;
+                       }
+               }
+               deleteTestTopic(topic);
+       }
+
+       // shut down a Kafka broker
+       private void failBroker(int brokerId) {
+               KafkaServer toShutDown = null;
+               for (KafkaServer server : kafkaServer.getBrokers()) {
+
+                       if (kafkaServer.getBrokerId(server) == brokerId) {
+                               toShutDown = server;
+                               break;
+                       }
+               }
+
+               if (toShutDown == null) {
+                       StringBuilder listOfBrokers = new StringBuilder();
+                       for (KafkaServer server : kafkaServer.getBrokers()) {
+                               
listOfBrokers.append(kafkaServer.getBrokerId(server));
+                               listOfBrokers.append(" ; ");
+                       }
+
+                       throw new IllegalArgumentException("Cannot find broker 
to shut down: " + brokerId
+                               + " ; available brokers: " + 
listOfBrokers.toString());
+               } else {
+                       toShutDown.shutdown();
+                       toShutDown.awaitShutdown();
+               }
+       }
+
+       private void assertRecord(String topicName, String expectedKey, String 
expectedValue) {
+               try (KafkaConsumer<String, String> kafkaConsumer = new 
KafkaConsumer<>(extraProperties)) {
+                       
kafkaConsumer.subscribe(Collections.singletonList(topicName));
+                       ConsumerRecords<String, String> records = 
kafkaConsumer.poll(10000);
+
+                       ConsumerRecord<String, String> record = 
Iterables.getOnlyElement(records);
+                       assertEquals(expectedKey, record.key());
+                       assertEquals(expectedValue, record.value());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
new file mode 100644
index 0000000..e60bf17
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka011AvroTableSource}.
+ */
+public class Kafka011AvroTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, TypeInformation<Row> typeInfo) {
+
+               return new Kafka011AvroTableSource(
+                       topic,
+                       properties,
+                       AvroSpecificRecord.class);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) AvroRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer011.class;
+       }
+}
+

Reply via email to