This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d6a8afd98dab90572472dd89fd398db34822647d Author: Arvid Heise <ar...@ververica.com> AuthorDate: Fri Aug 13 15:29:06 2021 +0200 [FLINK-23767][connectors] Revert going automatically idle when no partitions are (temporarily) available. This is in line with the refined definition where only user-controlled settings should turn a source instance idle. For example, previously a Kinesis source with a finished shard would go idle without any way for the user to prevent it, potentially causing late records when a new shard is assigned to that particular reader instance. --- .../apache/flink/api/common/eventtime/CombinedWatermarkStatus.java | 1 - .../flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java index 0975ddc..474f5d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java @@ -69,7 +69,6 @@ final class CombinedWatermarkStatus { // if we don't have any outputs minimumOverAllOutputs is not valid, it's still // at its initial Long.MAX_VALUE state and we must not emit that if (partialWatermarks.isEmpty()) { - this.idle = combinedWatermark > Long.MIN_VALUE; return false; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java index 61ce8d5..e712c2a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java @@ -382,7 +382,7 @@ public class 
WatermarkOutputMultiplexerTest { } @Test - public void testEmittingIdleAfterAllSplitsRemoved() { + public void testNotEmittingIdleAfterAllSplitsRemoved() { final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput(); final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput); @@ -396,7 +396,7 @@ public class WatermarkOutputMultiplexerTest { multiplexer.onPeriodicEmit(); assertThat(underlyingWatermarkOutput.lastWatermark(), equalTo(emittedWatermark)); - assertThat(underlyingWatermarkOutput.isIdle(), equalTo(true)); + assertThat(underlyingWatermarkOutput.isIdle(), equalTo(false)); } /**