This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6a8afd98dab90572472dd89fd398db34822647d
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Aug 13 15:29:06 2021 +0200

    [FLINK-23767][connectors] Revert going automatically idle when no 
partitions are (temporarily) available.
    
    This is in-line with the refined definition where only user controlled 
settings should turn a source instance idle. For example, previously a Kinesis 
source with finished shard would go idle without a way for the user to avoid it 
causing potential late records when a new shard is assigned to that particular 
reader instance.
---
 .../apache/flink/api/common/eventtime/CombinedWatermarkStatus.java    | 1 -
 .../flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java    | 4 ++--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
index 0975ddc..474f5d3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
@@ -69,7 +69,6 @@ final class CombinedWatermarkStatus {
         // if we don't have any outputs minimumOverAllOutputs is not valid, 
it's still
         // at its initial Long.MAX_VALUE state and we must not emit that
         if (partialWatermarks.isEmpty()) {
-            this.idle = combinedWatermark > Long.MIN_VALUE;
             return false;
         }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index 61ce8d5..e712c2a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -382,7 +382,7 @@ public class WatermarkOutputMultiplexerTest {
     }
 
     @Test
-    public void testEmittingIdleAfterAllSplitsRemoved() {
+    public void testNotEmittingIdleAfterAllSplitsRemoved() {
         final TestingWatermarkOutput underlyingWatermarkOutput = 
createTestingWatermarkOutput();
         final WatermarkOutputMultiplexer multiplexer =
                 new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
@@ -396,7 +396,7 @@ public class WatermarkOutputMultiplexerTest {
 
         multiplexer.onPeriodicEmit();
         assertThat(underlyingWatermarkOutput.lastWatermark(), 
equalTo(emittedWatermark));
-        assertThat(underlyingWatermarkOutput.isIdle(), equalTo(true));
+        assertThat(underlyingWatermarkOutput.isIdle(), equalTo(false));
     }
 
     /**

Reply via email to