This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 61834da8298 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844) 61834da8298 is described below commit 61834da82984323484718e551c15e31ce023e026 Author: Seth Saperstein <99828679+sethsaperstein-l...@users.noreply.github.com> AuthorDate: Mon Nov 28 15:35:01 2022 -0800 [FLINK-29099][connectors/kinesis] Update global watermark for idle subtask (#20844) --- .../connectors/kinesis/internals/KinesisDataFetcher.java | 14 ++++++++++++-- .../kinesis/util/JobManagerWatermarkTracker.java | 5 +++++ .../kinesis/util/JobManagerWatermarkTrackerTest.java | 1 + 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 4fcc80a250e..d6c7b296da8 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -1174,6 +1174,14 @@ public class KinesisDataFetcher<T> { } } + LOG.debug( + "WatermarkEmitter subtask: {}, last watermark: {}, potential watermark: {}" + + ", potential next watermark: {}", + indexOfThisConsumerSubtask, + lastWatermark, + potentialWatermark, + potentialNextWatermark); + // advance watermark if possible (watermarks can only be ascending) if (potentialWatermark == Long.MAX_VALUE) { if (shardWatermarks.isEmpty() || shardIdleIntervalMillis > 0) { @@ -1265,11 +1273,11 @@ public class KinesisDataFetcher<T> { public void onProcessingTime(long timestamp) { if (nextWatermark != Long.MIN_VALUE) { long globalWatermark = lastGlobalWatermark; - // TODO: refresh watermark 
while idle if (!(isIdle && nextWatermark == propagatedLocalWatermark)) { globalWatermark = watermarkTracker.updateWatermark(nextWatermark); propagatedLocalWatermark = nextWatermark; } else { + globalWatermark = watermarkTracker.updateWatermark(Long.MIN_VALUE); LOG.info( "WatermarkSyncCallback subtask: {} is idle", indexOfThisConsumerSubtask); @@ -1279,12 +1287,14 @@ public class KinesisDataFetcher<T> { lastLogged = System.currentTimeMillis(); LOG.info( "WatermarkSyncCallback subtask: {} local watermark: {}" - + ", global watermark: {}, delta: {} timeouts: {}, emitter: {}", + + ", global watermark: {}, delta: {} timeouts: {}, idle: {}" + + ", emitter: {}", indexOfThisConsumerSubtask, nextWatermark, globalWatermark, nextWatermark - globalWatermark, watermarkTracker.getUpdateTimeoutCount(), + isIdle, recordEmitter.printInfo()); // Following is for debugging non-reproducible issue with stalled watermark diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java index b4c78438dca..51d055e2c96 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java @@ -129,6 +129,11 @@ public class JobManagerWatermarkTracker extends WatermarkTracker { } catch (Exception e) { throw new RuntimeException(e); } + // no op to get global watermark without updating it + if (value.watermark == Long.MIN_VALUE) { + addCount--; + return accumulator; + } WatermarkState ws = accumulator.get(value.id); if (ws == null) { accumulator.put(value.id, ws = new WatermarkState()); diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java index c5ea32aa2f5..5af64295812 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTrackerTest.java @@ -67,6 +67,7 @@ public class JobManagerWatermarkTrackerTest { public void run(SourceContext<Integer> ctx) { assertThat(tracker.updateWatermark(998)).isEqualTo(998); assertThat(tracker.updateWatermark(999)).isEqualTo(999); + assertThat(tracker.updateWatermark(Long.MIN_VALUE)).isEqualTo(999); } @Override