This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93558de5b2590ab909368d1020982b7d57e286a7
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu Feb 19 16:09:40 2026 +0200

    [FLINK-39073][runtime] Improve logging of invalid split transitions
    
    Split transitions directly from paused to idle and from idle to pause don't 
make any sense, improving the logs to allow further analysis. It is a 
convention for flink metrics system to not fail the job though.
---
 .../runtime/metrics/groups/InternalSourceSplitMetricGroup.java      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
index 706606a1598..f7efb1b8772 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -48,6 +48,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
     private static final long SPLIT_NOT_STARTED = -1L;
     private long splitStartTime = SPLIT_NOT_STARTED;
     private final MetricGroup splitWatermarkMetricGroup;
+    private final String splitId;
 
     private InternalSourceSplitMetricGroup(
             MetricGroup parentMetricGroup,
@@ -56,6 +57,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
             Gauge<Long> currentWatermark) {
         super(parentMetricGroup);
         this.clock = clock;
+        this.splitId = splitId;
         splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, 
splitId).addGroup(WATERMARK);
         pausedTimePerSecond =
                 splitWatermarkMetricGroup.gauge(
@@ -118,7 +120,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
             // If a split got paused it means it emitted records,
             // hence it shouldn't be considered idle anymore
             markNotIdle();
-            LOG.warn("Split marked paused while still idle");
+            LOG.warn("[{}] Split marked paused while still idle", splitId);
         }
         this.pausedTimePerSecond.markStart();
     }
@@ -129,7 +131,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
             // If a split is marked idle, it has no records to emit.
             // hence it shouldn't be considered paused anymore
             markNotPaused();
-            LOG.warn("Split marked idle while still paused");
+            LOG.warn("[{}] Split marked idle while still paused", splitId);
         }
         this.idleTimePerSecond.markStart();
     }

Reply via email to