This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push: new 793a784 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer 793a784 is described below commit 793a78407aa22530448efbf18b714952eac40aba Author: Thomas Weise <t...@apache.org> AuthorDate: Wed May 22 21:42:15 2019 -0700 [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer --- .../connectors/kinesis/FlinkKinesisConsumer.java | 18 +- .../kinesis/config/ConsumerConfigConstants.java | 11 + .../internals/DynamoDBStreamsDataFetcher.java | 1 + .../kinesis/internals/KinesisDataFetcher.java | 292 +++++++++++++++++++-- .../kinesis/util/JobManagerWatermarkTracker.java | 179 +++++++++++++ .../connectors/kinesis/util/RecordEmitter.java | 269 +++++++++++++++++++ .../connectors/kinesis/util/WatermarkTracker.java | 114 ++++++++ .../kinesis/FlinkKinesisConsumerMigrationTest.java | 2 +- .../kinesis/FlinkKinesisConsumerTest.java | 185 +++++++++++++ .../kinesis/internals/ShardConsumerTest.java | 9 +- .../testutils/TestableKinesisDataFetcher.java | 1 + .../util/JobManagerWatermarkTrackerTest.java | 101 +++++++ .../connectors/kinesis/util/RecordEmitterTest.java | 81 ++++++ .../kinesis/util/WatermarkTrackerTest.java | 108 ++++++++ 14 files changed, 1342 insertions(+), 29 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 3c5e3c7..5b24ded 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; @@ -126,6 +127,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple private KinesisShardAssigner shardAssigner = KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER; private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner; + private WatermarkTracker watermarkTracker; // ------------------------------------------------------------------------ // Runtime state @@ -254,6 +256,20 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple ClosureCleaner.clean(this.periodicWatermarkAssigner, true); } + public WatermarkTracker getWatermarkTracker() { + return this.watermarkTracker; + } + + /** + * Set the global watermark tracker. When set, it will be used by the fetcher + * to align the shard consumers by event time. 
+ * @param watermarkTracker + */ + public void setWatermarkTracker(WatermarkTracker watermarkTracker) { + this.watermarkTracker = watermarkTracker; + ClosureCleaner.clean(this.watermarkTracker, true); + } + // ------------------------------------------------------------------------ // Source life cycle // ------------------------------------------------------------------------ @@ -448,7 +464,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple Properties configProps, KinesisDeserializationSchema<T> deserializationSchema) { - return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner); + return new KinesisDataFetcher<>(streams, sourceContext, runtimeContext, configProps, deserializationSchema, shardAssigner, periodicWatermarkAssigner, watermarkTracker); } @VisibleForTesting diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 41ac6b8..2f5be97 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -125,6 +125,15 @@ public class ConsumerConfigConstants extends AWSConfigConstants { /** The interval after which to consider a shard idle for purposes of watermark generation. */ public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval"; + /** The interval for periodically synchronizing the shared watermark state. */ + public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval"; + + /** The maximum delta allowed for the reader to advance ahead of the shared global watermark. */ + public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis"; + + /** The maximum number of records that will be buffered before suspending consumption of a shard. */ + public static final String WATERMARK_SYNC_QUEUE_CAPACITY = "flink.watermark.sync.queue.capacity"; + // ------------------------------------------------------------------------ // Default values for consumer configuration // ------------------------------------------------------------------------ @@ -173,6 +182,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants { public static final long DEFAULT_SHARD_IDLE_INTERVAL_MILLIS = -1; + public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000; + /** * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators. 
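
For anyone trying the new feature against this commit, a minimal job sketch wiring up the constants and setters introduced above. The stream name, region, timestamp encoding, and interval values are placeholder assumptions, not part of this change; event time alignment only becomes active when both a periodic watermark assigner and a watermark tracker are set (see createRecordEmitter further down).

package example; // hypothetical package for the sketch

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

import java.util.Properties;

public class KinesisWatermarkSyncExample {

    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region

        // synchronize the shared watermark state every 10s (default is 30s)
        config.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "10000");
        // let a subtask emit records up to 5s ahead of the global watermark
        config.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "5000");
        // buffer at most 100 records per shard before the consumer thread blocks
        config.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY, "100");

        FlinkKinesisConsumer<String> consumer =
            new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config);

        // both calls are required; with only one of them the fetcher falls back
        // to the unsynchronized emit path
        consumer.setPeriodicWatermarkAssigner(
            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(String element) {
                    // assumes the record payload encodes the event timestamp
                    return Long.parseLong(element);
                }
            });
        consumer.setWatermarkTracker(new JobManagerWatermarkTracker("kinesis-watermarks"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L); // drives the periodic watermark emitter
        env.addSource(consumer).print();
        env.execute("kinesis watermark sync example");
    }
}

Note that runFetcher passes Math.max(lookaheadMillis, watermarkSyncMillis * 3) to the emitter, so the effective lookahead is never less than three times the sync interval; the configured WATERMARK_LOOKAHEAD_MILLIS only takes effect when it exceeds that floor.
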
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java index c2b7be3..5620142 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java @@ -64,6 +64,7 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> { deserializationSchema, shardAssigner, null, + null, new AtomicReference<>(), new ArrayList<>(), createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 8c8d94a..eae3153 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -38,6 +38,9 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter; +import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.InstantiationUtil; @@ -191,6 +194,9 @@ public class KinesisDataFetcher<T> { private volatile boolean running = true; private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner; + private final WatermarkTracker watermarkTracker; + private final transient RecordEmitter recordEmitter; + private transient boolean isIdle; /** * The watermark related state for each shard consumer. Entries in this map will be created when shards @@ -207,6 +213,14 @@ public class KinesisDataFetcher<T> { private long lastWatermark = Long.MIN_VALUE; /** + * The next watermark used for synchronization. + * For purposes of global watermark calculation, we need to consider the next watermark based + * on the buffered records vs. the last emitted watermark to allow for progress. + */ + private long nextWatermark = Long.MIN_VALUE; + + + /** * The time span since last consumed record, after which a shard will be considered idle for purpose of watermark * calculation. A positive value will allow the watermark to progress even when some shards don't receive new records. */ @@ -220,6 +234,82 @@ public class KinesisDataFetcher<T> { } /** + * The wrapper that holds the watermark handling related parameters + * of a record produced by the shard consumer thread. 
+ * + * @param <T> + */ + private static class RecordWrapper<T> extends TimestampedValue<T> { + int shardStateIndex; + SequenceNumber lastSequenceNumber; + long timestamp; + Watermark watermark; + + private RecordWrapper(T record, long timestamp) { + super(record, timestamp); + this.timestamp = timestamp; + } + + @Override + public long getTimestamp() { + return timestamp; + } + } + + /** Kinesis data fetcher specific, asynchronous record emitter. */ + private class AsyncKinesisRecordEmitter extends RecordEmitter<RecordWrapper<T>> { + + private AsyncKinesisRecordEmitter() { + this(DEFAULT_QUEUE_CAPACITY); + } + + private AsyncKinesisRecordEmitter(int queueCapacity) { + super(queueCapacity); + } + + @Override + public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) { + emitRecordAndUpdateState(record); + ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId()); + sws.lastEmittedRecordWatermark = record.watermark; + } + } + + /** Synchronous emitter for use w/o watermark synchronization. */ + private class SyncKinesisRecordEmitter extends AsyncKinesisRecordEmitter { + private final ConcurrentHashMap<Integer, RecordQueue<RecordWrapper<T>>> queues = + new ConcurrentHashMap<>(); + + @Override + public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) { + return queues.computeIfAbsent(producerIndex, (key) -> { + return new RecordQueue<RecordWrapper<T>>() { + @Override + public void put(RecordWrapper<T> record) { + emit(record, this); + } + + @Override + public int getQueueId() { + return producerIndex; + } + + @Override + public int getSize() { + return 0; + } + + @Override + public RecordWrapper<T> peek() { + return null; + } + + }; + }); + } + } + + /** * Creates a Kinesis Data Fetcher. * * @param streams the streams to subscribe to @@ -234,7 +324,8 @@ public class KinesisDataFetcher<T> { Properties configProps, KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner, - AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) { + AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner, + WatermarkTracker watermarkTracker) { this(streams, sourceContext, sourceContext.getCheckpointLock(), @@ -243,6 +334,7 @@ public class KinesisDataFetcher<T> { deserializationSchema, shardAssigner, periodicWatermarkAssigner, + watermarkTracker, new AtomicReference<>(), new ArrayList<>(), createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), @@ -258,6 +350,7 @@ public class KinesisDataFetcher<T> { KinesisDeserializationSchema<T> deserializationSchema, KinesisShardAssigner shardAssigner, AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner, + WatermarkTracker watermarkTracker, AtomicReference<Throwable> error, List<KinesisStreamShardState> subscribedShardsState, HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds, @@ -272,6 +365,7 @@ public class KinesisDataFetcher<T> { this.deserializationSchema = checkNotNull(deserializationSchema); this.shardAssigner = checkNotNull(shardAssigner); this.periodicWatermarkAssigner = periodicWatermarkAssigner; + this.watermarkTracker = watermarkTracker; this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory); this.kinesis = kinesisProxyFactory.create(configProps); @@ -284,6 +378,17 @@ public class KinesisDataFetcher<T> { this.shardConsumersExecutor = createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks()); + this.recordEmitter = createRecordEmitter(configProps); + } + + private RecordEmitter createRecordEmitter(Properties configProps) { + if 
(periodicWatermarkAssigner != null && watermarkTracker != null) { + int queueCapacity = Integer.parseInt(configProps.getProperty( + ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY, + Integer.toString(AsyncKinesisRecordEmitter.DEFAULT_QUEUE_CAPACITY))); + return new AsyncKinesisRecordEmitter(queueCapacity); + } + return new SyncKinesisRecordEmitter(); } /** @@ -380,16 +485,37 @@ public class KinesisDataFetcher<T> { ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService(); LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis); new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start(); + if (watermarkTracker != null) { + // setup global watermark tracking + long watermarkSyncMillis = Long.parseLong( + getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, + Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS))); + watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3); // synchronization latency + watermarkTracker.open(runtimeContext); + new WatermarkSyncCallback(timerService, watermarkSyncMillis).start(); + // emit records ahead of watermark to offset synchronization latency + long lookaheadMillis = Long.parseLong( + getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, + Long.toString(0))); + recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3)); + } } this.shardIdleIntervalMillis = Long.parseLong( getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS))); + + // run record emitter in separate thread since main thread is used for discovery + Thread thread = new Thread(this.recordEmitter); + thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks()); + thread.setDaemon(true); + thread.start(); } // ------------------------------------------------------------------------ // finally, start the infinite shard discovery and consumer launching loop; // we will escape from this loop only when shutdownFetcher() or stopWithError() is called + // TODO: have this thread emit the records for tracking backpressure final long discoveryIntervalMillis = Long.valueOf( configProps.getProperty( @@ -490,6 +616,11 @@ public class KinesisDataFetcher<T> { mainThread.interrupt(); // the main thread may be sleeping for the discovery interval } + if (watermarkTracker != null) { + watermarkTracker.close(); + } + this.recordEmitter.stop(); + if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } @@ -603,28 +734,48 @@ public class KinesisDataFetcher<T> { * @param lastSequenceNumber the last sequence number value to update */ protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { - // track per shard watermarks and emit timestamps extracted from the record, - // when a watermark assigner was configured. 
- if (periodicWatermarkAssigner != null) { - ShardWatermarkState sws = shardWatermarks.get(shardStateIndex); - Preconditions.checkNotNull( - sws, "shard watermark state initialized in registerNewSubscribedShardState"); + ShardWatermarkState sws = shardWatermarks.get(shardStateIndex); + Preconditions.checkNotNull( + sws, "shard watermark state initialized in registerNewSubscribedShardState"); + Watermark watermark = null; + if (sws.periodicWatermarkAssigner != null) { recordTimestamp = sws.periodicWatermarkAssigner.extractTimestamp(record, sws.lastRecordTimestamp); - sws.lastRecordTimestamp = recordTimestamp; - sws.lastUpdated = getCurrentTimeMillis(); + // track watermark per record since extractTimestamp has side effect + watermark = sws.periodicWatermarkAssigner.getCurrentWatermark(); } + sws.lastRecordTimestamp = recordTimestamp; + sws.lastUpdated = getCurrentTimeMillis(); + RecordWrapper<T> recordWrapper = new RecordWrapper<>(record, recordTimestamp); + recordWrapper.shardStateIndex = shardStateIndex; + recordWrapper.lastSequenceNumber = lastSequenceNumber; + recordWrapper.watermark = watermark; + try { + sws.emitQueue.put(recordWrapper); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Actual record emission called from the record emitter. + * + * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from + * the record, when a watermark assigner was configured. + * + * @param rw + */ + private void emitRecordAndUpdateState(RecordWrapper<T> rw) { synchronized (checkpointLock) { - if (record != null) { - sourceContext.collectWithTimestamp(record, recordTimestamp); + if (rw.getValue() != null) { + sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp); } else { LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.", - lastSequenceNumber, - subscribedShardsState.get(shardStateIndex).getStreamShardHandle()); + rw.lastSequenceNumber, + subscribedShardsState.get(rw.shardStateIndex).getStreamShardHandle()); } - - updateState(shardStateIndex, lastSequenceNumber); + updateState(rw.shardStateIndex, rw.lastSequenceNumber); } } @@ -689,6 +840,7 @@ public class KinesisDataFetcher<T> { } catch (Exception e) { throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e); } + sws.emitQueue = recordEmitter.getQueue(shardStateIndex); sws.lastUpdated = getCurrentTimeMillis(); sws.lastRecordTimestamp = Long.MIN_VALUE; shardWatermarks.put(shardStateIndex, sws); @@ -721,41 +873,57 @@ public class KinesisDataFetcher<T> { protected void emitWatermark() { LOG.debug("Evaluating watermark for subtask {} time {}", indexOfThisConsumerSubtask, getCurrentTimeMillis()); long potentialWatermark = Long.MAX_VALUE; + long potentialNextWatermark = Long.MAX_VALUE; long idleTime = (shardIdleIntervalMillis > 0) ? 
getCurrentTimeMillis() - shardIdleIntervalMillis : Long.MAX_VALUE; for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) { + Watermark w = e.getValue().lastEmittedRecordWatermark; // consider only active shards, or those that would advance the watermark - Watermark w = e.getValue().periodicWatermarkAssigner.getCurrentWatermark(); - if (w != null && (e.getValue().lastUpdated >= idleTime || w.getTimestamp() > lastWatermark)) { + if (w != null && (e.getValue().lastUpdated >= idleTime + || e.getValue().emitQueue.getSize() > 0 + || w.getTimestamp() > lastWatermark)) { potentialWatermark = Math.min(potentialWatermark, w.getTimestamp()); + // for sync, use the watermark of the next record, when available + // otherwise watermark may stall when record is blocked by synchronization + RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue; + RecordWrapper<T> nextRecord = q.peek(); + Watermark nextWatermark = (nextRecord != null) ? nextRecord.watermark : w; + potentialNextWatermark = Math.min(potentialNextWatermark, nextWatermark.getTimestamp()); } } // advance watermark if possible (watermarks can only be ascending) if (potentialWatermark == Long.MAX_VALUE) { if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) { - LOG.debug("No active shard for subtask {}, marking the source idle.", + LOG.info("No active shard for subtask {}, marking the source idle.", indexOfThisConsumerSubtask); // no active shard, signal downstream operators to not wait for a watermark sourceContext.markAsTemporarilyIdle(); + isIdle = true; } - } else if (potentialWatermark > lastWatermark) { - LOG.debug("Emitting watermark {} from subtask {}", - potentialWatermark, - indexOfThisConsumerSubtask); - sourceContext.emitWatermark(new Watermark(potentialWatermark)); - lastWatermark = potentialWatermark; + } else { + if (potentialWatermark > lastWatermark) { + LOG.debug("Emitting watermark {} from subtask {}", + potentialWatermark, + indexOfThisConsumerSubtask); + sourceContext.emitWatermark(new Watermark(potentialWatermark)); + lastWatermark = potentialWatermark; + isIdle = false; + } + nextWatermark = potentialNextWatermark; } } /** Per shard tracking of watermark and last activity. */ private static class ShardWatermarkState<T> { private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner; + private RecordEmitter.RecordQueue<RecordWrapper<T>> emitQueue; private volatile long lastRecordTimestamp; private volatile long lastUpdated; + private volatile Watermark lastEmittedRecordWatermark; } /** @@ -785,6 +953,82 @@ public class KinesisDataFetcher<T> { } } + /** Timer task to update shared watermark state. 
*/ + private class WatermarkSyncCallback implements ProcessingTimeCallback { + + private final ProcessingTimeService timerService; + private final long interval; + private final MetricGroup shardMetricsGroup; + private long lastGlobalWatermark = Long.MIN_VALUE; + private long propagatedLocalWatermark = Long.MIN_VALUE; + private long logIntervalMillis = 60_000; + private int stalledWatermarkIntervalCount = 0; + private long lastLogged; + + WatermarkSyncCallback(ProcessingTimeService timerService, long interval) { + this.timerService = checkNotNull(timerService); + this.interval = interval; + this.shardMetricsGroup = consumerMetricGroup.addGroup("subtaskId", + String.valueOf(indexOfThisConsumerSubtask)); + this.shardMetricsGroup.gauge("localWatermark", () -> nextWatermark); + this.shardMetricsGroup.gauge("globalWatermark", () -> lastGlobalWatermark); + } + + public void start() { + LOG.info("Registering watermark tracker with interval {}", interval); + timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); + } + + @Override + public void onProcessingTime(long timestamp) { + if (nextWatermark != Long.MIN_VALUE) { + long globalWatermark = lastGlobalWatermark; + // TODO: refresh watermark while idle + if (!(isIdle && nextWatermark == propagatedLocalWatermark)) { + globalWatermark = watermarkTracker.updateWatermark(nextWatermark); + propagatedLocalWatermark = nextWatermark; + } else { + LOG.info("WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask); + } + + if (timestamp - lastLogged > logIntervalMillis) { + lastLogged = System.currentTimeMillis(); + LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}" + + ", global watermark: {}, delta: {} timeouts: {}, emitter: {}", + indexOfThisConsumerSubtask, + nextWatermark, + globalWatermark, + nextWatermark - globalWatermark, + watermarkTracker.getUpdateTimeoutCount(), + recordEmitter.printInfo()); + + // Following is for debugging non-reproducible issue with stalled watermark + if (globalWatermark == nextWatermark && globalWatermark == lastGlobalWatermark + && stalledWatermarkIntervalCount++ > 5) { + // subtask blocks watermark, log to aid troubleshooting + stalledWatermarkIntervalCount = 0; + for (Map.Entry<Integer, ShardWatermarkState> e : shardWatermarks.entrySet()) { + RecordEmitter.RecordQueue<RecordWrapper<T>> q = e.getValue().emitQueue; + RecordWrapper<T> nextRecord = q.peek(); + if (nextRecord != null) { + LOG.info("stalled watermark {} key {} next watermark {} next timestamp {}", + nextWatermark, + e.getKey(), + nextRecord.watermark, + nextRecord.timestamp); + } + } + } + } + + lastGlobalWatermark = globalWatermark; + recordEmitter.setCurrentWatermark(globalWatermark); + } + // schedule next callback + timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); + } + } + /** * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}. 
* diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java new file mode 100644 index 0000000..f150bb0 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link WatermarkTracker} that shares state through {@link GlobalAggregateManager}. 
+ */ +@PublicEvolving +public class JobManagerWatermarkTracker extends WatermarkTracker { + + private GlobalAggregateManager aggregateManager; + private final String aggregateName; + private final WatermarkAggregateFunction aggregateFunction = new WatermarkAggregateFunction(); + private final long logAccumulatorIntervalMillis; + private long updateTimeoutCount; + + public JobManagerWatermarkTracker(String aggregateName) { + this(aggregateName, -1); + } + + public JobManagerWatermarkTracker(String aggregateName, long logAccumulatorIntervalMillis) { + super(); + this.aggregateName = aggregateName; + this.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis; + } + + @Override + public long updateWatermark(long localWatermark) { + WatermarkUpdate update = new WatermarkUpdate(); + update.id = getSubtaskId(); + update.watermark = localWatermark; + try { + byte[] resultBytes = aggregateManager.updateGlobalAggregate(aggregateName, + InstantiationUtil.serializeObject(update), aggregateFunction); + WatermarkResult result = InstantiationUtil.deserializeObject(resultBytes, + this.getClass().getClassLoader()); + this.updateTimeoutCount += result.updateTimeoutCount; + return result.watermark; + } catch (ClassNotFoundException | IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void open(RuntimeContext context) { + super.open(context); + this.aggregateFunction.updateTimeoutMillis = super.getUpdateTimeoutMillis(); + this.aggregateFunction.logAccumulatorIntervalMillis = logAccumulatorIntervalMillis; + Preconditions.checkArgument(context instanceof StreamingRuntimeContext); + StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) context; + this.aggregateManager = runtimeContext.getGlobalAggregateManager(); + } + + public long getUpdateTimeoutCount() { + return updateTimeoutCount; + } + + /** Watermark aggregation input. */ + protected static class WatermarkUpdate implements Serializable { + protected long watermark = Long.MIN_VALUE; + protected String id; + } + + /** Watermark aggregation result. */ + protected static class WatermarkResult implements Serializable { + protected long watermark = Long.MIN_VALUE; + protected long updateTimeoutCount = 0; + } + + /** + * Aggregate function for computing a combined watermark of parallel subtasks. 
+ */ + private static class WatermarkAggregateFunction implements + AggregateFunction<byte[], Map<String, WatermarkState>, byte[]> { + + private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS; + private long logAccumulatorIntervalMillis = -1; + + // TODO: wrap accumulator + static long addCount; + static long lastLogged; + private static final Logger LOG = LoggerFactory.getLogger(WatermarkAggregateFunction.class); + + @Override + public Map<String, WatermarkState> createAccumulator() { + return new HashMap<>(); + } + + @Override + public Map<String, WatermarkState> add(byte[] valueBytes, Map<String, WatermarkState> accumulator) { + addCount++; + final WatermarkUpdate value; + try { + value = InstantiationUtil.deserializeObject(valueBytes, this.getClass().getClassLoader()); + } catch (Exception e) { + throw new RuntimeException(e); + } + WatermarkState ws = accumulator.get(value.id); + if (ws == null) { + accumulator.put(value.id, ws = new WatermarkState()); + } + ws.watermark = value.watermark; + ws.lastUpdated = System.currentTimeMillis(); + return accumulator; + } + + @Override + public byte[] getResult(Map<String, WatermarkState> accumulator) { + long updateTimeoutCount = 0; + long currentTime = System.currentTimeMillis(); + long globalWatermark = Long.MAX_VALUE; + for (Map.Entry<String, WatermarkState> e : accumulator.entrySet()) { + WatermarkState ws = e.getValue(); + if (ws.lastUpdated + updateTimeoutMillis < currentTime) { + // ignore outdated entry + updateTimeoutCount++; + continue; + } + globalWatermark = Math.min(ws.watermark, globalWatermark); + } + + WatermarkResult result = new WatermarkResult(); + result.watermark = globalWatermark == Long.MAX_VALUE ? Long.MIN_VALUE : globalWatermark; + result.updateTimeoutCount = updateTimeoutCount; + + if (logAccumulatorIntervalMillis > 0) { + if (currentTime - lastLogged > logAccumulatorIntervalMillis) { + lastLogged = System.currentTimeMillis(); + LOG.info("WatermarkAggregateFunction added: {}, timeout: {}, map: {}", + addCount, updateTimeoutCount, accumulator); + } + } + + try { + return InstantiationUtil.serializeObject(result); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Map<String, WatermarkState> merge(Map<String, WatermarkState> accumulatorA, Map<String, WatermarkState> accumulatorB) { + // not required + throw new UnsupportedOperationException(); + } + } + +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java new file mode 100644 index 0000000..17344b1 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Emitter that handles event time synchronization between producer threads. + * + * <p>Records are organized into per producer queues that will block when capacity is exhausted. + * + * <p>Records are emitted by selecting the oldest available element of all producer queues, + * as long as the timestamp does not exceed the current shared watermark plus allowed lookahead interval. + * + * @param <T> + */ +@Internal +public abstract class RecordEmitter<T extends TimestampedValue> implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RecordEmitter.class); + + /** + * The default capacity of a single queue. + * + * <p>Larger queue size can lead to higher throughput, but also to + * very high heap space consumption, depending on the size of elements. + * + * <p>Note that this is difficult to tune, because it does not take into account + * the size of individual objects. + */ + public static final int DEFAULT_QUEUE_CAPACITY = 100; + + private final int queueCapacity; + private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> emptyQueues = new ConcurrentHashMap<>(); + private final PriorityQueue<AsyncRecordQueue<T>> heads = new PriorityQueue<>(this::compareHeadElement); + private volatile boolean running = true; + private volatile long maxEmitTimestamp = Long.MAX_VALUE; + private long maxLookaheadMillis = 60 * 1000; // one minute + private long idleSleepMillis = 100; + private final Object condition = new Object(); + + public RecordEmitter(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) { + return Long.compare(left.headTimestamp, right.headTimestamp); + } + + /** + * Accepts records from readers. + * + * @param <T> + */ + public interface RecordQueue<T> { + void put(T record) throws InterruptedException; + + int getQueueId(); + + int getSize(); + + T peek(); + } + + private class AsyncRecordQueue<T> implements RecordQueue<T> { + private final ArrayBlockingQueue<T> queue; + private final int queueId; + long headTimestamp; + + private AsyncRecordQueue(int queueId) { + super(); + this.queue = new ArrayBlockingQueue<>(queueCapacity); + this.queueId = queueId; + this.headTimestamp = Long.MAX_VALUE; + } + + public void put(T record) throws InterruptedException { + queue.put(record); + synchronized (condition) { + condition.notify(); + } + } + + public int getQueueId() { + return queueId; + } + + public int getSize() { + return queue.size(); + } + + public T peek() { + return queue.peek(); + } + + } + + /** + * The queue for the given producer (i.e. 
Kinesis shard consumer thread). + * + * <p>The producer may hold on to the queue for subsequent records. + * + * @param producerIndex + * @return + */ + public RecordQueue<T> getQueue(int producerIndex) { + return queues.computeIfAbsent(producerIndex, (key) -> { + AsyncRecordQueue<T> q = new AsyncRecordQueue<>(producerIndex); + emptyQueues.put(q, false); + return q; + }); + } + + /** + * How far ahead of the watermark records should be emitted. + * + * <p>Needs to account for the latency of obtaining the global watermark. + * + * @param maxLookaheadMillis + */ + public void setMaxLookaheadMillis(long maxLookaheadMillis) { + this.maxLookaheadMillis = maxLookaheadMillis; + LOG.info("[setMaxLookaheadMillis] Max lookahead millis set to {}", maxLookaheadMillis); + } + + /** + * Set the current watermark. + * + * <p>This watermark will be used to control which records to emit from the queues of pending + * elements. When an element timestamp is ahead of the watermark by at least {@link + * #maxLookaheadMillis}, elements in that queue will wait until the watermark advances. + * + * @param watermark + */ + public void setCurrentWatermark(long watermark) { + maxEmitTimestamp = watermark + maxLookaheadMillis; + synchronized (condition) { + condition.notify(); + } + LOG.info( + "[setCurrentWatermark] Current watermark set to {}, maxEmitTimestamp = {}", + watermark, + maxEmitTimestamp); + } + + /** + * Run loop, does not return unless {@link #stop()} was called. + */ + @Override + public void run() { + LOG.info("Starting emitter with maxLookaheadMillis: {}", this.maxLookaheadMillis); + + // emit available records, ordered by timestamp + AsyncRecordQueue<T> min = heads.poll(); + runLoop: + while (running) { + // find a queue to emit from + while (min == null) { + // check new or previously empty queues + if (!emptyQueues.isEmpty()) { + for (AsyncRecordQueue<T> queueHead : emptyQueues.keySet()) { + if (!queueHead.queue.isEmpty()) { + emptyQueues.remove(queueHead); + queueHead.headTimestamp = queueHead.queue.peek().getTimestamp(); + heads.offer(queueHead); + } + } + } + min = heads.poll(); + if (min == null) { + synchronized (condition) { + // wait for work + try { + condition.wait(idleSleepMillis); + } catch (InterruptedException e) { + continue runLoop; + } + } + } + } + + // wait until ready to emit min or another queue receives elements + while (min.headTimestamp > maxEmitTimestamp) { + synchronized (condition) { + // wait until ready to emit + try { + condition.wait(idleSleepMillis); + } catch (InterruptedException e) { + continue runLoop; + } + if (min.headTimestamp > maxEmitTimestamp && !emptyQueues.isEmpty()) { + // see if another queue can make progress + heads.offer(min); + min = null; + continue runLoop; + } + } + } + + // emit up to queue capacity records + // cap on empty queues since older records may arrive + AsyncRecordQueue<T> nextQueue = heads.poll(); + T record; + int emitCount = 0; + while ((record = min.queue.poll()) != null) { + emit(record, min); + // track last record emitted, even when queue becomes empty + min.headTimestamp = record.getTimestamp(); + // potentially switch to next queue + if ((nextQueue != null && min.headTimestamp > nextQueue.headTimestamp) + || (min.headTimestamp > maxEmitTimestamp) + || (emitCount++ > queueCapacity && !emptyQueues.isEmpty())) { + break; + } + } + if (record == null) { + this.emptyQueues.put(min, true); + } else { + heads.offer(min); + } + min = nextQueue; + } + } + + public void stop() { + running = false; + } + + /** Emit the record. 
This is specific to a connector, like the Kinesis data fetcher. */ + public abstract void emit(T record, RecordQueue<T> source); + + /** Summary of emit queues that can be used for logging. */ + public String printInfo() { + StringBuffer sb = new StringBuffer(); + sb.append(String.format("queues: %d, empty: %d", + queues.size(), emptyQueues.size())); + for (Map.Entry<Integer, AsyncRecordQueue<T>> e : queues.entrySet()) { + AsyncRecordQueue q = e.getValue(); + sb.append(String.format("\n%d timestamp: %s size: %d", + e.getValue().queueId, q.headTimestamp, q.queue.size())); + } + return sb.toString(); + } + +} diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java new file mode 100644 index 0000000..f4207c7 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTracker.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import java.io.Closeable; +import java.io.Serializable; + +/** + * The watermark tracker is responsible for aggregating watermarks across distributed operators. + * <p/>It can be used for sub tasks of a single Flink source as well as multiple heterogeneous + * sources or other operators. + * <p/>The class essentially functions like a distributed hash table that enclosing operators can + * use to adopt their processing / IO rates. + */ +@PublicEvolving +public abstract class WatermarkTracker implements Closeable, Serializable { + + public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000; + + /** + * Subtasks that have not provided a watermark update within the configured interval will be + * considered idle and excluded from target watermark calculation. + */ + private long updateTimeoutMillis = DEFAULT_UPDATE_TIMEOUT_MILLIS; + + /** + * Unique id for the subtask. + * Using string (instead of subtask index) so synchronization can spawn across multiple sources. + */ + private String subtaskId; + + /** Watermark state. 
*/ + protected static class WatermarkState { + protected long watermark = Long.MIN_VALUE; + protected long lastUpdated; + + public long getWatermark() { + return watermark; + } + + @Override + public String toString() { + return "WatermarkState{watermark=" + watermark + ", lastUpdated=" + lastUpdated + '}'; + } + } + + protected String getSubtaskId() { + return this.subtaskId; + } + + protected long getUpdateTimeoutMillis() { + return this.updateTimeoutMillis; + } + + public abstract long getUpdateTimeoutCount(); + + /** + * Subtasks that have not provided a watermark update within the configured interval will be + * considered idle and excluded from target watermark calculation. + * + * @param updateTimeoutMillis + */ + public void setUpdateTimeoutMillis(long updateTimeoutMillis) { + this.updateTimeoutMillis = updateTimeoutMillis; + } + + /** + * Set the current watermark of the owning subtask and return the global low watermark based on + * the current state snapshot. Periodically called by the enclosing consumer instance, which is + * responsible for any timer management etc. + * + * @param localWatermark + * @return + */ + public abstract long updateWatermark(final long localWatermark); + + protected long getCurrentTime() { + return System.currentTimeMillis(); + } + + public void open(RuntimeContext context) { + if (context instanceof StreamingRuntimeContext) { + this.subtaskId = ((StreamingRuntimeContext) context).getOperatorUniqueID() + + "-" + context.getIndexOfThisSubtask(); + } else { + this.subtaskId = context.getTaskNameWithSubtasks(); + } + } + + @Override + public void close() { + // no work to do here + } + +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java index b38eef1..1ce05d1 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -418,7 +418,7 @@ public class FlinkKinesisConsumerMigrationTest { HashMap<StreamShardMetadata, SequenceNumber> testStateSnapshot, List<StreamShardHandle> testInitialDiscoveryShards) { - super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null); + super(streams, sourceContext, runtimeContext, configProps, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, null); this.testStateSnapshot = testStateSnapshot; this.testInitialDiscoveryShards = testInitialDiscoveryShards; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index d36d68a..cbcd8b4 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -52,6 +52,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; 
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.CollectingSourceContext; @@ -60,6 +61,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -79,6 +81,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -737,6 +740,7 @@ public class FlinkKinesisConsumerTest { deserializationSchema, getShardAssigner(), getPeriodicWatermarkAssigner(), + null, new AtomicReference<>(), new ArrayList<>(), subscribedStreamsToLastDiscoveredShardIds, @@ -775,6 +779,10 @@ public class FlinkKinesisConsumerTest { public void emitWatermark(Watermark mark) { watermarks.add(mark); } + + @Override + public void markAsTemporarilyIdle() { + } }; new Thread( @@ -817,6 +825,164 @@ public class FlinkKinesisConsumerTest { assertThat(watermarks, org.hamcrest.Matchers.contains(new Watermark(-3), new Watermark(5))); } + @Test + public void testSourceSynchronization() throws Exception { + + final String streamName = "fakeStreamName"; + final Time maxOutOfOrderness = Time.milliseconds(5); + final long autoWatermarkInterval = 1_000; + final long watermarkSyncInterval = autoWatermarkInterval + 1; + + HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>(); + subscribedStreamsToLastDiscoveredShardIds.put(streamName, null); + + final KinesisDeserializationSchema<String> deserializationSchema = + new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()); + Properties props = new Properties(); + props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(10L)); + props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, + Long.toString(watermarkSyncInterval)); + props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5)); + + BlockingQueue<String> shard1 = new LinkedBlockingQueue(); + BlockingQueue<String> shard2 = new LinkedBlockingQueue(); + + Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>(); + streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2)); + + // override createFetcher to mock Kinesis + FlinkKinesisConsumer<String> sourceFunc = + new FlinkKinesisConsumer<String>(streamName, deserializationSchema, props) { + @Override + protected KinesisDataFetcher<String> createFetcher( + List<String> streams, + SourceFunction.SourceContext<String> sourceContext, + RuntimeContext runtimeContext, + Properties configProps, + KinesisDeserializationSchema<String> deserializationSchema) { + + KinesisDataFetcher<String> fetcher = + new KinesisDataFetcher<String>( + streams, + sourceContext, + sourceContext.getCheckpointLock(), + runtimeContext, + configProps, + 
deserializationSchema, + getShardAssigner(), + getPeriodicWatermarkAssigner(), + getWatermarkTracker(), + new AtomicReference<>(), + new ArrayList<>(), + subscribedStreamsToLastDiscoveredShardIds, + (props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords( + streamToQueueMap) + ) {}; + return fetcher; + } + }; + + sourceFunc.setShardAssigner( + (streamShardHandle, i) -> { + // shardId-000000000000 + return Integer.parseInt( + streamShardHandle.getShard().getShardId().substring("shardId-".length())); + }); + + sourceFunc.setPeriodicWatermarkAssigner(new TestTimestampExtractor(maxOutOfOrderness)); + + sourceFunc.setWatermarkTracker(new TestWatermarkTracker()); + + // there is currently no test harness specifically for sources, + // so we overlay the source thread here + AbstractStreamOperatorTestHarness<Object> testHarness = + new AbstractStreamOperatorTestHarness<Object>( + new StreamSource(sourceFunc), 1, 1, 0); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final ConcurrentLinkedQueue<Object> results = testHarness.getOutput(); + + @SuppressWarnings("unchecked") + SourceFunction.SourceContext<String> sourceContext = new CollectingSourceContext( + testHarness.getCheckpointLock(), results) { + @Override + public void markAsTemporarilyIdle() { + } + + @Override + public void emitWatermark(Watermark mark) { + results.add(mark); + } + }; + + new Thread( + () -> { + try { + sourceFunc.run(sourceContext); + } catch (InterruptedException e) { + // expected on cancel + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .start(); + + ArrayList<Object> expectedResults = new ArrayList<>(); + + final long record1 = 1; + shard1.put(Long.toString(record1)); + expectedResults.add(Long.toString(record1)); + awaitRecordCount(results, expectedResults.size()); + + // at this point we know the fetcher was initialized + final KinesisDataFetcher fetcher = org.powermock.reflect.Whitebox.getInternalState(sourceFunc, "fetcher"); + + // trigger watermark emit + testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); + expectedResults.add(new Watermark(-4)); + // verify watermark + awaitRecordCount(results, expectedResults.size()); + assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + assertEquals(0, TestWatermarkTracker.WATERMARK.get()); + + // trigger sync + testHarness.setProcessingTime(testHarness.getProcessingTime() + 1); + TestWatermarkTracker.assertSingleWatermark(-4); + + final long record2 = record1 + (watermarkSyncInterval * 3) + 1; + shard1.put(Long.toString(record2)); + + // TODO: check for record received instead + Thread.sleep(100); + + // Advance the watermark. 
Since the new record is past global watermark + threshold, + // it won't be emitted and the watermark does not advance + testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); + assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")); + TestWatermarkTracker.assertSingleWatermark(-4); + + // Trigger global watermark sync + testHarness.setProcessingTime(testHarness.getProcessingTime() + 1); + expectedResults.add(Long.toString(record2)); + awaitRecordCount(results, expectedResults.size()); + assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + TestWatermarkTracker.assertSingleWatermark(3000); + + // Trigger watermark update and emit + testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); + expectedResults.add(new Watermark(3000)); + assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + + sourceFunc.cancel(); + testHarness.close(); + } + private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception { long timeoutMillis = System.currentTimeMillis() + 10_000; while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) { @@ -837,4 +1003,23 @@ public class FlinkKinesisConsumerTest { } } + private static class TestWatermarkTracker extends WatermarkTracker { + + private static final AtomicLong WATERMARK = new AtomicLong(); + + @Override + public long getUpdateTimeoutCount() { + return 0; + } + + @Override + public long updateWatermark(long localWatermark) { + WATERMARK.set(localWatermark); + return localWatermark; + } + + static void assertSingleWatermark(long expected) { + Assert.assertEquals(expected, WATERMARK.get()); + } + } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java index dbc7118..93886f9 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java @@ -113,9 +113,10 @@ public class ShardConsumerTest { KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), Mockito.mock(KinesisProxyInterface.class)); + int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0)); new ShardConsumer<>( fetcher, - 0, + shardIndex, subscribedShardsStateUnderTest.get(0).getStreamShardHandle(), subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L), @@ -151,9 +152,10 @@ public class ShardConsumerTest { KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), Mockito.mock(KinesisProxyInterface.class)); + int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0)); new ShardConsumer<>( fetcher, - 0, + shardIndex, subscribedShardsStateUnderTest.get(0).getStreamShardHandle(), subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), // Get a total of 1000 
records with 9 getRecords() calls, @@ -195,9 +197,10 @@ public class ShardConsumerTest { KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), Mockito.mock(KinesisProxyInterface.class)); + int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0)); new ShardConsumer<>( fetcher, - 0, + shardIndex, subscribedShardsStateUnderTest.get(0).getStreamShardHandle(), subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), // Initial number of records to fetch --> 10 diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index f1fd069..3bb11bd 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -74,6 +74,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> { deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, + null, thrownErrorUnderTest, subscribedShardsStateUnderTest, subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java new file mode 100644 index 0000000..b793b54 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** Test for {@link JobManagerWatermarkTracker}. 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
new file mode 100644
index 0000000..b793b54
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Test for {@link JobManagerWatermarkTracker}. */
+public class JobManagerWatermarkTrackerTest {
+
+	private static MiniCluster flink;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		final Configuration config = new Configuration();
+		config.setInteger(RestOptions.PORT, 0);
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		flink = new MiniCluster(miniClusterConfiguration);
+
+		flink.start();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (flink != null) {
+			flink.close();
+		}
+	}
+
+	@Test
+	public void testUpdateWatermark() throws Exception {
+		final Configuration clientConfiguration = new Configuration();
+		clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+			flink.getRestAddress().get().getHost(),
+			flink.getRestAddress().get().getPort(),
+			clientConfiguration);
+
+		env.addSource(new TestSourceFunction(new JobManagerWatermarkTracker("fakeId")))
+			.addSink(new SinkFunction<Integer>() {});
+		env.execute();
+	}
+
+	private static class TestSourceFunction extends RichSourceFunction<Integer> {
+
+		private final JobManagerWatermarkTracker tracker;
+
+		public TestSourceFunction(JobManagerWatermarkTracker tracker) {
+			this.tracker = tracker;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			tracker.open(getRuntimeContext());
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) {
+			Assert.assertEquals(998, tracker.updateWatermark(998));
+			Assert.assertEquals(999, tracker.updateWatermark(999));
+		}
+
+		@Override
+		public void cancel() {
+		}
+	}
+
+}
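The TestSourceFunction above also documents the tracker life cycle for any custom source: open(RuntimeContext) must be called before updateWatermark(). A sketch of that pattern in a standalone source that backs off when it runs ahead of the group (the 5s threshold and the back-off are illustrative; inside the Kinesis consumer this role is played by the fetcher and RecordEmitter, not by user code):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

    /** Sketch: a source that slows down when ahead of the global watermark. */
    public class AlignedSource extends RichParallelSourceFunction<Long> {

        private final JobManagerWatermarkTracker tracker =
            new JobManagerWatermarkTracker("aligned-source");
        private volatile boolean running = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            tracker.open(getRuntimeContext()); // bind the tracker to this subtask
        }

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long eventTime = 0;
            while (running) {
                ctx.collectWithTimestamp(eventTime, eventTime);
                // Report the local watermark; the returned value is the global minimum.
                long globalWatermark = tracker.updateWatermark(eventTime);
                if (eventTime > globalWatermark + 5_000) {
                    Thread.sleep(10); // illustrative back-off while ahead of the group
                }
                eventTime++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }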
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
new file mode 100644
index 0000000..1948237
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Test for {@link RecordEmitter}. */
+public class RecordEmitterTest {
+
+	static List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());
+
+	private class TestRecordEmitter extends RecordEmitter<TimestampedValue> {
+
+		private TestRecordEmitter() {
+			super(DEFAULT_QUEUE_CAPACITY);
+		}
+
+		@Override
+		public void emit(TimestampedValue record, RecordQueue<TimestampedValue> queue) {
+			results.add(record);
+		}
+	}
+
+	@Test
+	public void test() throws Exception {
+
+		TestRecordEmitter emitter = new TestRecordEmitter();
+
+		final TimestampedValue<String> one = new TimestampedValue<>("one", 1);
+		final TimestampedValue<String> two = new TimestampedValue<>("two", 2);
+		final TimestampedValue<String> five = new TimestampedValue<>("five", 5);
+		final TimestampedValue<String> ten = new TimestampedValue<>("ten", 10);
+
+		final RecordEmitter.RecordQueue<TimestampedValue> queue0 = emitter.getQueue(0);
+		final RecordEmitter.RecordQueue<TimestampedValue> queue1 = emitter.getQueue(1);
+
+		queue0.put(one);
+		queue0.put(five);
+		queue0.put(ten);
+
+		queue1.put(two);
+
+		ExecutorService executor = Executors.newSingleThreadExecutor();
+		executor.submit(emitter);
+
+		long timeout = System.currentTimeMillis() + 10_000;
+		while (results.size() != 4 && System.currentTimeMillis() < timeout) {
+			Thread.sleep(100);
+		}
+		emitter.stop();
+		executor.shutdownNow();
+
+		Assert.assertThat(results, Matchers.contains(one, five, two, ten));
+	}
+
+}
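Condensed, the RecordEmitter usage pattern the test exercises is: obtain one RecordQueue per producer via getQueue(int), feed timestamped elements, run the emitter on its own thread, and stop() it when done. A sketch whose emit target just prints (in the connector, emit() forwards records to the source context instead):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

    public class RecordEmitterSketch {

        /** Emitter that prints each record it dequeues. */
        static class PrintingEmitter extends RecordEmitter<TimestampedValue> {

            PrintingEmitter() {
                super(DEFAULT_QUEUE_CAPACITY);
            }

            @Override
            public void emit(TimestampedValue record, RecordQueue<TimestampedValue> queue) {
                System.out.println(record.getValue() + " @ " + record.getTimestamp());
            }
        }

        public static void main(String[] args) throws Exception {
            PrintingEmitter emitter = new PrintingEmitter();
            RecordEmitter.RecordQueue<TimestampedValue> shard0 = emitter.getQueue(0);
            RecordEmitter.RecordQueue<TimestampedValue> shard1 = emitter.getQueue(1);

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(emitter); // the emitter drains the queues on its own thread

            shard0.put(new TimestampedValue<>("a", 1));
            shard1.put(new TimestampedValue<>("b", 2));

            Thread.sleep(500); // crude; the test above polls for a record count instead
            emitter.stop();
            executor.shutdownNow();
        }
    }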
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
new file mode 100644
index 0000000..3d59a45
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/WatermarkTrackerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link WatermarkTracker}. */
+public class WatermarkTrackerTest {
+
+	WatermarkTracker.WatermarkState wm1 = new WatermarkTracker.WatermarkState();
+	MutableLong clock = new MutableLong(0);
+
+	private class TestWatermarkTracker extends WatermarkTracker {
+		/**
+		 * The watermarks of all subtasks that participate in the synchronization.
+		 */
+		private final Map<String, WatermarkState> watermarks = new HashMap<>();
+
+		private long updateTimeoutCount = 0;
+
+		@Override
+		protected long getCurrentTime() {
+			return clock.longValue();
+		}
+
+		@Override
+		public long updateWatermark(final long localWatermark) {
+			refreshWatermarkSnapshot(this.watermarks);
+
+			long currentTime = getCurrentTime();
+			String subtaskId = this.getSubtaskId();
+
+			WatermarkState ws = watermarks.get(subtaskId);
+			if (ws == null) {
+				watermarks.put(subtaskId, ws = new WatermarkState());
+			}
+			ws.lastUpdated = currentTime;
+			ws.watermark = Math.max(ws.watermark, localWatermark);
+			saveWatermark(subtaskId, ws);
+
+			long globalWatermark = ws.watermark;
+			for (Map.Entry<String, WatermarkState> e : watermarks.entrySet()) {
+				ws = e.getValue();
+				if (ws.lastUpdated + getUpdateTimeoutMillis() < currentTime) {
+					// ignore outdated subtask
+					updateTimeoutCount++;
+					continue;
+				}
+				globalWatermark = Math.min(ws.watermark, globalWatermark);
+			}
+			return globalWatermark;
+		}
+
+		protected void refreshWatermarkSnapshot(Map<String, WatermarkState> watermarks) {
+			watermarks.put("wm1", wm1);
+		}
+
+		protected void saveWatermark(String id, WatermarkState ws) {
+			// do nothing
+		}
+
+		public long getUpdateTimeoutCount() {
+			return updateTimeoutCount;
+		}
+	}
+
+	@Test
+	public void test() {
+		long watermark = 0;
+		TestWatermarkTracker ws = new TestWatermarkTracker();
+		ws.open(new MockStreamingRuntimeContext(false, 1, 0));
+		Assert.assertEquals(Long.MIN_VALUE, ws.updateWatermark(Long.MIN_VALUE));
+		Assert.assertEquals(Long.MIN_VALUE, ws.updateWatermark(watermark));
+		// timeout wm1
+		clock.add(WatermarkTracker.DEFAULT_UPDATE_TIMEOUT_MILLIS + 1);
+		Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+		Assert.assertEquals(watermark, ws.updateWatermark(watermark - 1));
+
+		// min watermark
+		wm1.watermark = watermark + 1;
+		wm1.lastUpdated = clock.longValue();
+		Assert.assertEquals(watermark, ws.updateWatermark(watermark));
+		Assert.assertEquals(watermark + 1, ws.updateWatermark(watermark + 1));
+	}
+
+}
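Restated compactly, the aggregation rule this test pins down: every subtask state carries the last reported watermark and the time it was reported, and the global watermark is the minimum over all states that are not stale, starting from the reporting subtask's own watermark so that a fully timed-out snapshot degrades to the local value. A standalone restatement (types simplified from WatermarkTracker.WatermarkState):

    import java.util.Collection;

    final class WatermarkAggregation {

        /** Simplified stand-in for WatermarkTracker.WatermarkState. */
        static final class SubtaskState {
            final long watermark;
            final long lastUpdated;

            SubtaskState(long watermark, long lastUpdated) {
                this.watermark = watermark;
                this.lastUpdated = lastUpdated;
            }
        }

        /**
         * Minimum watermark over all states updated within the timeout. Stale
         * subtasks are skipped so a stalled reader cannot hold back the group.
         */
        static long globalWatermark(
                long ownWatermark, Collection<SubtaskState> states, long now, long timeoutMillis) {
            long global = ownWatermark;
            for (SubtaskState s : states) {
                if (s.lastUpdated + timeoutMillis < now) {
                    continue; // outdated subtask, excluded from the minimum
                }
                global = Math.min(global, s.watermark);
            }
            return global;
        }
    }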