This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cdf43aecafa [FLINK-35933] Skip distributing maxAllowedWatermark if 
there are no subtasks
cdf43aecafa is described below

commit cdf43aecafa33a50a4b4b6841c4cf505b195994a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Jul 30 17:21:49 2024 +0000

    [FLINK-35933] Skip distributing maxAllowedWatermark if there are no subtasks
---
 .../flink/runtime/source/coordinator/SourceCoordinator.java       | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 3133bbe7ce7..982310bc540 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -166,6 +166,13 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
 
     @VisibleForTesting
     void announceCombinedWatermark() {
+        Set<Integer> subTaskIds = combinedWatermark.keySet();
+        if (subTaskIds.isEmpty()) {
+            LOG.debug(
+                    "Skip distributing maxAllowedWatermark of group={} for 
source {} - no subtasks.",
+                    watermarkAlignmentParams.getWatermarkGroup(),
+                    operatorName);
+        }
         checkState(
                 watermarkAlignmentParams != 
WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
 
@@ -190,7 +197,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
             maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
         }
 
-        Set<Integer> subTaskIds = combinedWatermark.keySet();
         LOG.info(
                 "Distributing maxAllowedWatermark={} of group={} to 
subTaskIds={} for source {}.",
                 maxAllowedWatermark,

Reply via email to