This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 793a784  [FLINK-10921] [kinesis] Shard watermark synchronization in 
Kinesis consumer
793a784 is described below

commit 793a78407aa22530448efbf18b714952eac40aba
Author: Thomas Weise <t...@apache.org>
AuthorDate: Wed May 22 21:42:15 2019 -0700

    [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer
---
 .../connectors/kinesis/FlinkKinesisConsumer.java   |  18 +-
 .../kinesis/config/ConsumerConfigConstants.java    |  11 +
 .../internals/DynamoDBStreamsDataFetcher.java      |   1 +
 .../kinesis/internals/KinesisDataFetcher.java      | 292 +++++++++++++++++++--
 .../kinesis/util/JobManagerWatermarkTracker.java   | 179 +++++++++++++
 .../connectors/kinesis/util/RecordEmitter.java     | 269 +++++++++++++++++++
 .../connectors/kinesis/util/WatermarkTracker.java  | 114 ++++++++
 .../kinesis/FlinkKinesisConsumerMigrationTest.java |   2 +-
 .../kinesis/FlinkKinesisConsumerTest.java          | 185 +++++++++++++
 .../kinesis/internals/ShardConsumerTest.java       |   9 +-
 .../testutils/TestableKinesisDataFetcher.java      |   1 +
 .../util/JobManagerWatermarkTrackerTest.java       | 101 +++++++
 .../connectors/kinesis/util/RecordEmitterTest.java |  81 ++++++
 .../kinesis/util/WatermarkTrackerTest.java         | 108 ++++++++
 14 files changed, 1342 insertions(+), 29 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 3c5e3c7..5b24ded 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
@@ -126,6 +127,7 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T> imple
        private KinesisShardAssigner shardAssigner = 
KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER;
 
        private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
+       private WatermarkTracker watermarkTracker;
 
        // 
------------------------------------------------------------------------
        //  Runtime state
@@ -254,6 +256,20 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T> imple
                ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
        }
 
+       public WatermarkTracker getWatermarkTracker() {
+               return this.watermarkTracker;
+       }
+
+       /**
+        * Set the global watermark tracker. When set, it will be used by the 
fetcher
+        * to align the shard consumers by event time.
+        * @param watermarkTracker
+        */
+       public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
+               this.watermarkTracker = watermarkTracker;
+               ClosureCleaner.clean(this.watermarkTracker, true);
+       }
+
        // 
------------------------------------------------------------------------
        //  Source life cycle
        // 
------------------------------------------------------------------------
@@ -448,7 +464,7 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T> imple
                        Properties configProps,
                        KinesisDeserializationSchema<T> deserializationSchema) {
 
-               return new KinesisDataFetcher<>(streams, sourceContext, 
runtimeContext, configProps, deserializationSchema, shardAssigner, 
periodicWatermarkAssigner);
+               return new KinesisDataFetcher<>(streams, sourceContext, 
runtimeContext, configProps, deserializationSchema, shardAssigner, 
periodicWatermarkAssigner, watermarkTracker);
        }
 
        @VisibleForTesting
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 41ac6b8..2f5be97 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -125,6 +125,15 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
        /** The interval after which to consider a shard idle for purposes of 
watermark generation. */
        public static final String SHARD_IDLE_INTERVAL_MILLIS = 
"flink.shard.idle.interval";
 
+       /** The interval for periodically synchronizing the shared watermark 
state. */
+       public static final String WATERMARK_SYNC_MILLIS = 
"flink.watermark.sync.interval";
+
+       /** The maximum delta allowed for the reader to advance ahead of the 
shared global watermark. */
+       public static final String WATERMARK_LOOKAHEAD_MILLIS = 
"flink.watermark.lookahead.millis";
+
+       /** The maximum number of records that will be buffered before 
suspending consumption of a shard. */
+       public static final String WATERMARK_SYNC_QUEUE_CAPACITY = 
"flink.watermark.sync.queue.capacity";
+
        // 
------------------------------------------------------------------------
        //  Default values for consumer configuration
        // 
------------------------------------------------------------------------
@@ -173,6 +182,8 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
 
        public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1;
 
+       public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;
+
        /**
         * To avoid shard iterator expires in {@link ShardConsumer}s, the value 
for the configured
         * getRecords interval can not exceed 5 minutes, which is the expire 
time for retrieved iterators.
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index c2b7be3..5620142 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -64,6 +64,7 @@ public class DynamoDBStreamsDataFetcher<T> extends 
KinesisDataFetcher<T> {
                        deserializationSchema,
                        shardAssigner,
                        null,
+                       null,
                        new AtomicReference<>(),
                        new ArrayList<>(),
                        
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 8c8d94a..eae3153 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -38,6 +38,9 @@ import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
@@ -191,6 +194,9 @@ public class KinesisDataFetcher<T> {
        private volatile boolean running = true;
 
        private final AssignerWithPeriodicWatermarks<T> 
periodicWatermarkAssigner;
+       private final WatermarkTracker watermarkTracker;
+       private final transient RecordEmitter recordEmitter;
+       private transient boolean isIdle;
 
        /**
         * The watermark related state for each shard consumer. Entries in this 
map will be created when shards
@@ -207,6 +213,14 @@ public class KinesisDataFetcher<T> {
        private long lastWatermark = Long.MIN_VALUE;
 
        /**
+        * The next watermark used for synchronization.
+        * For purposes of global watermark calculation, we need to consider 
the next watermark based
+        * on the buffered records vs. the last emitted watermark to allow for 
progress.
+        */
+       private long nextWatermark = Long.MIN_VALUE;
+
+
+       /**
         * The time span since last consumed record, after which a shard will 
be considered idle for purpose of watermark
         * calculation. A positive value will allow the watermark to progress 
even when some shards don't receive new records.
         */
