[FLINK-3375] [kafka connector] Add per-Kafka-partition watermark generation to 
the FlinkKafkaConsumer

This closes #1839


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac1549e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac1549e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac1549e

Branch: refs/heads/master
Commit: 0ac1549ec58c1737a79e5770a171a8b14bed56dc
Parents: 885d543
Author: kl0u <kklou...@gmail.com>
Authored: Tue Mar 8 17:35:14 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 01:10:54 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  56 +--
 .../connectors/kafka/internals/Fetcher.java     |  12 +-
 .../kafka/internals/LegacyFetcher.java          |  51 ++-
 .../kafka/internals/ZookeeperOffsetHandler.java |   4 +-
 .../connectors/kafka/Kafka08ITCase.java         |  16 +-
 .../connectors/kafka/KafkaConsumerTest.java     |  12 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  13 +-
 .../connectors/kafka/Kafka09ITCase.java         |  11 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 357 ++++++++++++++++++-
 .../kafka/internals/KafkaPartitionState.java    |  65 ++++
 .../connectors/kafka/KafkaConsumerTestBase.java | 234 +++++++++++-
 .../AssignerWithPeriodicWatermarks.java         |   2 +-
 12 files changed, 744 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
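
Before the per-file diffs, here is a minimal usage sketch (not part of this
commit) of the feature it adds: a watermark assigner is attached directly to
the consumer via the new setPeriodicWatermarkEmitter() method introduced in
FlinkKafkaConsumerBase below. Topic name, broker addresses, group id, and the
assumed record layout ("<timestampMillis>,<payload>") are hypothetical.

    import java.util.Properties;

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaPerPartitionWatermarkExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(200); // drives the periodic trigger() calls

            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181");
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "wm-demo");

            FlinkKafkaConsumer08<String> consumer =
                    new FlinkKafkaConsumer08<>("events", new SimpleStringSchema(), props);

            // a source may have either a periodic or a punctuated assigner, not both
            consumer.setPeriodicWatermarkEmitter(new AssignerWithPeriodicWatermarks<String>() {

                private long maxTimestamp = Long.MIN_VALUE;

                @Override
                public long extractTimestamp(String element, long previousElementTimestamp) {
                    // assumed record layout: "<timestampMillis>,<payload>"
                    long ts = Long.parseLong(element.substring(0, element.indexOf(',')));
                    maxTimestamp = Math.max(maxTimestamp, ts);
                    return ts;
                }

                @Override
                public Watermark getCurrentWatermark() {
                    return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp);
                }
            });

            env.addSource(consumer).print();
            env.execute("per-partition watermark demo");
        }
    }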


http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 865fe36..4748781 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
@@ -108,11 +109,6 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer08.class);
 
-       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
-        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
-       public static final long OFFSET_NOT_SET = -915623761776L;
-
-
        /** Configuration key for the number of retries for getting the 
partition info */
        public static final String GET_PARTITIONS_RETRIES_KEY = 
"flink.get-partitions.retry";
 
@@ -252,17 +248,19 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                        this.fetcher = null; // fetcher remains null
                        return;
                }
-               
 
                // offset handling
                offsetHandler = new ZookeeperOffsetHandler(props);
 
                committedOffsets = new HashMap<>();
 
-               Map<KafkaTopicPartition, Long> subscribedPartitionsWithOffsets 
= new HashMap<>(subscribedPartitions.size());
-               // initially load the map with "offset not set"
+               // initially load the map with "offset not set", the max read 
timestamp set to Long.MIN_VALUE,
+               // and mark the partition as inactive until we receive the 
first element
+               Map<KafkaTopicPartition, KafkaPartitionState> 
subscribedPartitionsWithOffsets =
+                       new HashMap<>(subscribedPartitions.size());
                for(KafkaTopicPartition ktp: subscribedPartitions) {
-                       subscribedPartitionsWithOffsets.put(ktp, 
FlinkKafkaConsumer08.OFFSET_NOT_SET);
+                       subscribedPartitionsWithOffsets.put(ktp,
+                               new KafkaPartitionState(ktp.getPartition(), 
FlinkKafkaConsumerBase.OFFSET_NOT_SET));
                }
 
                // seek to last known pos, from restore request
@@ -272,16 +270,20 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                                                thisConsumerIndex, 
KafkaTopicPartition.toString(restoreToOffset));
                        }
                        // initialize offsets with restored state
-                       this.offsetsState = restoreToOffset;
-                       subscribedPartitionsWithOffsets.putAll(restoreToOffset);
+                       this.partitionState = restoreInfoFromCheckpoint();
+                       subscribedPartitionsWithOffsets.putAll(partitionState);
                        restoreToOffset = null;
                }
                else {
-                       // start with empty offsets
-                       offsetsState = new HashMap<>();
+                       // start with empty partition state
+                       partitionState = new HashMap<>();
 
                        // no restore request: overwrite offsets.
-                       
subscribedPartitionsWithOffsets.putAll(offsetHandler.getOffsets(subscribedPartitions));
+                       for(Map.Entry<KafkaTopicPartition, Long> offsetInfo: 
offsetHandler.getOffsets(subscribedPartitions).entrySet()) {
+                               KafkaTopicPartition key = offsetInfo.getKey();
+                               subscribedPartitionsWithOffsets.put(key,
+                                       new 
KafkaPartitionState(key.getPartition(), offsetInfo.getValue()));
+                       }
                }
                if(subscribedPartitionsWithOffsets.size() != 
subscribedPartitions.size()) {
                        throw new IllegalStateException("The subscribed 
partitions map has more entries than the subscribed partitions " +
@@ -289,22 +291,22 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                }
 
                // create fetcher
-               fetcher = new LegacyFetcher(subscribedPartitionsWithOffsets, 
props,
+               fetcher = new LegacyFetcher<T>(this, 
subscribedPartitionsWithOffsets, props,
                                getRuntimeContext().getTaskName(), 
getRuntimeContext().getUserCodeClassLoader());
        }
 
        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
                if (fetcher != null) {
-                       // For non-checkpointed sources, a thread which 
periodically commits the current offset into ZK.
-                       PeriodicOffsetCommitter<T> offsetCommitter = null;
-
-                       // check whether we need to start the periodic 
checkpoint committer
                        StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
+
+                       // if we have a non-checkpointed source, start a thread 
which periodically commits
+                       // the current offset into ZK.
+
+                       PeriodicOffsetCommitter<T> offsetCommitter = null;
                        if (!streamingRuntimeContext.isCheckpointingEnabled()) {
                                // we use Kafka's own configuration parameter 
key for this.
-                               // Note that the default configuration value in 
Kafka is 60 * 1000, so we use the
-                               // same here.
+                               // Note that the default configuration value in 
Kafka is 60 * 1000, so we use the same here.
                                long commitInterval = getLongFromConfig(props, 
"auto.commit.interval.ms", 60000);
                                offsetCommitter = new 
PeriodicOffsetCommitter<>(commitInterval, this);
                                offsetCommitter.setDaemon(true);
@@ -313,7 +315,7 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                        }
 
                        try {
-                               fetcher.run(sourceContext, deserializer, 
offsetsState);
+                               fetcher.run(sourceContext, deserializer, 
partitionState);
                        } finally {
                                if (offsetCommitter != null) {
                                        offsetCommitter.close();
@@ -439,7 +441,7 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                private final FlinkKafkaConsumer08<T> consumer;
                private volatile boolean running = true;
 
-               public PeriodicOffsetCommitter(long commitInterval, 
FlinkKafkaConsumer08<T> consumer) {
+               PeriodicOffsetCommitter(long commitInterval, 
FlinkKafkaConsumer08<T> consumer) {
                        this.commitInterval = commitInterval;
                        this.consumer = consumer;
                }
@@ -453,9 +455,12 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                                                Thread.sleep(commitInterval);
                                                //  ------------  commit 
current offsets ----------------
 
-                                               // create copy of current 
offsets
+                                               // create a deep copy of the 
current offsets
                                                @SuppressWarnings("unchecked")
-                                               HashMap<KafkaTopicPartition, 
Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) 
consumer.offsetsState.clone();
+                                               HashMap<KafkaTopicPartition, 
Long> currentOffsets = new HashMap<>(consumer.partitionState.size());
+                                               for 
(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: 
consumer.partitionState.entrySet()) {
+                                                       
currentOffsets.put(entry.getKey(), entry.getValue().getOffset());
+                                               }
                                                
consumer.commitOffsets(currentOffsets);
                                        } catch (InterruptedException e) {
                                                if (running) {
@@ -474,7 +479,6 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                        this.running = false;
                        this.interrupt();
                }
-
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
index 2dacbce..f86687e 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -39,8 +39,10 @@ public interface Fetcher {
        /**
         * Starts fetching data from Kafka and emitting it into the stream.
         * 
-        * <p>To provide exactly once guarantees, the fetcher needs emit a 
record and update the update
-        * of the last consumed offset in one atomic operation:</p>
+        * <p>To provide exactly once guarantees, the fetcher needs to emit a 
record and update the last
+        * consumed offset in one atomic operation. This is done in the
+        * {@link 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#processElement(SourceFunction.SourceContext,
 KafkaTopicPartition, Object, long)}
+        * which is called from within the {@link 
SourceFunction.SourceContext#getCheckpointLock()}, as shown below:</p>
         * <pre>{@code
         * 
         * while (running) {
@@ -48,8 +50,7 @@ public interface Fetcher {
         *     long offset = ...
         *     int partition = ...
         *     synchronized (sourceContext.getCheckpointLock()) {
-        *         sourceContext.collect(next);
-        *         lastOffsets[partition] = offset;
+        *         processElement(sourceContext, partition, next, offset)
         *     }
         * }
         * }</pre>
@@ -60,8 +61,7 @@ public interface Fetcher {
         * @param lastOffsets The map into which to store the offsets for which 
elements are emitted (operator state)
         */
        <T> void run(SourceFunction.SourceContext<T> sourceContext, 
KeyedDeserializationSchema<T> valueDeserializer,
-                               HashMap<KafkaTopicPartition, Long> lastOffsets) 
throws Exception;
-
+                               HashMap<KafkaTopicPartition, 
KafkaPartitionState> lastOffsets) throws Exception;
 
        /**
         * Exit run loop with given error and release all resources.
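
To make the contract above concrete, here is a small hedged sketch of the
per-record call a Fetcher implementation makes; emitRecord and its parameters
are illustrative names only, while processElement() and getCheckpointLock()
are the methods this change actually relies on (imports as in the interface
above are assumed):

    // Illustrative helper, not part of the commit: emitting the record and
    // updating its partition's offset must be atomic with respect to
    // checkpoints, which processElement() provides when called under the
    // checkpoint lock.
    static <T> void emitRecord(
            FlinkKafkaConsumerBase<T> consumer,
            SourceFunction.SourceContext<T> sourceContext,
            KafkaTopicPartition partition,
            T record,
            long offset) {

        synchronized (sourceContext.getCheckpointLock()) {
            consumer.processElement(sourceContext, partition, record, offset);
        }
    }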

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index fa3b6a8..c4dd55c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -30,6 +30,7 @@ import kafka.message.MessageAndOffset;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.StringUtils;
@@ -61,13 +62,12 @@ import static 
org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getInt
  * 
  * <p>This code is in parts based on the tutorial code for the low-level Kafka 
consumer.</p>
  */
-public class LegacyFetcher implements Fetcher {
+public class LegacyFetcher<T> implements Fetcher {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(LegacyFetcher.class);
 
        private static final FetchPartition MARKER = new FetchPartition("n/a", 
-1, -1);
-       
-       
+
        /** The properties that configure the Kafka connection */
        private final Properties config;
        
@@ -79,7 +79,6 @@ public class LegacyFetcher implements Fetcher {
        
        /** The classloader for dynamically loaded classes */
        private final ClassLoader userCodeClassloader;
-
        
        /** Reference the the thread that executed the run() method. */
        private volatile Thread mainThread;
@@ -94,6 +93,9 @@ public class LegacyFetcher implements Fetcher {
         */
        private final ClosableBlockingQueue<FetchPartition> 
unassignedPartitions = new ClosableBlockingQueue<>();
 
+       /** The {@link FlinkKafkaConsumer08} to which this Fetcher belongs. */
+       private final FlinkKafkaConsumer08<T> flinkKafkaConsumer;
+
        /**
         * Create a LegacyFetcher instance.
         *
@@ -103,20 +105,23 @@ public class LegacyFetcher implements Fetcher {
         * @param userCodeClassloader classloader for loading user code
         */
        public LegacyFetcher(
-                               Map<KafkaTopicPartition, Long> 
initialPartitionsToRead, Properties props,
+                               FlinkKafkaConsumer08<T> owner,
+                               Map<KafkaTopicPartition, KafkaPartitionState> 
initialPartitionsToRead,
+                               Properties props,
                                String taskName, ClassLoader 
userCodeClassloader) {
-               
+
+               this.flinkKafkaConsumer = requireNonNull(owner);
                this.config = requireNonNull(props, "The config properties 
cannot be null");
                this.userCodeClassloader = requireNonNull(userCodeClassloader);
                if (initialPartitionsToRead.size() == 0) {
                        throw new IllegalArgumentException("List of initial 
partitions is empty");
                }
 
-               for (Map.Entry<KafkaTopicPartition, Long> partitionToRead: 
initialPartitionsToRead.entrySet()) {
+               for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> 
partitionToRead: initialPartitionsToRead.entrySet()) {
                        KafkaTopicPartition ktp = partitionToRead.getKey();
                        // we increment the offset by one so that we fetch the 
next message in the partition.
-                       long offset = partitionToRead.getValue();
-                       if (offset >= 0 && offset != 
FlinkKafkaConsumer08.OFFSET_NOT_SET) {
+                       long offset = partitionToRead.getValue().getOffset();
+                       if (offset >= 0 && offset != 
FlinkKafkaConsumerBase.OFFSET_NOT_SET) {
                                offset += 1L;
                        }
                        unassignedPartitions.add(new 
FetchPartition(ktp.getTopic(), ktp.getPartition(), offset));
@@ -141,7 +146,7 @@ public class LegacyFetcher implements Fetcher {
        @Override
        public <T> void run(SourceFunction.SourceContext<T> sourceContext,
                                                KeyedDeserializationSchema<T> 
deserializer,
-                                               HashMap<KafkaTopicPartition, 
Long> lastOffsets) throws Exception {
+                                               HashMap<KafkaTopicPartition, 
KafkaPartitionState> partitionState) throws Exception {
 
                // NOTE: This method needs to always release all resources it 
acquires
 
@@ -173,7 +178,7 @@ public class LegacyFetcher implements Fetcher {
                                                        
                                                        if (brokerThread == 
null || !brokerThread.getNewPartitionsQueue().isOpen()) {
                                                                // start new 
thread
-                                                               brokerThread = 
createAndStartSimpleConsumerThread(sourceContext, deserializer, lastOffsets, 
partitions, leader);
+                                                               brokerThread = 
createAndStartSimpleConsumerThread(sourceContext, deserializer, partitions, 
leader);
                                                                
brokerToThread.put(leader, brokerThread);
                                                        } else {
                                                                // put elements 
into queue of thread
@@ -185,7 +190,7 @@ public class LegacyFetcher implements Fetcher {
                                                                                
// create a new thread for connecting to this broker
                                                                                
List<FetchPartition> seedPartitions = new ArrayList<>();
                                                                                
seedPartitions.add(fp);
-                                                                               
brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, 
lastOffsets, seedPartitions, leader);
+                                                                               
brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, 
seedPartitions, leader);
                                                                                
brokerToThread.put(leader, brokerThread);
                                                                                
newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for 
the subsequent partitions
                                                                        }
@@ -241,15 +246,13 @@ public class LegacyFetcher implements Fetcher {
 
        private <T> SimpleConsumerThread<T> 
createAndStartSimpleConsumerThread(SourceFunction.SourceContext<T> 
sourceContext,
                                                                                
                                                                
KeyedDeserializationSchema<T> deserializer,
-                                                                               
                                                                
HashMap<KafkaTopicPartition, Long> lastOffsets,
                                                                                
                                                                
List<FetchPartition> seedPartitions, Node leader) throws IOException, 
ClassNotFoundException {
-               SimpleConsumerThread<T> brokerThread;
                final KeyedDeserializationSchema<T> clonedDeserializer =
                                InstantiationUtil.clone(deserializer, 
userCodeClassloader);
 
                // seed thread with list of fetch partitions (otherwise it 
would shut down immediately again
-               brokerThread = new SimpleConsumerThread<>(this, config,
-                               leader, seedPartitions, unassignedPartitions, 
sourceContext, clonedDeserializer, lastOffsets);
+               SimpleConsumerThread<T> brokerThread = new 
SimpleConsumerThread<>(this, config,
+                               leader, seedPartitions, unassignedPartitions, 
sourceContext, clonedDeserializer);
 
                brokerThread.setName(String.format("SimpleConsumer - %s - 
broker-%s (%s:%d)",
                                taskName, leader.id(), leader.host(), 
leader.port()));
@@ -387,8 +390,7 @@ public class LegacyFetcher implements Fetcher {
                
                private final SourceFunction.SourceContext<T> sourceContext;
                private final KeyedDeserializationSchema<T> deserializer;
-               private final HashMap<KafkaTopicPartition, Long> offsetsState;
-               
+
                private final List<FetchPartition> partitions;
                
                private final Node broker;
@@ -399,8 +401,6 @@ public class LegacyFetcher implements Fetcher {
 
                private final ClosableBlockingQueue<FetchPartition> 
unassignedPartitions;
 
-
-
                private volatile boolean running = true;
 
                /** Queue containing new fetch partitions for the consumer 
thread */
@@ -424,15 +424,13 @@ public class LegacyFetcher implements Fetcher {
                                                                        
List<FetchPartition> seedPartitions,
                                                                        
ClosableBlockingQueue<FetchPartition> unassignedPartitions,
                                                                        
SourceFunction.SourceContext<T> sourceContext,
-                                                                       
KeyedDeserializationSchema<T> deserializer,
-                                                                       
HashMap<KafkaTopicPartition, Long> offsetsState) {
+                                                                       
KeyedDeserializationSchema<T> deserializer) {
                        this.owner = owner;
                        this.config = config;
                        this.broker = broker;
                        this.partitions = seedPartitions;
                        this.sourceContext = requireNonNull(sourceContext);
                        this.deserializer = requireNonNull(deserializer);
-                       this.offsetsState = requireNonNull(offsetsState);
                        this.unassignedPartitions = 
requireNonNull(unassignedPartitions);
 
                        this.soTimeout = getIntFromConfig(config, 
"socket.timeout.ms", 30000);
@@ -661,8 +659,7 @@ public class LegacyFetcher implements Fetcher {
                                                                        
continue partitionsLoop;
                                                                }
                                                                synchronized 
(sourceContext.getCheckpointLock()) {
-                                                                       
sourceContext.collect(value);
-                                                                       
offsetsState.put(topicPartition, offset);
+                                                                       
owner.flinkKafkaConsumer.processElement(sourceContext, topicPartition, value, 
offset);
                                                                }
                                                                
                                                                // advance 
offset for the next request
@@ -703,7 +700,7 @@ public class LegacyFetcher implements Fetcher {
                        List<FetchPartition> partitionsToGetOffsetsFor = new 
ArrayList<>();
 
                        for (FetchPartition fp : partitions) {
-                               if (fp.nextOffsetToRead == 
FlinkKafkaConsumer08.OFFSET_NOT_SET) {
+                               if (fp.nextOffsetToRead == 
FlinkKafkaConsumerBase.OFFSET_NOT_SET) {
                                        // retrieve the offset from the consumer
                                        partitionsToGetOffsetsFor.add(fp);
                                }
@@ -716,7 +713,7 @@ public class LegacyFetcher implements Fetcher {
                                // we subtract -1 from the offset
                                synchronized 
(sourceContext.getCheckpointLock()) {
                                        for(FetchPartition fp: 
partitionsToGetOffsetsFor) {
-                                               this.offsetsState.put(new 
KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L);
+                                               
owner.flinkKafkaConsumer.updateOffsetForPartition(new 
KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L);
                                        }
                                }
                        }
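
The KafkaPartitionState class referenced throughout the hunks above has its
own hunk in this commit (see the diffstat), but it is not included in this
excerpt. Judging only from the calls made elsewhere in the diff, its shape is
roughly the following; field names and the Serializable choice are
assumptions, not the committed code:

    package org.apache.flink.streaming.connectors.kafka.internals;

    // Hedged reconstruction from usage in this diff; not the committed file.
    public class KafkaPartitionState implements java.io.Serializable {

        private static final long serialVersionUID = 1L;

        private final int partition;

        private volatile long offset;
        private volatile long maxTimestamp = Long.MIN_VALUE;
        private volatile boolean active = false;

        public KafkaPartitionState(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        public int getPartition() { return partition; }

        public long getOffset() { return offset; }

        public void setOffset(long offset) { this.offset = offset; }

        public long getMaxTimestamp() { return maxTimestamp; }

        public void setMaxTimestamp(long maxTimestamp) { this.maxTimestamp = maxTimestamp; }

        public boolean isActive() { return active; }

        public void setActive(boolean active) { this.active = active; }
    }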

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 003e24f..328cab0 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,7 +24,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
        
-       private static final long OFFSET_NOT_SET = 
FlinkKafkaConsumer08.OFFSET_NOT_SET;
+       private static final long OFFSET_NOT_SET = 
FlinkKafkaConsumerBase.OFFSET_NOT_SET;
 
        private final String groupId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index d704fbd..d6ee968 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -61,6 +61,16 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
        }
 
        @Test(timeout = 60000)
+       public void testPunctuatedExplicitWMConsumer() throws Exception {
+               runExplicitPunctuatedWMgeneratingConsumerTest(false);
+       }
+
+       @Test(timeout = 60000)
+       public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws 
Exception {
+               runExplicitPunctuatedWMgeneratingConsumerTest(true);
+       }
+
+       @Test(timeout = 60000)
        public void testKeyValueSupport() throws Exception {
                runKeyValueTest();
        }
@@ -198,9 +208,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
                LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
 
-               assertTrue(o1 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o1 >= 
0 && o1 <= 100));
-               assertTrue(o2 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o2 >= 
0 && o2 <= 100));
-               assertTrue(o3 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o3 >= 
0 && o3 <= 100));
+               assertTrue(o1 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o1 
>= 0 && o1 <= 100));
+               assertTrue(o2 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o2 
>= 0 && o2 <= 100));
+               assertTrue(o3 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o3 
>= 0 && o3 <= 100));
 
                LOG.info("Manipulating offsets");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
index 113ad71..7337f65 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
 
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -81,7 +82,7 @@ public class KafkaConsumerTest {
        @Test
        public void testSnapshot() {
                try {
-                       Field offsetsField = 
FlinkKafkaConsumerBase.class.getDeclaredField("offsetsState");
+                       Field offsetsField = 
FlinkKafkaConsumerBase.class.getDeclaredField("partitionState");
                        Field runningField = 
FlinkKafkaConsumerBase.class.getDeclaredField("running");
                        Field mapField = 
FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
 
@@ -92,18 +93,19 @@ public class KafkaConsumerTest {
                        FlinkKafkaConsumer08<?> consumer = 
mock(FlinkKafkaConsumer08.class);
                        when(consumer.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
 
-
+                       HashMap<KafkaTopicPartition, KafkaPartitionState> 
testState = new HashMap<>();
                        HashMap<KafkaTopicPartition, Long> testOffsets = new 
HashMap<>();
                        long[] offsets = new long[] { 43, 6146, 133, 16, 162, 
616 };
                        int j = 0;
                        for (long i: offsets) {
                                KafkaTopicPartition ktp = new 
KafkaTopicPartition("topic", j++);
+                               testState.put(ktp, new 
KafkaPartitionState(ktp.getPartition(), i));
                                testOffsets.put(ktp, i);
                        }
 
                        LinkedMap map = new LinkedMap();
 
-                       offsetsField.set(consumer, testOffsets);
+                       offsetsField.set(consumer, testState);
                        runningField.set(consumer, true);
                        mapField.set(consumer, map);
 
@@ -118,7 +120,9 @@ public class KafkaConsumerTest {
                                HashMap<KafkaTopicPartition, Long> 
checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
 
                                for (Map.Entry<KafkaTopicPartition, Long> e: 
testOffsets.entrySet()) {
-                                       testOffsets.put(e.getKey(), 
e.getValue() + 1);
+                                       KafkaTopicPartition ktp = e.getKey();
+                                       testState.put(ktp, new 
KafkaPartitionState(ktp.getPartition(), e.getValue() + 1));
+                                       testOffsets.put(ktp, e.getValue() + 1);
                                }
 
                                assertEquals(checkpointCopy, checkpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 3b780bd..55f9875 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -269,7 +269,7 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
                this.subscribedPartitionsAsFlink = 
assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);
                if(this.subscribedPartitionsAsFlink.isEmpty()) {
                        LOG.info("This consumer doesn't have any partitions 
assigned");
-                       this.offsetsState = null;
+                       this.partitionState = null;
                        return;
                } else {
                        StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
@@ -302,13 +302,13 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
                // check if we need to explicitly seek to a specific offset 
(restore case)
                if(restoreToOffset != null) {
                        // we are in a recovery scenario
-                       for(Map.Entry<KafkaTopicPartition, Long> offset: 
restoreToOffset.entrySet()) {
+                       for(Map.Entry<KafkaTopicPartition, Long> info: 
restoreToOffset.entrySet()) {
                                // seek all offsets to the right position
-                               this.consumer.seek(new 
TopicPartition(offset.getKey().getTopic(), offset.getKey().getPartition()), 
offset.getValue() + 1);
+                               this.consumer.seek(new 
TopicPartition(info.getKey().getTopic(), info.getKey().getPartition()), 
info.getValue() + 1);
                        }
-                       this.offsetsState = restoreToOffset;
+                       this.partitionState = restoreInfoFromCheckpoint();
                } else {
-                       this.offsetsState = new HashMap<>();
+                       this.partitionState = new HashMap<>();
                }
        }
 
@@ -474,8 +474,7 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
                                                                break pollLoop;
                                                        }
                                                        synchronized 
(sourceContext.getCheckpointLock()) {
-                                                               
sourceContext.collect(value);
-                                                               
flinkKafkaConsumer.offsetsState.put(flinkPartition, record.offset());
+                                                               
flinkKafkaConsumer.processElement(sourceContext, flinkPartition, value, 
record.offset());
                                                        }
                                                }
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index eb152a2..82e1dce 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -36,13 +36,22 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
                runFailOnNoBrokerTest();
        }
 
-
        @Test(timeout = 60000)
        public void testConcurrentProducerConsumerTopology() throws Exception {
                runSimpleConcurrentProducerConsumerTopology();
        }
 
        @Test(timeout = 60000)
+       public void testPunctuatedExplicitWMConsumer() throws Exception {
+               runExplicitPunctuatedWMgeneratingConsumerTest(false);
+       }
+
+       @Test(timeout = 60000)
+       public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws 
Exception {
+               runExplicitPunctuatedWMgeneratingConsumerTest(true);
+       }
+
+       @Test(timeout = 60000)
        public void testKeyValueSupport() throws Exception {
                runKeyValueTest();
        }
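
The FlinkKafkaConsumerBase changes that follow implement the core watermark
rule: the candidate watermark is the minimum, over all partitions that are
still marked active, of each partition's maximum seen timestamp, and it is
emitted only if it advances past the last emitted watermark. A minimal sketch
of that rule (the helper name and the KafkaPartitionState accessors sketched
earlier are assumptions):

    import java.util.Collection;

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;

    // Illustrative helper, not part of the commit: compute the watermark
    // candidate as the minimum over the per-partition maximum timestamps of
    // all active partitions.
    final class WatermarkRule {

        static long candidateWatermark(Collection<KafkaPartitionState> partitions) {
            long candidate = Long.MAX_VALUE;
            for (KafkaPartitionState state : partitions) {
                if (state.isActive() && state.getMaxTimestamp() < candidate) {
                    candidate = state.getMaxTimestamp();
                }
            }
            // Long.MAX_VALUE means "no active partition"; nothing should be emitted then.
            return candidate;
        }

        private WatermarkRule() {}
    }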

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5f20f16..d9e813f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -18,12 +18,22 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.TimestampAssigner;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +48,7 @@ import static java.util.Objects.requireNonNull;
 import static 
org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.checkArgument;
 
 public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFunction<T>
-               implements CheckpointListener, 
CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, 
ResultTypeQueryable<T> {
+               implements CheckpointListener, 
CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, 
ResultTypeQueryable<T>, Triggerable {
 
        // 
------------------------------------------------------------------------
 
@@ -46,6 +56,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
        private static final long serialVersionUID = -6272159445203409112L;
 
+       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
+        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
+       public static final long OFFSET_NOT_SET = -915623761776L;
+
        /** The maximum number of pending non-committed checkpoints to track, 
to avoid memory leaks */
        public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
 
@@ -58,8 +72,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        /** Data for pending but uncommitted checkpoints */
        protected final LinkedMap pendingCheckpoints = new LinkedMap();
 
-       /** The offsets of the last returned elements */
-       protected transient HashMap<KafkaTopicPartition, Long> offsetsState;
+       /**
+        * Information about the partitions being read by the local consumer. 
This contains
+        * the offsets of the last returned elements and, if a timestamp 
assigner is used,
+        * the maximum timestamp seen in each partition and whether the 
partition still
+        * receives elements or is inactive.
+        */
+       protected transient HashMap<KafkaTopicPartition, KafkaPartitionState> 
partitionState;
 
        /** The offsets to restore to, if the consumer restores state from a 
checkpoint */
        protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
@@ -68,13 +87,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        protected volatile boolean running = true;
 
        // 
------------------------------------------------------------------------
+       //                                                      WATERMARK 
EMISSION
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The user-specified methods to extract the timestamps from the 
records in Kafka, and
+        * to decide when to emit watermarks.
+        */
+       private AssignerWithPunctuatedWatermarks<T> punctuatedWatermarkAssigner;
+
+       /**
+        * The user-specified methods to extract the timestamps from the 
records in Kafka, and
+        * to decide when to emit watermarks.
+        */
+       private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+
+       private StreamingRuntimeContext runtime = null;
+
+       private SourceContext<T> srcContext = null;
+
+       /**
+        * The interval between consecutive periodic watermark emissions,
+        * as configured via the {@link 
ExecutionConfig#getAutoWatermarkInterval()}.
+        */
+       private long watermarkInterval = -1;
 
+       /** The last emitted watermark. */
+       private long lastEmittedWatermark = Long.MIN_VALUE;
+
+       // 
------------------------------------------------------------------------
 
        /**
         * Creates a new Flink Kafka Consumer, using the given type of fetcher 
and offset handler.
         *
         * <p>To determine which kind of fetcher and offset handler to use, 
please refer to the docs
-        * at the beginnign of this class.</p>
+        * at the beginning of this class.</p>
         *
         * @param deserializer
         *           The deserializer to turn raw byte messages into Java/Scala 
objects.
@@ -85,13 +132,300 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                this.deserializer = requireNonNull(deserializer, 
"valueDeserializer");
        }
 
+       /**
+        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit 
watermarks in a punctuated manner. Bear in mind
+        * that the source can either have an {@link 
AssignerWithPunctuatedWatermarks} or an
+        * {@link AssignerWithPeriodicWatermarks}, not both.
+        */
+       public FlinkKafkaConsumerBase<T> 
setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+               checkEmitterDuringInit();
+               this.punctuatedWatermarkAssigner = assigner;
+               return this;
+       }
+
+       /**
+        * Specifies an {@link AssignerWithPeriodicWatermarks} to emit 
watermarks periodically. Bear in mind that the
+        * source can either have an {@link AssignerWithPunctuatedWatermarks} 
or an
+        * {@link AssignerWithPeriodicWatermarks}, not both.
+        */
+       public FlinkKafkaConsumerBase<T> 
setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+               checkEmitterDuringInit();
+               this.periodicWatermarkAssigner = assigner;
+               return this;
+       }
+
+       /**
+        * Processes the element after it has been read from Kafka and 
deserialized, and updates the
+        * last read offset for the specified partition. These two actions 
should be performed in
+        * an atomic way in order to guarantee exactly once semantics.
+        * @param sourceContext
+        *           The context the task operates in.
+        * @param partDescriptor
+        *            A descriptor containing the topic and the id of the 
partition.
+        * @param value
+        *           The element to process.
+        * @param offset
+        *           The offset of the element in the partition.
+        * */
+       public void processElement(SourceContext<T> sourceContext, 
KafkaTopicPartition partDescriptor, T value, long offset) {
+               if (punctuatedWatermarkAssigner == null && 
periodicWatermarkAssigner == null) {
+                       // the case where no watermark emitter is specified.
+                       sourceContext.collect(value);
+               } else {
+
+                       if (srcContext == null) {
+                               srcContext = sourceContext;
+                       }
+
+                       long extractedTimestamp = 
extractTimestampAndEmitElement(partDescriptor, value);
+
+                       // depending on the specified watermark emitter, either 
send a punctuated watermark,
+                       // or set the timer for the first periodic watermark. 
In the periodic case, we set the timer
+                       // only for the first watermark, as it is the trigger() 
that will set the subsequent ones.
+
+                       if (punctuatedWatermarkAssigner != null) {
+                               final Watermark nextWatermark = 
punctuatedWatermarkAssigner
+                                       .checkAndGetNextWatermark(value, 
extractedTimestamp);
+                               if (nextWatermark != null) {
+                                       
emitWatermarkIfMarkingProgress(sourceContext);
+                               }
+                       } else if(periodicWatermarkAssigner != null && runtime 
== null) {
+                               runtime = (StreamingRuntimeContext) 
getRuntimeContext();
+                               watermarkInterval = 
runtime.getExecutionConfig().getAutoWatermarkInterval();
+                               if (watermarkInterval > 0) {
+                                       
runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+                               }
+                       }
+               }
+               updateOffsetForPartition(partDescriptor, offset);
+       }
+
+       /**
+        * Extract the timestamp from the element based on the user-specified 
extractor,
+        * emit the element with the new timestamp, and update the partition 
monitoring info (if necessary).
+        * In more detail, upon reception of an element with a timestamp 
greater than the greatest timestamp
+        * seen so far in that partition, this method updates the maximum 
timestamp seen for that partition,
+        * and marks the partition as {@code active}, i.e. it still receives 
fresh data.
+        * @param partDescriptor the partition the new element belongs to.
+        * @param value the element to be forwarded.
+        * @return the timestamp of the new element.
+        */
+       private long extractTimestampAndEmitElement(KafkaTopicPartition 
partDescriptor, T value) {
+               long extractedTimestamp = 
getTimestampAssigner().extractTimestamp(value, Long.MIN_VALUE);
+               srcContext.collectWithTimestamp(value, extractedTimestamp);
+               updateMaximumTimestampForPartition(partDescriptor, 
extractedTimestamp);
+               return extractedTimestamp;
+       }
+
+       /**
+        * Upon reception of an element with a timestamp greater than the 
greatest timestamp seen so far in the partition,
+        * this method updates the maximum timestamp seen for that partition to 
{@code timestamp}, and marks the partition
+        * as {@code active}, i.e. it still receives fresh data. If the 
partition is not known to the system, then a new
+        * {@link KafkaPartitionState} is created and is associated to the new 
partition for future monitoring.
+        * @param partDescriptor
+        *            A descriptor containing the topic and the id of the 
partition.
+        * @param timestamp
+        *           The timestamp to set the maximum to, if greater than the 
already existing one.
+        * @return {@code true} if the maximum was successfully updated to 
{@code timestamp}, {@code false}
+        *           if the existing value is greater than or equal to the 
provided timestamp
+        * */
+       private boolean updateMaximumTimestampForPartition(KafkaTopicPartition 
partDescriptor, long timestamp) {
+               KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
+
+               if(timestamp > info.getMaxTimestamp()) {
+
+                       // the flag is set to false as soon as the current 
partition's max timestamp is sent as a watermark.
+                       // If, after that, only late elements arrive for that 
partition, the max timestamp will stay the
+                       // same and will keep the overall system from 
progressing.
+                       // To avoid this, we only mark a partition as active on 
non-late elements.
+
+                       info.setActive(true);
+                       info.setMaxTimestamp(timestamp);
+                       return  true;
+               }
+               return false;
+       }
+
+       /**
+        * Updates the last read offset for the partition specified by the 
{@code partDescriptor} to {@code offset}.
+        * If it is the first time we see the partition, then a new {@link 
KafkaPartitionState} is created to monitor
+        * this specific partition.
+        * @param partDescriptor the partition whose info to update.
+        * @param offset the last read offset of the partition.
+        */
+       public void updateOffsetForPartition(KafkaTopicPartition 
partDescriptor, long offset) {
+               KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
+               info.setOffset(offset);
+       }
+
+       @Override
+       public void trigger(long timestamp) throws Exception {
+               if(this.srcContext == null) {
+                       // if the trigger is called before any elements, then we
+                       // just set the next timer to fire when it should and we
+                       // ignore the triggering as this would produce no 
results.
+                       setNextWatermarkTimer();
+                       return;
+               }
+
+               // this is valid because this method is only called when 
watermarks
+               // are set to be emitted periodically.
+               final Watermark nextWatermark = 
periodicWatermarkAssigner.getCurrentWatermark();
+               if(nextWatermark != null) {
+                       emitWatermarkIfMarkingProgress(srcContext);
+               }
+               setNextWatermarkTimer();
+       }
+
+       /**
+        * Emits a new watermark, with timestamp equal to the minimum across 
all the maximum timestamps
+        * seen per local partition (across all topics). The new watermark is 
emitted if and only if
+        * it signals progress in event-time, i.e. if its timestamp is greater 
than the timestamp of
+        * the last emitted watermark. In addition, this method marks as 
inactive the partition whose
+        * timestamp was emitted as watermark, i.e. the one with the minimum 
across the maximum timestamps
+        * of the local partitions. This is done to avoid stalling progress 
when
+        * a partition stops receiving data. The partition will be marked as 
{@code active}
+        * as soon as the <i>next non-late</i> element arrives.
+        *
+        * @return {@code true} if the Watermark was successfully emitted, 
{@code false} otherwise.
+        */
+       private boolean 
emitWatermarkIfMarkingProgress(SourceFunction.SourceContext<T> sourceContext) {
+               Tuple2<KafkaTopicPartition, Long> globalMinTs = 
getMinTimestampAcrossAllTopics();
+               if(globalMinTs.f0 != null) {
+                       synchronized (sourceContext.getCheckpointLock()) {
+                               long minTs = globalMinTs.f1;
+                               if(minTs > lastEmittedWatermark) {
+                                       lastEmittedWatermark = minTs;
+                                       Watermark toEmit = new Watermark(minTs);
+                                       sourceContext.emitWatermark(toEmit);
+                                       return true;
+                               }
+                       }
+               }
+               return false;
+       }
+
+       /**
+        * Kafka sources with timestamp extractors are expected to keep the 
maximum timestamp seen per
+        * partition they are reading from. This is to mark the per-partition 
event-time progress.
+        *
+        * This method iterates over these per-partition states and returns the minimum across their
+        * max timestamps, over all topics. In addition, it returns the topic and the partition within
+        * the topic that this minimum timestamp belongs to.
+        */
+       private Tuple2<KafkaTopicPartition, Long> 
getMinTimestampAcrossAllTopics() {
+               Tuple2<KafkaTopicPartition, Long> minTimestamp = new 
Tuple2<>(null, Long.MAX_VALUE);
+               for(Map.Entry<KafkaTopicPartition, KafkaPartitionState> 
entries: partitionState.entrySet()) {
+                       KafkaTopicPartition part = entries.getKey();
+                       KafkaPartitionState info = entries.getValue();
+
+                       if(partitionIsActive(part) && info.getMaxTimestamp() < 
minTimestamp.f1) {
+                               minTimestamp.f0 = part;
+                               minTimestamp.f1 = info.getMaxTimestamp();
+                       }
+               }
+
+               if(minTimestamp.f0 != null) {
+                       // it means that we have a winner and we have to set 
its flag to
+                       // inactive, until its next non-late element.
+                       KafkaTopicPartition partitionDescriptor = 
minTimestamp.f0;
+                       setActiveFlagForPartition(partitionDescriptor, false);
+               }
+
+               return minTimestamp;
+       }
+
+       /**
+        * Sets the {@code active} flag for a given partition of a topic to 
{@code isActive}.
+        * This flag signals if the partition is still receiving data and it is 
used to avoid the case
+        * where a partition stops receiving data, so its max seen timestamp 
does not advance, and it
+        * holds back the progress of the watermark for all partitions. Note 
that if the partition is
+        * not known to the system, then a new {@link KafkaPartitionState} is 
created and is associated
+        * to the new partition for future monitoring.
+        *
+        * @param partDescriptor
+        *                              A descriptor containing the topic and 
the id of the partition.
+        * @param isActive
+        *                              The value {@code true} or {@code false} 
to set the flag to.
+        */
+       private void setActiveFlagForPartition(KafkaTopicPartition 
partDescriptor, boolean isActive) {
+               KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
+               info.setActive(isActive);
+       }
+
+       /**
+        * Gets the statistics for a given partition specified by the {@code 
partition} argument.
+        * If it is the first time we see this partition, a new {@link 
KafkaPartitionState} data structure
+        * is initialized to monitor it from now on. This method never throws a 
{@link NullPointerException}.
+        * @param partition the partition to be fetched.
+        * @return the gathered statistics for that partition.
+        * */
+       private KafkaPartitionState getOrInitializeInfo(KafkaTopicPartition 
partition) {
+               KafkaPartitionState info = partitionState.get(partition);
+               if(info == null) {
+                       info = new 
KafkaPartitionState(partition.getPartition(), 
FlinkKafkaConsumerBase.OFFSET_NOT_SET);
+                       partitionState.put(partition, info);
+               }
+               return info;
+       }
+
+       /**
+        * Checks if a partition of a topic is still active, i.e. if it still 
receives data.
+        * @param partDescriptor
+        *          A descriptor containing the topic and the id of the 
partition.
+        * */
+       private boolean partitionIsActive(KafkaTopicPartition partDescriptor) {
+               KafkaPartitionState info = partitionState.get(partDescriptor);
+               if(info == null) {
+                       throw new RuntimeException("Unknown Partition: Topic=" 
+ partDescriptor.getTopic() +
+                               " Partition=" + partDescriptor.getPartition());
+               }
+               return info.isActive();
+       }
+
+       private TimestampAssigner<T> getTimestampAssigner() {
+               checkEmitterStateAfterInit();
+               return periodicWatermarkAssigner != null ? 
periodicWatermarkAssigner : punctuatedWatermarkAssigner;
+       }
+
+       private void setNextWatermarkTimer() {
+               long timeToNextWatermark = System.currentTimeMillis() + 
watermarkInterval;
+               runtime.registerTimer(timeToNextWatermark, this);
+       }
+
+       private void checkEmitterDuringInit() {
+               if(periodicWatermarkAssigner != null) {
+                       throw new RuntimeException("A periodic watermark 
emitter has already been provided.");
+               } else if(punctuatedWatermarkAssigner != null) {
+                       throw new RuntimeException("A punctuated watermark 
emitter has already been provided.");
+               }
+       }
+
+       private void checkEmitterStateAfterInit() {
+               if(periodicWatermarkAssigner == null && 
punctuatedWatermarkAssigner == null) {
+                       throw new RuntimeException("The timestamp assigner has 
not been initialized.");
+               } else if(periodicWatermarkAssigner != null && 
punctuatedWatermarkAssigner != null) {
+                       throw new RuntimeException("The source can either have 
an assigner with punctuated " +
+                               "watermarks or one with periodic watermarks, 
not both.");
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Checkpoint and restore
        // 
------------------------------------------------------------------------
 
+       HashMap<KafkaTopicPartition, KafkaPartitionState> 
restoreInfoFromCheckpoint() {
+               HashMap<KafkaTopicPartition, KafkaPartitionState> partInfo = 
new HashMap<>(restoreToOffset.size());
+               for(Map.Entry<KafkaTopicPartition, Long> offsets: 
restoreToOffset.entrySet()) {
+                       KafkaTopicPartition key = offsets.getKey();
+                       partInfo.put(key, new 
KafkaPartitionState(key.getPartition(), offsets.getValue()));
+               }
+               return partInfo;
+       }
+
        @Override
        public HashMap<KafkaTopicPartition, Long> snapshotState(long 
checkpointId, long checkpointTimestamp) throws Exception {
-               if (offsetsState == null) {
+               if (partitionState == null) {
                        LOG.debug("snapshotState() requested on not yet opened 
source; returning null.");
                        return null;
                }
@@ -100,15 +434,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        return null;
                }
 
+               HashMap<KafkaTopicPartition, Long> currentOffsets = new 
HashMap<>();
+               for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: 
partitionState.entrySet()) {
+                       currentOffsets.put(entry.getKey(), 
entry.getValue().getOffset());
+               }
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Snapshotting state. Offsets: {}, checkpoint 
id: {}, timestamp: {}",
-                                       
KafkaTopicPartition.toString(offsetsState), checkpointId, checkpointTimestamp);
+                                       
KafkaTopicPartition.toString(currentOffsets), checkpointId, 
checkpointTimestamp);
                }
 
-               // the use of clone() is okay here is okay, we just need a new 
map, the keys are not changed
-               //noinspection unchecked
-               HashMap<KafkaTopicPartition, Long> currentOffsets = 
(HashMap<KafkaTopicPartition, Long>) offsetsState.clone();
-
                // the map cannot be asynchronously updated, because only one 
checkpoint call can happen
                // on this function at a time: either snapshotState() or 
notifyCheckpointComplete()
                pendingCheckpoints.put(checkpointId, currentOffsets);
@@ -128,7 +463,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               if (offsetsState == null) {
+               if (partitionState == null) {
                        LOG.debug("notifyCheckpointComplete() called on 
uninitialized source");
                        return;
                }
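
For orientation, a minimal standalone sketch (not the committed code) of the scheme the FlinkKafkaConsumerBase changes above implement: keep the maximum timestamp seen per partition, and periodically emit as the watermark the minimum of those maxima over all partitions that are still marked active. All names in the sketch are illustrative.

import java.util.HashMap;
import java.util.Map;

public class PerPartitionWatermarkSketch {

    private static final class PartitionProgress {
        long maxTimestamp = Long.MIN_VALUE;
        boolean active = false;
    }

    private final Map<String, PartitionProgress> progress = new HashMap<>();

    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Called for every record; returns true if the record is not late for its partition. */
    public boolean onRecord(String partition, long timestamp) {
        PartitionProgress p = progress.get(partition);
        if (p == null) {
            p = new PartitionProgress();
            progress.put(partition, p);
        }
        if (timestamp > p.maxTimestamp) {
            p.maxTimestamp = timestamp;
            p.active = true;   // only non-late elements re-activate a partition
            return true;
        }
        return false;
    }

    /** Returns the new watermark timestamp, or Long.MIN_VALUE if event time did not advance. */
    public long maybeEmitWatermark() {
        String minPartition = null;
        long minTs = Long.MAX_VALUE;
        for (Map.Entry<String, PartitionProgress> e : progress.entrySet()) {
            PartitionProgress p = e.getValue();
            if (p.active && p.maxTimestamp < minTs) {
                minPartition = e.getKey();
                minTs = p.maxTimestamp;
            }
        }
        if (minPartition != null) {
            // the partition holding the minimum goes inactive until its next non-late
            // element, so an idle partition cannot hold back progress forever
            progress.get(minPartition).active = false;
            if (minTs > lastEmittedWatermark) {
                lastEmittedWatermark = minTs;
                return minTs;
            }
        }
        return Long.MIN_VALUE;
    }
}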

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
new file mode 100644
index 0000000..11a392a
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+
+public class KafkaPartitionState implements Serializable {
+
+       private static final long serialVersionUID = 722083576322742328L;
+
+       private final int partitionID;
+       private long offset;
+
+       private long maxTimestamp = Long.MIN_VALUE;
+       private boolean isActive = false;
+
+       public KafkaPartitionState(int id, long offset) {
+               this.partitionID = id;
+               this.offset = offset;
+       }
+
+       public void setOffset(long offset) {
+               this.offset = offset;
+       }
+
+       public void setActive(boolean isActive) {
+               this.isActive = isActive;
+       }
+
+       public void setMaxTimestamp(long timestamp) {
+               maxTimestamp = timestamp;
+       }
+
+       public int getPartition() {
+               return partitionID;
+       }
+
+       public boolean isActive() {
+               return isActive;
+       }
+
+       public long getMaxTimestamp() {
+               return maxTimestamp;
+       }
+
+       public long getOffset() {
+               return offset;
+       }
+
+}
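
As a hedged usage sketch (not part of the commit), this is roughly how a fetcher could keep one KafkaPartitionState per assigned partition through the API introduced above; the wrapping class and the onRecord method are made up for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class PartitionStateSketch {

    private final Map<KafkaTopicPartition, KafkaPartitionState> partitionState = new HashMap<>();

    /** Records the latest offset and, for non-late records, a new max timestamp for a partition. */
    public void onRecord(KafkaTopicPartition partition, long offset, long timestamp) {
        KafkaPartitionState state = partitionState.get(partition);
        if (state == null) {
            // first record from this partition: start tracking it
            state = new KafkaPartitionState(partition.getPartition(), offset);
            partitionState.put(partition, state);
        }
        state.setOffset(offset);                  // remembered for checkpointing
        if (timestamp > state.getMaxTimestamp()) {
            state.setMaxTimestamp(timestamp);     // per-partition event-time progress
            state.setActive(true);                // only non-late records mark the partition active
        }
    }
}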

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2cd59e6..340950b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -50,15 +51,21 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobExecutionException;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -69,6 +76,7 @@ import 
org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -374,7 +382,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                deleteTestTopic(topic);
        }
 
-
        /**
         * Tests the proper consumption when having a 1:1 correspondence 
between kafka partitions and
         * Flink sources.
@@ -1443,4 +1450,229 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        this.numElementsTotal = state;
                }
        }
+
+       /////////////                   Testing the Kafka consumer with embedded watermark generation functionality                    ///////////////
+
+       @RetryOnException(times=0, 
exception=kafka.common.NotLeaderForPartitionException.class)
+       public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean 
emptyPartition) throws Exception {
+
+               final String topic1 = "wmExtractorTopic1_" + 
UUID.randomUUID().toString();
+               final String topic2 = "wmExtractorTopic2_" + 
UUID.randomUUID().toString();
+
+               final Map<String, Boolean> topics = new HashMap<>();
+               topics.put(topic1, false);
+               topics.put(topic2, emptyPartition);
+
+               final int noOfTopics = topics.size();
+               final int partitionsPerTopic = 1;
+               final int elementsPerPartition = 100 + 1;
+
+               final int totalElements = emptyPartition ?
+                       partitionsPerTopic * elementsPerPartition :
+                       noOfTopics * partitionsPerTopic * elementsPerPartition;
+
+               createTestTopic(topic1, partitionsPerTopic, 1);
+               createTestTopic(topic2, partitionsPerTopic, 1);
+
+               final StreamExecutionEnvironment env =
+                       
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(partitionsPerTopic);
+               env.setRestartStrategy(RestartStrategies.noRestart()); // fail 
immediately
+               env.getConfig().disableSysoutLogging();
+
+               TypeInformation<Tuple2<Long, Integer>> longIntType = 
TypeInfoParser.parse("Tuple2<Long, Integer>");
+
+               Properties producerProperties = 
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+               producerProperties.setProperty("retries", "0");
+
+               putDataInTopics(env, producerProperties, elementsPerPartition, 
topics, longIntType);
+
+               List<String> topicTitles = new ArrayList<>(topics.keySet());
+               runPunctuatedConsumer(env, topicTitles, totalElements, longIntType);
+
+               executeAndCatchException(env, "runConsumerWithPunctuatedExplicitWMTest");
+
+               for(String topic: topicTitles) {
+                       deleteTestTopic(topic);
+               }
+       }
+
+       private void executeAndCatchException(StreamExecutionEnvironment env, 
String execName) throws Exception {
+               try {
+                       tryExecutePropagateExceptions(env, execName);
+               }
+               catch (ProgramInvocationException | JobExecutionException e) {
+                       // look for NotLeaderForPartitionException
+                       Throwable cause = e.getCause();
+
+                       // walk the cause chain, looking for a nested NotLeaderForPartitionException
+                       int depth = 0;
+                       while (cause != null && depth++ < 20) {
+                               if (cause instanceof 
kafka.common.NotLeaderForPartitionException) {
+                                       throw (Exception) cause;
+                               }
+                               cause = cause.getCause();
+                       }
+                       throw e;
+               }
+       }
+
+       private void putDataInTopics(StreamExecutionEnvironment env,
+                                                               Properties 
producerProperties,
+                                                               final int 
elementsPerPartition,
+                                                               Map<String, 
Boolean> topics,
+                                                               
TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
+               if(topics.size() != 2) {
+                       throw new RuntimeException("This method accepts two 
topics as arguments.");
+               }
+
+               TypeInformationSerializationSchema<Tuple2<Long, Integer>> 
sinkSchema =
+                       new 
TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
+
+               DataStream<Tuple2<Long, Integer>> stream = env
+                       .addSource(new RichParallelSourceFunction<Tuple2<Long, 
Integer>>() {
+                               private boolean running = true;
+
+                               @Override
+                               public void run(SourceContext<Tuple2<Long, 
Integer>> ctx) throws InterruptedException {
+                                       int topic = 0;
+                                       int currentTs = 1;
+
+                                       while (running && currentTs < 
elementsPerPartition) {
+                                               long timestamp = (currentTs % 
10 == 0) ? -1L : currentTs;
+                                               ctx.collect(new Tuple2<Long, 
Integer>(timestamp, topic));
+                                               currentTs++;
+                                       }
+
+                                       Tuple2<Long, Integer> toWrite2 = new 
Tuple2<Long, Integer>(-1L, topic);
+                                       ctx.collect(toWrite2);
+                               }
+
+                               @Override
+                               public void cancel() {
+                                       running = false;
+                               }
+                       }).setParallelism(1);
+
+               List<Map.Entry<String, Boolean>> topicsL = new 
ArrayList<>(topics.entrySet());
+               stream.map(new MapFunction<Tuple2<Long,Integer>, 
Tuple2<Long,Integer>>() {
+
+                       @Override
+                       public Tuple2<Long, Integer> map(Tuple2<Long, Integer> 
value) throws Exception {
+                               return value;
+                       }
+               
}).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(0).getKey(),
+                       new KeyedSerializationSchemaWrapper<>(sinkSchema), 
producerProperties, null)).setParallelism(1);
+
+               if(!topicsL.get(1).getValue()) {
+                       stream.map(new MapFunction<Tuple2<Long,Integer>, 
Tuple2<Long,Integer>>() {
+
+                               @Override
+                               public Tuple2<Long, Integer> map(Tuple2<Long, 
Integer> value) throws Exception {
+                                       long timestamp = (value.f0 == -1) ? -1L 
: 1000 + value.f0;
+                                       return new Tuple2<Long, 
Integer>(timestamp, 1);
+                               }
+                       
}).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(1).getKey(),
+                               new 
KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, 
null)).setParallelism(1);
+               }
+       }
+
+       private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedConsumer(StreamExecutionEnvironment env,
+                                                                                                                               List<String> topics,
+                                                                                                                               final int totalElementsToExpect,
+                                                                                                                               TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {
+
+               TypeInformationSerializationSchema<Tuple2<Long, Integer>> 
sourceSchema =
+                       new TypeInformationSerializationSchema<>(inputTypeInfo, 
env.getConfig());
+
+               FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = 
kafkaServer
+                       .getConsumer(topics, sourceSchema, standardProps)
+                       .setPunctuatedWatermarkEmitter(new 
TestPunctuatedTSExtractor());
+
+               DataStreamSource<Tuple2<Long, Integer>> consuming = 
env.setParallelism(1).addSource(source);
+
+               return consuming
+                       .transform("testingWatermarkOperator", inputTypeInfo, 
new WMTestingOperator())
+                       .addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {
+
+                               private int elementCount = 0;
+
+                               @Override
+                               public void invoke(Tuple2<Long, Integer> value) 
throws Exception {
+                                       elementCount++;
+                                       if (elementCount == 
totalElementsToExpect) {
+                                               throw new SuccessException();
+                                       }
+                               }
+
+                               @Override
+                               public void close() throws Exception {
+                                       super.close();
+                               }
+                       });
+       }
+
+       /** An extractor that emits a Watermark whenever the timestamp <b>in 
the record</b> is equal to {@code -1}. */
+       private static class TestPunctuatedTSExtractor implements 
AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {
+
+               @Override
+               public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> 
lastElement, long extractedTimestamp) {
+                       return (lastElement.f0 == -1) ? new 
Watermark(extractedTimestamp) : null;
+               }
+
+               @Override
+               public long extractTimestamp(Tuple2<Long, Integer> element, 
long previousElementTimestamp) {
+                       return element.f0;
+               }
+       }
+
+       private static class WMTestingOperator extends 
AbstractStreamOperator<Tuple2<Long, Integer>> implements 
OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
+
+               private long lastReceivedWatermark = Long.MIN_VALUE;
+
+               private Map<Integer, Boolean> isEligible = new HashMap<>();
+               private Map<Integer, Long> perPartitionMaxTs = new HashMap<>();
+
+               WMTestingOperator() {
+                       isEligible = new HashMap<>();
+                       perPartitionMaxTs = new HashMap<>();
+               }
+
+               @Override
+               public void processElement(StreamRecord<Tuple2<Long, Integer>> 
element) throws Exception {
+                       int partition = element.getValue().f1;
+                       Long maxTs = perPartitionMaxTs.get(partition);
+                       if(maxTs == null || maxTs < element.getValue().f0) {
+                               perPartitionMaxTs.put(partition, 
element.getValue().f0);
+                               isEligible.put(partition, element.getValue().f0 
> lastReceivedWatermark);
+                       }
+                       output.collect(element);
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       int partition = -1;
+                       long minTS = Long.MAX_VALUE;
+                       for (Integer part : perPartitionMaxTs.keySet()) {
+                               Long ts = perPartitionMaxTs.get(part);
+                               if (ts < minTS && isEligible.get(part)) {
+                                       partition = part;
+                                       minTS = ts;
+                                       lastReceivedWatermark = ts;
+                               }
+                       }
+                       isEligible.put(partition, false);
+
+                       assertEquals(minTS, mark.getTimestamp());
+                       output.emitWatermark(mark);
+               }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       perPartitionMaxTs.clear();
+                       isEligible.clear();
+               }
+       }
 }
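
Beyond the test harness above, a user-facing sketch of the new per-partition watermark API: attaching a punctuated assigner directly to the Kafka consumer so watermarks are generated inside the source, per partition. The topic name, properties, record format, and job class are placeholders; only the setPunctuatedWatermarkEmitter method name is taken from the diff.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PerPartitionWatermarkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        props.setProperty("zookeeper.connect", "localhost:2181");   // placeholder (0.8 consumer)
        props.setProperty("group.id", "example-group");             // placeholder

        FlinkKafkaConsumer08<String> consumer =
            new FlinkKafkaConsumer08<>("example-topic", new SimpleStringSchema(), props);

        // watermarks are now generated per Kafka partition, inside the source
        consumer.setPunctuatedWatermarkEmitter(new AssignerWithPunctuatedWatermarks<String>() {

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                // assume records look like "<timestamp>,<payload>"
                return Long.parseLong(element.split(",")[0]);
            }

            @Override
            public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                // emit a watermark whenever the payload ends with a marker character (illustrative)
                return lastElement.endsWith("#") ? new Watermark(extractedTimestamp) : null;
            }
        });

        env.addSource(consumer).print();
        env.execute("per-partition punctuated watermarks (sketch)");
    }
}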

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
index 38ee394..4b17300 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
  * 
  * <p>Use this class to generate watermarks in a periodical interval.
  * At most every {@code i} milliseconds (configured via
- * {@link ExecutionConfig#getAutoWatermarkInterval()}, the system will call the
+ * {@link ExecutionConfig#getAutoWatermarkInterval()}), the system will call 
the
  * {@link #getCurrentWatermark()} method to probe for the next watermark value.
  * The system will generate a new watermark, if the probed value is non-null
  * and has a timestamp larger than that of the previous watermark (to preserve
