This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5eb8d268e7fc8583ac6ca8714687dd8ad3a478b5
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Fri Jun 30 19:11:39 2023 +0800

    [FLINK-32496][connectors/common][refactor] Improve watermark alignment log 
for easier troubleshooting
---
 .../flink/runtime/source/coordinator/SourceCoordinator.java  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 80f4abcabd0..f3b4f9dc775 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -181,9 +181,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
 
         Set<Integer> subTaskIds = combinedWatermark.keySet();
         LOG.info(
-                "Distributing maxAllowedWatermark={} to subTaskIds={}",
+                "Distributing maxAllowedWatermark={} of group={} to 
subTaskIds={} for source {}.",
                 maxAllowedWatermark,
-                subTaskIds);
+                watermarkAlignmentParams.getWatermarkGroup(),
+                subTaskIds,
+                operatorName);
 
         // Subtask maybe during deploying or restarting, so we only send 
WatermarkAlignmentEvent
         // to ready task to avoid period task fail (Java-ThreadPoolExecutor 
will not schedule
@@ -611,7 +613,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
                             + "scenario (e.g. if speculative execution is 
enabled)");
         }
 
-        LOG.debug("New reported watermark={} from subTaskId={}", watermark, 
subtask);
+        LOG.debug(
+                "New reported watermark={} from subTaskId={} of source {}.",
+                watermark,
+                subtask,
+                operatorName);
 
         checkState(watermarkAlignmentParams.isEnabled());
 

Reply via email to