@@ -220,6 +234,82 @@ public class KinesisDataFetcher<T> {
        }
 
        /**
+        * The wrapper that holds the watermark handling related parameters
+        * of a record produced by the shard consumer thread.
+        *
+        * @param <T>
+        */
+       private static class RecordWrapper<T> extends TimestampedValue<T> {
+               int shardStateIndex;
+               SequenceNumber lastSequenceNumber;
+               long timestamp;
+               Watermark watermark;
+
+               private RecordWrapper(T record, long timestamp) {
+                       super(record, timestamp);
+                       this.timestamp = timestamp;
+               }
+
+               @Override
+               public long getTimestamp() {
+                       return timestamp;
+               }
+       }
+
+       /** Kinesis data fetcher specific, asynchronous record emitter. */
+       private class AsyncKinesisRecordEmitter extends 
RecordEmitter<RecordWrapper<T>> {
+
+               private AsyncKinesisRecordEmitter() {
+                       this(DEFAULT_QUEUE_CAPACITY);
+               }
+
+               private AsyncKinesisRecordEmitter(int queueCapacity) {
+                       super(queueCapacity);
+               }
+
+               @Override
+               public void emit(RecordWrapper<T> record, 
RecordQueue<RecordWrapper<T>> queue) {
+                       emitRecordAndUpdateState(record);
+                       ShardWatermarkState<T> sws = 
shardWatermarks.get(queue.getQueueId());
+                       sws.lastEmittedRecordWatermark = record.watermark;
+               }
+       }
+
+       /** Synchronous emitter for use w/o watermark synchronization. */
+       private class SyncKinesisRecordEmitter extends 
AsyncKinesisRecordEmitter {
+               private final ConcurrentHashMap<Integer, 
RecordQueue<RecordWrapper<T>>> queues =
+                       new ConcurrentHashMap<>();
+
+               @Override
+               public RecordQueue<RecordWrapper<T>> getQueue(int 
producerIndex) {
+                       return queues.computeIfAbsent(producerIndex, (key) -> {
+                               return new RecordQueue<RecordWrapper<T>>() {
+                                       @Override
+                                       public void put(RecordWrapper<T> 
record) {
+                                               emit(record, this);
+                                       }
+
+                                       @Override
+                                       public int getQueueId() {
+                                               return producerIndex;
+                                       }
+
+                                       @Override
+                                       public int getSize() {
+                                               return 0;
+                                       }
+
+                                       @Override
+                                       public RecordWrapper<T> peek() {
+                                               return null;
+                                       }
+
+                               };
+                       });
+               }
+       }
+
+       /**
         * Creates a Kinesis Data Fetcher.
         *
         * @param streams the streams to subscribe to
@@ -234,7 +324,8 @@ public class KinesisDataFetcher<T> {
                                                        Properties configProps,
                                                        
KinesisDeserializationSchema<T> deserializationSchema,
                                                        KinesisShardAssigner 
shardAssigner,
-                                                       
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
+                                                       
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+                                                       WatermarkTracker 
watermarkTracker) {
                this(streams,
                        sourceContext,
                        sourceContext.getCheckpointLock(),
@@ -243,6 +334,7 @@ public class KinesisDataFetcher<T> {
                        deserializationSchema,
                        shardAssigner,
                        periodicWatermarkAssigner,
+                       watermarkTracker,
                        new AtomicReference<>(),
                        new ArrayList<>(),
                        
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
@@ -258,6 +350,7 @@ public class KinesisDataFetcher<T> {
                                                                
KinesisDeserializationSchema<T> deserializationSchema,
                                                                
KinesisShardAssigner shardAssigner,
                                                                
AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+                                                               
WatermarkTracker watermarkTracker,
                                                                
AtomicReference<Throwable> error,
                                                                
List<KinesisStreamShardState> subscribedShardsState,
                                                                HashMap<String, 
String> subscribedStreamsToLastDiscoveredShardIds,
@@ -272,6 +365,7 @@ public class KinesisDataFetcher<T> {
                this.deserializationSchema = 
checkNotNull(deserializationSchema);
                this.shardAssigner = checkNotNull(shardAssigner);
                this.periodicWatermarkAssigner = periodicWatermarkAssigner;
+               this.watermarkTracker = watermarkTracker;
                this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
                this.kinesis = kinesisProxyFactory.create(configProps);
 
@@ -284,6 +378,17 @@ public class KinesisDataFetcher<T> {
 
                this.shardConsumersExecutor =
                        
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+               this.recordEmitter = createRecordEmitter(configProps);
+       }
+
+       private RecordEmitter createRecordEmitter(Properties configProps) {
+               if (periodicWatermarkAssigner != null && watermarkTracker != 
null) {
+                       int queueCapacity = 
Integer.parseInt(configProps.getProperty(
+                               
ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY,
+                               
Integer.toString(AsyncKinesisRecordEmitter.DEFAULT_QUEUE_CAPACITY)));
+                       return new AsyncKinesisRecordEmitter(queueCapacity);
+               }
+               return new SyncKinesisRecordEmitter();
        }
 
        /**
@@ -380,16 +485,37 @@ public class KinesisDataFetcher<T> {
                                ProcessingTimeService timerService = 
((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
                                LOG.info("Starting periodic watermark emitter 
with interval {}", periodicWatermarkIntervalMillis);
                                new PeriodicWatermarkEmitter(timerService, 
periodicWatermarkIntervalMillis).start();
+                               if (watermarkTracker != null) {
+                                       // setup global watermark tracking
+                                       long watermarkSyncMillis = 
Long.parseLong(
+                                               
getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
+                                                       
Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS)));
+                                       
watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3); // 
synchronization latency
+                                       watermarkTracker.open(runtimeContext);
+                                       new WatermarkSyncCallback(timerService, 
watermarkSyncMillis).start();
+                                       // emit records ahead of watermark to 
offset synchronization latency
+                                       long lookaheadMillis = Long.parseLong(
+                                               
getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS,
+                                                       Long.toString(0)));
+                                       
recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, 
watermarkSyncMillis * 3));
+                               }
                        }
                        this.shardIdleIntervalMillis = Long.parseLong(
                                
getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
                                        
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
+
+                       // run record emitter in separate thread since main 
thread is used for discovery
+                       Thread thread = new Thread(this.recordEmitter);
+                       thread.setName("recordEmitter-" + 
runtimeContext.getTaskNameWithSubtasks());
+                       thread.setDaemon(true);
+                       thread.start();
                }
 
                // 
------------------------------------------------------------------------
 
                // finally, start the infinite shard discovery and consumer 
launching loop;
                // we will escape from this loop only when shutdownFetcher() or 
stopWithError() is called
+               // TODO: have this thread emit the records for tracking 
backpressure
 
                final long discoveryIntervalMillis = Long.valueOf(
                        configProps.getProperty(
@@ -490,6 +616,11 @@ public class KinesisDataFetcher<T> {
                        mainThread.interrupt(); // the main thread may be 
sleeping for the discovery interval
                }
 
+               if (watermarkTracker != null) {
+                       watermarkTracker.close();
+               }
+               this.recordEmitter.stop();
+
                if (LOG.isInfoEnabled()) {
                        LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
                }
@@ -603,28 +734,48 @@ public class KinesisDataFetcher<T> {
         * @param lastSequenceNumber the last sequence number value to update
         */
        protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
