This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0170f60a7273830ba5e5f323e7533cae05d26d1c
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Fri Jun 30 19:11:39 2023 +0800

    [FLINK-32496][connectors/common][refactor] Improve watermark alignment log for easier troubleshooting
---
 .../flink/runtime/source/coordinator/SourceCoordinator.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 80f4abcabd0..f3b4f9dc775 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -181,9 +181,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
 
         Set<Integer> subTaskIds = combinedWatermark.keySet();
         LOG.info(
-                "Distributing maxAllowedWatermark={} to subTaskIds={}",
+                "Distributing maxAllowedWatermark={} of group={} to subTaskIds={} for source {}.",
                 maxAllowedWatermark,
-                subTaskIds);
+                watermarkAlignmentParams.getWatermarkGroup(),
+                subTaskIds,
+                operatorName);
 
         // Subtask maybe during deploying or restarting, so we only send WatermarkAlignmentEvent
         // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
@@ -611,7 +613,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
                             + "scenario (e.g. if speculative execution is enabled)");
         }
 
-        LOG.debug("New reported watermark={} from subTaskId={}", watermark, subtask);
+        LOG.debug(
+                "New reported watermark={} from subTaskId={} of source {}.",
+                watermark,
+                subtask,
+                operatorName);
 
         checkState(watermarkAlignmentParams.isEnabled());
 