This is an automated email from the ASF dual-hosted git repository.
1996fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 04a31ba953f [FLINK-39145][tests] Skip counters accumulation when state
isn't initialized
04a31ba953f is described below
commit 04a31ba953f357cf50345b277227cd2ac930f28d
Author: Efrat Levitan <[email protected]>
AuthorDate: Tue May 26 10:35:14 2026 +0300
[FLINK-39145][tests] Skip counters accumulation when state isn't initialized
The sink task might be cancelled before its state is initialized; this should be
handled as a valid scenario to avoid test failures caused by a null state.
---
.../flink/test/checkpointing/UnalignedCheckpointTestBase.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index efa88a570d6..80ab0bee405 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -1121,10 +1121,13 @@ abstract class UnalignedCheckpointTestBase {
@Override
public void close() throws Exception {
- numOutputCounter.add(state.numOutput);
- outOfOrderCounter.add(state.numOutOfOrderness);
- duplicatesCounter.add(state.numDuplicates);
- lostCounter.add(state.numLostValues);
+ // sink task might be cancelled before state was initialized
+ if (state != null) {
+ numOutputCounter.add(state.numOutput);
+ outOfOrderCounter.add(state.numOutOfOrderness);
+ duplicatesCounter.add(state.numDuplicates);
+ lostCounter.add(state.numLostValues);
+ }
if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() ==
0) {
numFailures.add(getRuntimeContext().getTaskInfo().getAttemptNumber());
}