This is an automated email from the ASF dual-hosted git repository.

1996fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 04a31ba953f [FLINK-39145][tests] Skip counters accumulation when state isn't initialized
04a31ba953f is described below

commit 04a31ba953f357cf50345b277227cd2ac930f28d
Author: Efrat Levitan <[email protected]>
AuthorDate: Tue May 26 10:35:14 2026 +0300

    [FLINK-39145][tests] Skip counters accumulation when state isn't initialized
    
    The sink task might be cancelled before its state was initialized; this should be
    handled as a valid scenario to avoid test failures due to null state.
---
 .../flink/test/checkpointing/UnalignedCheckpointTestBase.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index efa88a570d6..80ab0bee405 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -1121,10 +1121,13 @@ abstract class UnalignedCheckpointTestBase {
 
         @Override
         public void close() throws Exception {
-            numOutputCounter.add(state.numOutput);
-            outOfOrderCounter.add(state.numOutOfOrderness);
-            duplicatesCounter.add(state.numDuplicates);
-            lostCounter.add(state.numLostValues);
+            // sink task might be cancelled before state was initialized
+            if (state != null) {
+                numOutputCounter.add(state.numOutput);
+                outOfOrderCounter.add(state.numOutOfOrderness);
+                duplicatesCounter.add(state.numDuplicates);
+                lostCounter.add(state.numLostValues);
+            }
             if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 
0) {
                 
numFailures.add(getRuntimeContext().getTaskInfo().getAttemptNumber());
             }

Reply via email to