-               // track per shard watermarks and emit timestamps extracted 
from the record,
-               // when a watermark assigner was configured.
-               if (periodicWatermarkAssigner != null) {
-                       ShardWatermarkState sws = 
shardWatermarks.get(shardStateIndex);
-                       Preconditions.checkNotNull(
-                               sws, "shard watermark state initialized in 
registerNewSubscribedShardState");
+               ShardWatermarkState sws = shardWatermarks.get(shardStateIndex);
+               Preconditions.checkNotNull(
+                       sws, "shard watermark state initialized in 
registerNewSubscribedShardState");
+               Watermark watermark = null;
+               if (sws.periodicWatermarkAssigner != null) {
                        recordTimestamp =
                                
sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp);
-                       sws.lastRecordTimestamp = recordTimestamp;
-                       sws.lastUpdated = getCurrentTimeMillis();
+                       // track watermark per record since extractTimestamp 
has side effect
+                       watermark = 
sws.periodicWatermarkAssigner.getCurrentWatermark();
                }
+               sws.lastRecordTimestamp = recordTimestamp;
+               sws.lastUpdated = getCurrentTimeMillis();
 
+               RecordWrapper<T> recordWrapper = new RecordWrapper<>(record, 
recordTimestamp);
+               recordWrapper.shardStateIndex = shardStateIndex;
+               recordWrapper.lastSequenceNumber = lastSequenceNumber;
+               recordWrapper.watermark = watermark;
+               try {
+                       sws.emitQueue.put(recordWrapper);
+               } catch (InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       /**
+        * Actual record emission called from the record emitter.
+        *
+        * <p>Responsible for tracking per shard watermarks and emit timestamps 
extracted from
+        * the record, when a watermark assigner was configured.
+        *
+        * @param rw
+        */
+       private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
                synchronized (checkpointLock) {
-                       if (record != null) {
-                               sourceContext.collectWithTimestamp(record, 
recordTimestamp);
+                       if (rw.getValue() != null) {
+                               
sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
                        } else {
                                LOG.warn("Skipping non-deserializable record at 
sequence number {} of shard {}.",
-                                       lastSequenceNumber,
-                                       
subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
+                                       rw.lastSequenceNumber,
+                                       
subscribedShardsState.get(rw.shardStateIndex).getStreamShardHandle());
                        }
-
-                       updateState(shardStateIndex, lastSequenceNumber);
+                       updateState(rw.shardStateIndex, rw.lastSequenceNumber);
                }
        }
 
@@ -689,6 +840,7 @@ public class KinesisDataFetcher<T> {
                                } catch (Exception e) {
                                        throw new RuntimeException("Failed to 
instantiate new WatermarkAssigner", e);
                                }
+                               sws.emitQueue = 
recordEmitter.getQueue(shardStateIndex);
                                sws.lastUpdated = getCurrentTimeMillis();
                                sws.lastRecordTimestamp = Long.MIN_VALUE;
                                shardWatermarks.put(shardStateIndex, sws);
@@ -721,41 +873,57 @@ public class KinesisDataFetcher<T> {
        protected void emitWatermark() {
                LOG.debug("Evaluating watermark for subtask {} time {}", 
indexOfThisConsumerSubtask, getCurrentTimeMillis());
                long potentialWatermark = Long.MAX_VALUE;
+               long potentialNextWatermark = Long.MAX_VALUE;
                long idleTime =
                        (shardIdleIntervalMillis > 0)
                                ? getCurrentTimeMillis() - 
shardIdleIntervalMillis
                                : Long.MAX_VALUE;
 
                for (Map.Entry<Integer, ShardWatermarkState> e : 
shardWatermarks.entrySet()) {
+                       Watermark w = e.getValue().lastEmittedRecordWatermark;
                        // consider only active shards, or those that would 
advance the watermark
-                       Watermark w = 
e.getValue().periodicWatermarkAssigner.getCurrentWatermark();
-                       if (w != null && (e.getValue().lastUpdated >= idleTime 
|| w.getTimestamp() > lastWatermark)) {
+                       if (w != null && (e.getValue().lastUpdated >= idleTime
+                               || e.getValue().emitQueue.getSize() > 0
+                               || w.getTimestamp() > lastWatermark)) {
                                potentialWatermark = 
Math.min(potentialWatermark, w.getTimestamp());
+                               // for sync, use the watermark of the next 
record, when available
+                               // otherwise watermark may stall when record is 
blocked by synchronization
+                               RecordEmitter.RecordQueue<RecordWrapper<T>> q = 
e.getValue().emitQueue;
+                               RecordWrapper<T> nextRecord = q.peek();
+                               Watermark nextWatermark = (nextRecord != null) 
? nextRecord.watermark : w;
+                               potentialNextWatermark = 
Math.min(potentialNextWatermark, nextWatermark.getTimestamp());
                        }
                }
 
                // advance watermark if possible (watermarks can only be 
ascending)
                if (potentialWatermark == Long.MAX_VALUE) {
                        if (shardWatermarks.isEmpty() || 
shardIdleIntervalMillis > 0) {
-                               LOG.debug("No active shard for subtask {}, 
marking the source idle.",
+                               LOG.info("No active shard for subtask {}, 
marking the source idle.",
                                        indexOfThisConsumerSubtask);
                                // no active shard, signal downstream operators 
to not wait for a watermark
                                sourceContext.markAsTemporarilyIdle();
+                               isIdle = true;
                        }
-               } else if (potentialWatermark > lastWatermark) {
-                       LOG.debug("Emitting watermark {} from subtask {}",
-                               potentialWatermark,
-                               indexOfThisConsumerSubtask);
-                       sourceContext.emitWatermark(new 
Watermark(potentialWatermark));
-                       lastWatermark = potentialWatermark;
+               } else {
+                       if (potentialWatermark > lastWatermark) {
+                               LOG.debug("Emitting watermark {} from subtask 
{}",
+                                       potentialWatermark,
+                                       indexOfThisConsumerSubtask);
+                               sourceContext.emitWatermark(new 
Watermark(potentialWatermark));
+                               lastWatermark = potentialWatermark;
+                               isIdle = false;
+                       }
+                       nextWatermark = potentialNextWatermark;
                }
        }
 
        /** Per shard tracking of watermark and last activity. */
        private static class ShardWatermarkState<T> {
                private AssignerWithPeriodicWatermarks<T> 
periodicWatermarkAssigner;
+               private RecordEmitter.RecordQueue<RecordWrapper<T>> emitQueue;
                private volatile long lastRecordTimestamp;
                private volatile long lastUpdated;
+               private volatile Watermark lastEmittedRecordWatermark;
        }
 
        /**
@@ -785,6 +953,82 @@ public class KinesisDataFetcher<T> {
                }
        }
 
+       /** Timer task to update shared watermark state. */
+       private class WatermarkSyncCallback implements ProcessingTimeCallback {
+
+               private final ProcessingTimeService timerService;
+               private final long interval;
+               private final MetricGroup shardMetricsGroup;
+               private long lastGlobalWatermark = Long.MIN_VALUE;
+               private long propagatedLocalWatermark = Long.MIN_VALUE;
+               private long logIntervalMillis = 60_000;
+               private int stalledWatermarkIntervalCount = 0;
+               private long lastLogged;
+
+               WatermarkSyncCallback(ProcessingTimeService timerService, long 
interval) {
+                       this.timerService = checkNotNull(timerService);
+                       this.interval = interval;
+                       this.shardMetricsGroup = 
consumerMetricGroup.addGroup("subtaskId",
+                               String.valueOf(indexOfThisConsumerSubtask));
+                       this.shardMetricsGroup.gauge("localWatermark", () -> 
nextWatermark);
+                       this.shardMetricsGroup.gauge("globalWatermark", () -> 
lastGlobalWatermark);
+               }
+
+               public void start() {
+                       LOG.info("Registering watermark tracker with interval 
{}", interval);
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
+               }
+
+               @Override
+               public void onProcessingTime(long timestamp) {
+                       if (nextWatermark != Long.MIN_VALUE) {
+                               long globalWatermark = lastGlobalWatermark;
+                               // TODO: refresh watermark while idle
+                               if (!(isIdle && nextWatermark == 
propagatedLocalWatermark)) {
+                                       globalWatermark = 
watermarkTracker.updateWatermark(nextWatermark);
+                                       propagatedLocalWatermark = 
nextWatermark;
+                               } else {
+                                       LOG.info("WatermarkSyncCallback 
subtask: {} is idle", indexOfThisConsumerSubtask);
+                               }
+
+                               if (timestamp - lastLogged > logIntervalMillis) 
{
+                                       lastLogged = System.currentTimeMillis();
+                                       LOG.info("WatermarkSyncCallback 
subtask: {} local watermark: {}"
+                                                       + ", global watermark: 
{}, delta: {} timeouts: {}, emitter: {}",
+                                               indexOfThisConsumerSubtask,
+                                               nextWatermark,
+                                               globalWatermark,
+                                               nextWatermark - globalWatermark,
+                                               
watermarkTracker.getUpdateTimeoutCount(),
+                                               recordEmitter.printInfo());
+
+                                       // Following is for debugging 
non-reproducible issue with stalled watermark
+                                       if (globalWatermark == nextWatermark && 
globalWatermark == lastGlobalWatermark
+                                               && 
stalledWatermarkIntervalCount++ > 5) {
+                                               // subtask blocks watermark, 
log to aid troubleshooting
+                                               stalledWatermarkIntervalCount = 
0;
+                                               for (Map.Entry<Integer, 
ShardWatermarkState> e : shardWatermarks.entrySet()) {
+                                                       
RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue;
+                                                       RecordWrapper<T> 
nextRecord = q.peek();
+                                                       if (nextRecord != null) 
{
+                                                               
LOG.info("stalled watermark {} key {} next watermark {} next timestamp {}",
+                                                                       
nextWatermark,
+                                                                       
e.getKey(),
+                                                                       
nextRecord.watermark,
+                                                                       
nextRecord.timestamp);
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               lastGlobalWatermark = globalWatermark;
+                               
recordEmitter.setCurrentWatermark(globalWatermark);
+                       }
+                       // schedule next callback
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
+               }
+       }
+
        /**
         * Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
         *
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
new file mode 100644
index 0000000..f150bb0
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link WatermarkTracker} that shares state through {@link 
GlobalAggregateManager}.
+ */
+@PublicEvolving
+public class JobManagerWatermarkTracker extends WatermarkTracker {
+
+       private GlobalAggregateManager aggregateManager;
+       private final String aggregateName;
+       private final WatermarkAggregateFunction aggregateFunction = new 
WatermarkAggregateFunction();
+       private final long logAccumulatorIntervalMillis;
+       private long updateTimeoutCount;
+
+       public JobManagerWatermarkTracker(String aggregateName) {
+               this(aggregateName, -1);
+       }
+
+       public JobManagerWatermarkTracker(String aggregateName, long 
logAccumulatorIntervalMillis) {
+               super();
+               this.aggregateName = aggregateName;
+               this.logAccumulatorIntervalMillis = 
logAccumulatorIntervalMillis;
+       }
+
+       @Override
+       public long updateWatermark(long localWatermark) {
+               WatermarkUpdate update = new WatermarkUpdate();
+               update.id = getSubtaskId();
+               update.watermark = localWatermark;
+               try {
+                       byte[] resultBytes = 
aggregateManager.updateGlobalAggregate(aggregateName,
+                               InstantiationUtil.serializeObject(update), 
aggregateFunction);
+                       WatermarkResult result = 
InstantiationUtil.deserializeObject(resultBytes,
+                               this.getClass().getClassLoader());
+                       this.updateTimeoutCount += result.updateTimeoutCount;
+                       return result.watermark;
+               } catch (ClassNotFoundException | IOException ex) {
+                       throw new RuntimeException(ex);
+               }
+       }
+
+       @Override
+       public void open(RuntimeContext context) {
+               super.open(context);
+               this.aggregateFunction.updateTimeoutMillis = 
super.getUpdateTimeoutMillis();
+               this.aggregateFunction.logAccumulatorIntervalMillis = 
logAccumulatorIntervalMillis;
+               Preconditions.checkArgument(context instanceof 
StreamingRuntimeContext);
+               StreamingRuntimeContext runtimeContext = 
(StreamingRuntimeContext) context;
+               this.aggregateManager = 
runtimeContext.getGlobalAggregateManager();
+       }
+
+       public long getUpdateTimeoutCount() {
+               return updateTimeoutCount;
+       }
+
+       /** Watermark aggregation input. */
+       protected static class WatermarkUpdate implements Serializable {
+               protected long watermark = Long.MIN_VALUE;
+               protected String id;
+       }
+
+       /** Watermark aggregation result. */
+       protected static class WatermarkResult implements Serializable {
+               protected long watermark = Long.MIN_VALUE;
+               protected long updateTimeoutCount = 0;
+       }
+
+       /**
+        * Aggregate function for computing a combined watermark of parallel 
subtasks.
+        */
+       private static class WatermarkAggregateFunction implements
+               AggregateFunction<byte[], Map<String, WatermarkState>, byte[]> {
+
+               private long updateTimeoutMillis = 
DEFAULT_UPDATE_TIMEOUT_MILLIS;
+               private long logAccumulatorIntervalMillis = -1;
+
+               // TODO: wrap accumulator
+               static long addCount;
+               static long lastLogged;
+               private static final Logger LOG = 
LoggerFactory.getLogger(WatermarkAggregateFunction.class);
+
+               @Override
+               public Map<String, WatermarkState> createAccumulator() {
+                       return new HashMap<>();
+               }
+
+               @Override
+               public Map<String, WatermarkState> add(byte[] valueBytes, 
Map<String, WatermarkState> accumulator) {
+                       addCount++;
+                       final WatermarkUpdate value;
+                       try {
+                               value = 
InstantiationUtil.deserializeObject(valueBytes, 
this.getClass().getClassLoader());
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+                       WatermarkState ws = accumulator.get(value.id);
+                       if (ws == null) {
+                               accumulator.put(value.id, ws = new 
WatermarkState());
+                       }
+                       ws.watermark = value.watermark;
+                       ws.lastUpdated = System.currentTimeMillis();
+                       return accumulator;
+               }
+
+               @Override
+               public byte[] getResult(Map<String, WatermarkState> 
accumulator) {
+                       long updateTimeoutCount = 0;
+                       long currentTime = System.currentTimeMillis();
+                       long globalWatermark = Long.MAX_VALUE;
+                       for (Map.Entry<String, WatermarkState> e : 
accumulator.entrySet()) {
+                               WatermarkState ws = e.getValue();
+                               if (ws.lastUpdated + updateTimeoutMillis < 
currentTime) {
+                                       // ignore outdated entry
+                                       updateTimeoutCount++;
+                                       continue;
+                               }
+                               globalWatermark = Math.min(ws.watermark, 
globalWatermark);
+                       }
+
+                       WatermarkResult result = new WatermarkResult();
+                       result.watermark = globalWatermark == Long.MAX_VALUE ? 
Long.MIN_VALUE : globalWatermark;
+                       result.updateTimeoutCount = updateTimeoutCount;
+
+                       if (logAccumulatorIntervalMillis > 0) {
+                               if (currentTime - lastLogged > 
logAccumulatorIntervalMillis) {
+                                       lastLogged = System.currentTimeMillis();
+                                       LOG.info("WatermarkAggregateFunction 
added: {}, timeout: {}, map: {}",
+                                               addCount, updateTimeoutCount, 
accumulator);
+                               }
+                       }
+
+                       try {
+                               return 
InstantiationUtil.serializeObject(result);
+                       } catch (IOException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public Map<String, WatermarkState> merge(Map<String, 
WatermarkState> accumulatorA, Map<String, WatermarkState> accumulatorB) {
+                       // not required
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
new file mode 100644
index 0000000..17344b1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * <p>Records are organized into per producer queues that will block when 
capacity is exhausted.
+ *
+ * <p>Records are emitted by selecting the oldest available element of all 
producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus 
allowed lookahead interval.
+ *
+ * @param <T>
+ */
+@Internal
+public abstract class RecordEmitter<T extends TimestampedValue> implements 
Runnable {
+       private static final Logger LOG = 
LoggerFactory.getLogger(RecordEmitter.class);
+
+       /**
+        * The default capacity of a single queue.
+        *
+        * <p>Larger queue size can lead to higher throughput, but also to
+        * very high heap space consumption, depending on the size of elements.
+        *
+        * <p>Note that this is difficult to tune, because it does not take 
into account
+        * the size of individual objects.
+        */
+       public static final int DEFAULT_QUEUE_CAPACITY = 100;
+
+       private final int queueCapacity;
+       private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = 
new ConcurrentHashMap<>();
+       private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> 
emptyQueues = new ConcurrentHashMap<>();
+       private final PriorityQueue<AsyncRecordQueue<T>> heads = new 
PriorityQueue<>(this::compareHeadElement);
+       private volatile boolean running = true;
+       private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+       private long maxLookaheadMillis = 60 * 1000; // one minute
+       private long idleSleepMillis = 100;
+       private final Object condition = new Object();
+
+       public RecordEmitter(int queueCapacity) {
+               this.queueCapacity = queueCapacity;
+       }
+
+       private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue 
right) {
+               return Long.compare(left.headTimestamp, right.headTimestamp);
+       }
+
+       /**
+        * Accepts records from readers.
+        *
+        * @param <T>
+        */
+       public interface RecordQueue<T> {
+               void put(T record) throws InterruptedException;
+
+               int getQueueId();
+
+               int getSize();
+
+               T peek();
+       }
+
+       private class AsyncRecordQueue<T> implements RecordQueue<T> {
+               private final ArrayBlockingQueue<T> queue;
+               private final int queueId;
+               long headTimestamp;
+
+               private AsyncRecordQueue(int queueId) {
+                       super();
+                       this.queue = new ArrayBlockingQueue<>(queueCapacity);
+                       this.queueId = queueId;
+                       this.headTimestamp = Long.MAX_VALUE;
+               }
+
+               public void put(T record) throws InterruptedException {
+                       queue.put(record);
+                       synchronized (condition) {
+                               condition.notify();
+                       }
+               }
+
+               public int getQueueId() {
+                       return queueId;
+               }
+
+               public int getSize() {
+                       return queue.size();
+               }
+
+               public T peek() {
+                       return queue.peek();
+               }
+
+       }
+
+       /**
+        * The queue for the given producer (i.e. Kinesis shard consumer 
thread).
+        *
+        * <p>The producer may hold on to the queue for subsequent records.
+        *
+        * @param producerIndex
+        * @return
+        */
+       public RecordQueue<T> getQueue(int producerIndex) {
+               return queues.computeIfAbsent(producerIndex, (key) -> {
+                       AsyncRecordQueue<T> q = new 
AsyncRecordQueue<>(producerIndex);
+                       emptyQueues.put(q, false);
+                       return q;
+               });
+       }
+
+       /**
+        * How far ahead of the watermark records should be emitted.
+        *
+        * <p>Needs to account for the latency of obtaining the global 
watermark.
+        *
+        * @param maxLookaheadMillis
+        */
+       public void setMaxLookaheadMillis(long maxLookaheadMillis) {
+               this.maxLookaheadMillis = maxLookaheadMillis;
+               LOG.info("[setMaxLookaheadMillis] Max lookahead millis set to 
{}", maxLookaheadMillis);
+       }
+
+       /**
+        * Set the current watermark.
+        *
+        * <p>This watermark will be used to control which records to emit from 
the queues of pending
+        * elements. When an element timestamp is ahead of the watermark by at 
least {@link
+        * #maxLookaheadMillis}, elements in that queue will wait until the 
watermark advances.
+        *
+        * @param watermark
+        */
+       public void setCurrentWatermark(long watermark) {
+               maxEmitTimestamp = watermark + maxLookaheadMillis;
+               synchronized (condition) {
+                       condition.notify();
+               }
+               LOG.info(
+                       "[setCurrentWatermark] Current watermark set to {}, 
maxEmitTimestamp = {}",
+                       watermark,
+                       maxEmitTimestamp);
+       }
+
+       /**
+        * Run loop, does not return unless {@link #stop()} was called.
+        */
+       @Override
+       public void run() {
+               LOG.info("Starting emitter with maxLookaheadMillis: {}", 
this.maxLookaheadMillis);
+
+               // emit available records, ordered by timestamp
+               AsyncRecordQueue<T> min = heads.poll();
+               runLoop:
+               while (running) {
+                       // find a queue to emit from
+                       while (min == null) {
+                               // check new or previously empty queues
+                               if (!emptyQueues.isEmpty()) {
+                                       for (AsyncRecordQueue<T> queueHead : 
emptyQueues.keySet()) {
+                                               if (!queueHead.queue.isEmpty()) 
{
+                                                       
emptyQueues.remove(queueHead);
+                                                       queueHead.headTimestamp 
= queueHead.queue.peek().getTimestamp();
+                                                       heads.offer(queueHead);
+                                               }
+                                       }
+                               }
+                               min = heads.poll();
+                               if (min == null) {
+                                       synchronized (condition) {
+                                               // wait for work
+                                               try {
+                                                       
condition.wait(idleSleepMillis);
+                                               } catch (InterruptedException 
e) {
+                                                       continue runLoop;
+                                               }
+                                       }
+                               }
+                       }
+
+                       // wait until ready to emit min or another queue 
receives elements
+                       while (min.headTimestamp > maxEmitTimestamp) {
+                               synchronized (condition) {
+                                       // wait until ready to emit
+                                       try {
+                                               condition.wait(idleSleepMillis);
+                                       } catch (InterruptedException e) {
+                                               continue runLoop;
+                                       }
+                                       if (min.headTimestamp > 
maxEmitTimestamp && !emptyQueues.isEmpty()) {
+                                               // see if another queue can 
make progress
+                                               heads.offer(min);
+                                               min = null;
+                                               continue runLoop;
+                                       }
+                               }
+                       }
+
+                       // emit up to queue capacity records
+                       // cap on empty queues since older records may arrive
+                       AsyncRecordQueue<T> nextQueue = heads.poll();
+                       T record;
+                       int emitCount = 0;
+                       while ((record = min.queue.poll()) != null) {
+                               emit(record, min);
+                               // track last record emitted, even when queue 
becomes empty
+                               min.headTimestamp = record.getTimestamp();
+                               // potentially switch to next queue
+                               if ((nextQueue != null && min.headTimestamp > 
nextQueue.headTimestamp)
+                                       || (min.headTimestamp > 
maxEmitTimestamp)
+                                       || (emitCount++ > queueCapacity && 
!emptyQueues.isEmpty())) {
+                                       break;
+                               }
+                       }
+                       if (record == null) {
+                               this.emptyQueues.put(min, true);
+                       } else {
+                               heads.offer(min);
+                       }
+                       min = nextQueue;
+               }
+       }
+
+       public void stop() {
+               running = false;
+       }
+
+       /** Emit the record. This is specific to a connector, like the Kinesis 
data fetcher. */
+       public abstract void emit(T record, RecordQueue<T> source);
+
+       /** Summary of emit queues that can be used for logging. */
+       public String printInfo() {
+               StringBuffer sb = new StringBuffer();
+               sb.append(String.format("queues: %d, empty: %d",
+                       queues.size(), emptyQueues.size()));
+               for (Map.Entry<Integer, AsyncRecordQueue<T>> e : 
queues.entrySet()) {
+                       AsyncRecordQueue q = e.getValue();
+                       sb.append(String.format("\n%d timestamp: %s size: %d",
+                               e.getValue().queueId, q.headTimestamp, 
q.queue.size()));
+               }
+               return sb.toString();
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
new file mode 100644
index 0000000..f4207c7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+/**
+ * The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+ * <p/>It can be used for sub tasks of a single Flink source as well as 
multiple heterogeneous
+ * sources or other operators.
+ * <p/>The class essentially functions like a distributed hash table that 
enclosing operators can
+ * use to adopt their processing / IO rates.
+ */
+@PublicEvolving
+public abstract class WatermarkTracker implements Closeable, Serializable {
+
+       public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;
+
+       /**
+        * Subtasks that have not provided a watermark update within the 
configured interval will be
+        * considered idle and excluded from target watermark calculation.
+        */
+       private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS;
+
+       /**
+        * Unique id for the subtask.
+        * Using string (instead of subtask index) so synchronization can spawn 
across multiple sources.
+        */
+       private String subtaskId;
+
+       /** Watermark state. */
+       protected static class WatermarkState {
+               protected long watermark = Long.MIN_VALUE;
+               protected long lastUpdated;
+
+               public long getWatermark() {
+                       return watermark;
+               }
+
+               @Override
+               public String toString() {
+                       return "WatermarkState{watermark=" + watermark + ", 
lastUpdated=" + lastUpdated + '}';
+               }
+       }
+
+       protected String getSubtaskId() {
+               return this.subtaskId;
+       }
+
+       protected long getUpdateTimeoutMillis() {
+               return this.updateTimeoutMillis;
+       }
+
+       public abstract long getUpdateTimeoutCount();
+
+       /**
+        * Subtasks that have not provided a watermark update within the 
configured interval will be
+        * considered idle and excluded from target watermark calculation.
+        *
+        * @param updateTimeoutMillis
+        */
+       public void setUpdateTimeoutMillis(long updateTimeoutMillis) {
+               this.updateTimeoutMillis = updateTimeoutMillis;
+       }
+
+       /**
+        * Set the current watermark of the owning subtask and return the 
global low watermark based on
+        * the current state snapshot. Periodically called by the enclosing 
consumer instance, which is
+        * responsible for any timer management etc.
+        *
+        * @param localWatermark
+        * @return
+        */
+       public abstract long updateWatermark(final long localWatermark);
+
+       protected long getCurrentTime() {
+               return System.currentTimeMillis();
+       }
+
+       public void open(RuntimeContext context) {
+               if (context instanceof StreamingRuntimeContext) {
+                       this.subtaskId = ((StreamingRuntimeContext) 
context).getOperatorUniqueID()
+                               + "-" + context.getIndexOfThisSubtask();
+               } else {
+                       this.subtaskId = context.getTaskNameWithSubtasks();
+               }
+       }
+
+       @Override
+       public void close() {
+               // no work to do here
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index b38eef1..1ce05d1 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -418,7 +418,7 @@ public class FlinkKinesisConsumerMigrationTest {
                                HashMap<StreamShardMetadata, SequenceNumber> 
testStateSnapshot,
                                List<StreamShardHandle> 
testInitialDiscoveryShards) {
 
-                       super(streams, sourceContext, runtimeContext, 
configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null);
+                       super(streams, sourceContext, runtimeContext, 
configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, null);
 
                        this.testStateSnapshot = testStateSnapshot;
                        this.testInitialDiscoveryShards = 
testInitialDiscoveryShards;
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d36d68a..cbcd8b4 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -52,6 +52,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
 
@@ -60,6 +61,7 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -79,6 +81,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -737,6 +740,7 @@ public class FlinkKinesisConsumerTest {
                                                        deserializationSchema,
                                                        getShardAssigner(),
                                                        
getPeriodicWatermarkAssigner(),
+                                                       null,
                                                        new AtomicReference<>(),
                                                        new ArrayList<>(),
                                                        
subscribedStreamsToLastDiscoveredShardIds,
@@ -775,6 +779,10 @@ public class FlinkKinesisConsumerTest {
                        public void emitWatermark(Watermark mark) {
                                watermarks.add(mark);
                        }
+
+                       @Override
+                       public void markAsTemporarilyIdle() {
+                       }
                };
 
                new Thread(
@@ -817,6 +825,164 @@ public class FlinkKinesisConsumerTest {
                assertThat(watermarks, org.hamcrest.Matchers.contains(new 
Watermark(-3), new Watermark(5)));
        }
 
+       @Test
+       public void testSourceSynchronization() throws Exception {
+
+               final String streamName = "fakeStreamName";
+               final Time maxOutOfOrderness = Time.milliseconds(5);
+               final long autoWatermarkInterval = 1_000;
+               final long watermarkSyncInterval = autoWatermarkInterval + 1;
+
+               HashMap<String, String> 
subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
+               subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);
+
+               final KinesisDeserializationSchema<String> 
deserializationSchema =
+                       new KinesisDeserializationSchemaWrapper<>(new 
SimpleStringSchema());
+               Properties props = new Properties();
+               props.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, 
Long.toString(10L));
+               props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
+                       Long.toString(watermarkSyncInterval));
+               
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, 
Long.toString(5));
+
+               BlockingQueue<String> shard1 = new LinkedBlockingQueue();
+               BlockingQueue<String> shard2 = new LinkedBlockingQueue();
+
+               Map<String, List<BlockingQueue<String>>> streamToQueueMap = new 
HashMap<>();
+               streamToQueueMap.put(streamName, Lists.newArrayList(shard1, 
shard2));
+
+               // override createFetcher to mock Kinesis
+               FlinkKinesisConsumer<String> sourceFunc =
+                       new FlinkKinesisConsumer<String>(streamName, 
deserializationSchema, props) {
+                               @Override
+                               protected KinesisDataFetcher<String> 
createFetcher(
+                                       List<String> streams,
+                                       SourceFunction.SourceContext<String> 
sourceContext,
+                                       RuntimeContext runtimeContext,
+                                       Properties configProps,
+                                       KinesisDeserializationSchema<String> 
deserializationSchema) {
+
+                                       KinesisDataFetcher<String> fetcher =
+                                               new KinesisDataFetcher<String>(
+                                                       streams,
+                                                       sourceContext,
+                                                       
sourceContext.getCheckpointLock(),
+                                                       runtimeContext,
+                                                       configProps,
+                                                       deserializationSchema,
+                                                       getShardAssigner(),
+                                                       
getPeriodicWatermarkAssigner(),
+                                                       getWatermarkTracker(),
+                                                       new AtomicReference<>(),
+                                                       new ArrayList<>(),
+                                                       
subscribedStreamsToLastDiscoveredShardIds,
+                                                       (props) -> 
FakeKinesisBehavioursFactory.blockingQueueGetRecords(
+                                                               
streamToQueueMap)
+                                               ) {};
+                                       return fetcher;
+                               }
+                       };
+
+               sourceFunc.setShardAssigner(
+                       (streamShardHandle, i) -> {
+                               // shardId-000000000000
+                               return Integer.parseInt(
+                                       
streamShardHandle.getShard().getShardId().substring("shardId-".length()));
+                       });
+
+               sourceFunc.setPeriodicWatermarkAssigner(new 
TestTimestampExtractor(maxOutOfOrderness));
+
+               sourceFunc.setWatermarkTracker(new TestWatermarkTracker());
+
+               // there is currently no test harness specifically for sources,
+               // so we overlay the source thread here
+               AbstractStreamOperatorTestHarness<Object> testHarness =
+                       new AbstractStreamOperatorTestHarness<Object>(
+                               new StreamSource(sourceFunc), 1, 1, 0);
+               testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+               
testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
+
+               testHarness.initializeEmptyState();
+               testHarness.open();
+
+               final ConcurrentLinkedQueue<Object> results = 
testHarness.getOutput();
+
+               @SuppressWarnings("unchecked")
+               SourceFunction.SourceContext<String> sourceContext = new 
CollectingSourceContext(
+                       testHarness.getCheckpointLock(), results) {
+                       @Override
+                       public void markAsTemporarilyIdle() {
+                       }
+
+                       @Override
+                       public void emitWatermark(Watermark mark) {
+                               results.add(mark);
+                       }
+               };
+
+               new Thread(
+                       () -> {
+                               try {
+                                       sourceFunc.run(sourceContext);
+                               } catch (InterruptedException e) {
+                                       // expected on cancel
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       })
+                       .start();
+
+               ArrayList<Object> expectedResults = new ArrayList<>();
+
+               final long record1 = 1;
+               shard1.put(Long.toString(record1));
+               expectedResults.add(Long.toString(record1));
+               awaitRecordCount(results, expectedResults.size());
+
+               // at this point we know the fetcher was initialized
+               final KinesisDataFetcher fetcher = 
org.powermock.reflect.Whitebox.getInternalState(sourceFunc, "fetcher");
+
+               // trigger watermark emit
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
autoWatermarkInterval);
+               expectedResults.add(new Watermark(-4));
+               // verify watermark
+               awaitRecordCount(results, expectedResults.size());
+               assertThat(results, 
org.hamcrest.Matchers.contains(expectedResults.toArray()));
+               assertEquals(0, TestWatermarkTracker.WATERMARK.get());
+
+               // trigger sync
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
1);
+               TestWatermarkTracker.assertSingleWatermark(-4);
+
+               final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
+               shard1.put(Long.toString(record2));
+
+               // TODO: check for record received instead
+               Thread.sleep(100);
+
+               // Advance the watermark. Since the new record is past global 
watermark + threshold,
+               // it won't be emitted and the watermark does not advance
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
autoWatermarkInterval);
+               assertThat(results, 
org.hamcrest.Matchers.contains(expectedResults.toArray()));
+               assertEquals(3000L, (long) 
org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
+               TestWatermarkTracker.assertSingleWatermark(-4);
+
+               // Trigger global watermark sync
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
1);
+               expectedResults.add(Long.toString(record2));
+               awaitRecordCount(results, expectedResults.size());
+               assertThat(results, 
org.hamcrest.Matchers.contains(expectedResults.toArray()));
+               TestWatermarkTracker.assertSingleWatermark(3000);
+
+               // Trigger watermark update and emit
+               testHarness.setProcessingTime(testHarness.getProcessingTime() + 
autoWatermarkInterval);
+               expectedResults.add(new Watermark(3000));
+               assertThat(results, 
org.hamcrest.Matchers.contains(expectedResults.toArray()));
+
+               sourceFunc.cancel();
+               testHarness.close();
+       }
+
        private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> 
queue, int count) throws Exception {
                long timeoutMillis = System.currentTimeMillis() + 10_000;
                while (System.currentTimeMillis() < timeoutMillis && 
queue.size() < count) {
@@ -837,4 +1003,23 @@ public class FlinkKinesisConsumerTest {
                }
        }
 
+       private static class TestWatermarkTracker extends WatermarkTracker {
+
+               private static final AtomicLong WATERMARK = new AtomicLong();
+
+               @Override
+               public long getUpdateTimeoutCount() {
+                       return 0;
+               }
+
+               @Override
+               public long updateWatermark(long localWatermark) {
+                       WATERMARK.set(localWatermark);
+                       return localWatermark;
+               }
+
+               static void assertSingleWatermark(long expected) {
+                       Assert.assertEquals(expected, WATERMARK.get());
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index dbc7118..93886f9 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -113,9 +113,10 @@ public class ShardConsumerTest {
                                
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
                                Mockito.mock(KinesisProxyInterface.class));
 
+               int shardIndex = 
fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
                new ShardConsumer<>(
                        fetcher,
-                       0,
+                       shardIndex,
                        
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
                        
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
                        
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 
9, 500L),
@@ -151,9 +152,10 @@ public class ShardConsumerTest {
                                
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
                                Mockito.mock(KinesisProxyInterface.class));
 
+               int shardIndex = 
fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
                new ShardConsumer<>(
                        fetcher,
-                       0,
+                       shardIndex,
                        
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
                        
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
                        // Get a total of 1000 records with 9 getRecords() 
calls,
@@ -195,9 +197,10 @@ public class ShardConsumerTest {
                                
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
                                Mockito.mock(KinesisProxyInterface.class));
 
+               int shardIndex = 
fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
                new ShardConsumer<>(
                        fetcher,
-                       0,
+                       shardIndex,
                        
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
                        
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
                        // Initial number of records to fetch --> 10
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index f1fd069..3bb11bd 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -74,6 +74,7 @@ public class TestableKinesisDataFetcher<T> extends 
KinesisDataFetcher<T> {
                        deserializationSchema,
                        DEFAULT_SHARD_ASSIGNER,
                        null,
+                       null,
                        thrownErrorUnderTest,
                        subscribedShardsStateUnderTest,
                        subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
new file mode 100644
index 0000000..b793b54
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Test for {@link JobManagerWatermarkTracker}. */
+public class JobManagerWatermarkTrackerTest {
+
+       private static MiniCluster flink;
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               final Configuration config = new Configuration();
+               config.setInteger(RestOptions.PORT, 0);
+
+               final MiniClusterConfiguration miniClusterConfiguration = new 
MiniClusterConfiguration.Builder()
+                       .setConfiguration(config)
+                       .setNumTaskManagers(1)
+                       .setNumSlotsPerTaskManager(1)
+                       .build();
+
+               flink = new MiniCluster(miniClusterConfiguration);
+
+               flink.start();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (flink != null) {
+                       flink.close();
+               }
+       }
+
+       @Test
+       public void testUpateWatermark() throws Exception {
+               final Configuration clientConfiguration = new Configuration();
+               clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 
0);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                       flink.getRestAddress().get().getHost(),
+                       flink.getRestAddress().get().getPort(),
+                       clientConfiguration);
+
+               env.addSource(new TestSourceFunction(new 
JobManagerWatermarkTracker("fakeId")))
+                       .addSink(new SinkFunction<Integer>() {});
+               env.execute();
+       }
+
+       private static class TestSourceFunction extends 
RichSourceFunction<Integer> {
+
+               private final JobManagerWatermarkTracker tracker;
+
+               public TestSourceFunction(JobManagerWatermarkTracker tracker) {
+                       this.tracker = tracker;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       tracker.open(getRuntimeContext());
+               }
+
+               @Override
+               public void run(SourceContext<Integer> ctx) {
+                       Assert.assertEquals(998, tracker.updateWatermark(998));
+                       Assert.assertEquals(999, tracker.updateWatermark(999));
+               }
+
+               @Override
+               public void cancel() {
+               }
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
new file mode 100644
index 0000000..1948237
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Test for {@link RecordEmitter}. */
+public class RecordEmitterTest {
+
+       static List<TimestampedValue> results = 
Collections.synchronizedList(new ArrayList<>());
+
+       private class TestRecordEmitter extends RecordEmitter<TimestampedValue> 
{
+
+               private TestRecordEmitter() {
+                       super(DEFAULT_QUEUE_CAPACITY);
+               }
+
+               @Override
+               public void emit(TimestampedValue record, 
RecordQueue<TimestampedValue> queue) {
+                       results.add(record);
+               }
+       }
+
+       @Test
+       public void test() throws Exception {
+
+               TestRecordEmitter emitter = new TestRecordEmitter();
+
+               final TimestampedValue<String> one = new 
TimestampedValue<>("one", 1);
+               final TimestampedValue<String> two = new 
TimestampedValue<>("two", 2);
+               final TimestampedValue<String> five = new 
TimestampedValue<>("five", 5);
+               final TimestampedValue<String> ten = new 
TimestampedValue<>("ten", 10);
+
+               final RecordEmitter.RecordQueue<TimestampedValue> queue0 = 
emitter.getQueue(0);
+               final RecordEmitter.RecordQueue<TimestampedValue> queue1 = 
emitter.getQueue(1);
+
+               queue0.put(one);
+               queue0.put(five);
+               queue0.put(ten);
+
+               queue1.put(two);
+
+               ExecutorService executor = Executors.newSingleThreadExecutor();
+               executor.submit(emitter);
+
+               long timeout = System.currentTimeMillis() + 10_000;
+               while (results.size() != 4 && System.currentTimeMillis() < 
timeout) {
+                       Thread.sleep(100);
+               }
+               emitter.stop();
+               executor.shutdownNow();
+
+               Assert.assertThat(results, Matchers.contains(one, five, two, 
ten));
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
new file mode 100644
index 0000000..3d59a45
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link WatermarkTracker}. */
+public class WatermarkTrackerTest {
+
+       WatermarkTracker.WatermarkState wm1 = new 
WatermarkTracker.WatermarkState();
+       MutableLong clock = new MutableLong(0);
+
+       private class TestWatermarkTracker extends WatermarkTracker {
+               /**
+                * The watermarks of all sub tasks that participate in the 
synchronization.
+                */
+               private final Map<String, WatermarkState> watermarks = new 
HashMap<>();
+
+               private long updateTimeoutCount = 0;
+
+               @Override
+               protected long getCurrentTime() {
+                       return clock.longValue();
+               }
+
+               @Override
+               public long updateWatermark(final long localWatermark) {
+                       refreshWatermarkSnapshot(this.watermarks);
+
+                       long currentTime = getCurrentTime();
+                       String subtaskId = this.getSubtaskId();
+
+                       WatermarkState ws = watermarks.get(subtaskId);
+                       if (ws == null) {
+                               watermarks.put(subtaskId, ws = new 
WatermarkState());
+                       }
+                       ws.lastUpdated = currentTime;
+                       ws.watermark = Math.max(ws.watermark, localWatermark);
+                       saveWatermark(subtaskId, ws);
+
+                       long globalWatermark = ws.watermark;
+                       for (Map.Entry<String, WatermarkState> e : 
watermarks.entrySet()) {
+                               ws = e.getValue();
+                               if (ws.lastUpdated + getUpdateTimeoutMillis() < 
currentTime) {
+                                       // ignore outdated subtask
+                                       updateTimeoutCount++;
+                                       continue;
+                               }
+                               globalWatermark = Math.min(ws.watermark, 
globalWatermark);
+                       }
+                       return globalWatermark;
+               }
+
+               protected void refreshWatermarkSnapshot(Map<String, 
WatermarkState> watermarks) {
+                       watermarks.put("wm1", wm1);
+               }
+
+               protected void saveWatermark(String id, WatermarkState ws) {
+                       // do nothing
+               }
+
+               public long getUpdateTimeoutCount() {
+                       return updateTimeoutCount;
+               }
+       }
+
+       @Test
+       public void test() {
+               long watermark = 0;
+               TestWatermarkTracker ws = new TestWatermarkTracker();
+               ws.open(new MockStreamingRuntimeContext(false, 1, 0));
+               Assert.assertEquals(Long.MIN_VALUE, 
ws.updateWatermark(Long.MIN_VALUE));
+               Assert.assertEquals(Long.MIN_VALUE, 
ws.updateWatermark(watermark));
+               // timeout wm1
+               clock.add(WatermarkTracker.DEFAULT_UPDATE_TIMEOUT_MILLIS + 1);
+               Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+               Assert.assertEquals(watermark, ws.updateWatermark(watermark - 
1));
+
+               // min watermark
+               wm1.watermark = watermark + 1;
+               wm1.lastUpdated = clock.longValue();
+               Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+               Assert.assertEquals(watermark + 1, ws.updateWatermark(watermark 
+ 1));
+       }
+
+}

Reply via email to