This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 40dca974558 [FLINK-38804][runtime] Ensure channelStateWriter is closed after the inputGates
40dca974558 is described below

commit 40dca974558eaef8622a3080680bdbdde71e9366
Author: Efrat Levitan <[email protected]>
AuthorDate: Sun Jan 11 16:58:01 2026 +0200

    [FLINK-38804][runtime] Ensure channelStateWriter is closed after the inputGates
    
    [FLINK-38804][runtime] Ensure channelStateWriter is closed after the inputGates
---
 .../state/api/runtime/SavepointEnvironment.java    | 16 ++++++
 .../flink/runtime/execution/Environment.java       |  5 ++
 .../runtime/taskmanager/RuntimeEnvironment.java    | 14 +++++
 .../org/apache/flink/runtime/taskmanager/Task.java | 24 +++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    |  1 -
 .../operators/testutils/DummyEnvironment.java      | 16 ++++++
 .../operators/testutils/MockEnvironment.java       | 16 ++++++
 .../apache/flink/runtime/taskmanager/TaskTest.java | 62 +++++++++++++++++++++-
 .../runtime/tasks/StreamMockEnvironment.java       | 14 +++++
 10 files changed, 168 insertions(+), 2 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 42ffb7e6037..dbd918da7bf 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
@@ -70,6 +71,8 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.concurrent.Executors;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -119,6 +122,8 @@ public class SavepointEnvironment implements Environment {
 
     private final ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;
 
+    @Nullable private ChannelStateWriter channelStateWriter;
+
     private SavepointEnvironment(
             RuntimeContext ctx,
             ExecutionConfig executionConfig,
@@ -440,4 +445,15 @@ public class SavepointEnvironment implements Environment {
             return CompletableFuture.completedFuture(null);
         }
     }
+
+    @Override
+    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+        this.channelStateWriter = channelStateWriter;
+    }
+
+    @Override
+    @Nullable
+    public ChannelStateWriter getChannelStateWriter() {
+        return this.channelStateWriter;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index a4b203b5b9d..4b449a4d129 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -280,4 +281,8 @@ public interface Environment {
     }
 
     ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory();
+
+    void setChannelStateWriter(ChannelStateWriter channelStateWriter);
+
+    ChannelStateWriter getChannelStateWriter();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index d5e297c54b0..69df0116802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
@@ -118,6 +119,8 @@ public class RuntimeEnvironment implements Environment {
 
     ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;
 
+    @Nullable private ChannelStateWriter channelStateWriter;
+
     // ------------------------------------------------------------------------
 
     public RuntimeEnvironment(
@@ -408,4 +411,15 @@ public class RuntimeEnvironment implements Environment {
     public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() {
         return channelStateExecutorFactory;
     }
+
+    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+        checkState(this.channelStateWriter == null, "Cannot set channelStateWriter twice!");
+        this.channelStateWriter = channelStateWriter;
+    }
+
+    @Override
+    @Nullable
+    public ChannelStateWriter getChannelStateWriter() {
+        return this.channelStateWriter;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1be391cd985..844820041b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -311,6 +312,9 @@ public class Task
      */
     private UserCodeClassLoader userCodeClassLoader;
 
+    /** The channelStateWriter of the env. We obtain it after the invokable is initialized. */
+    @Nullable private volatile ChannelStateWriter channelStateWriter;
+
     /**
      * <b>IMPORTANT:</b> This constructor may not start any work that would need to be undone in the
      * case of a failing task deployment.
@@ -508,6 +512,12 @@ public class Task
         return invokable;
     }
 
+    @Nullable
+    @VisibleForTesting
+    ChannelStateWriter getChannelStateWriter() {
+        return channelStateWriter;
+    }
+
     public boolean isBackPressured() {
         if (invokable == null
                 || partitionWriters.length == 0
@@ -749,6 +759,10 @@ public class Task
                 FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
             }
 
+            // We register a reference to the channelStateWriter
+            // so we can close it after the inputGates close
+            this.channelStateWriter = env.getChannelStateWriter();
+
             // ----------------------------------------------------------------
             //  actual task core work
             // ----------------------------------------------------------------
@@ -1011,6 +1025,16 @@ public class Task
         }
         closeAllResultPartitions();
         closeAllInputGates();
+        if (this.channelStateWriter != null) {
+            LOG.debug("Closing channelStateWriter for task {}", taskNameWithSubtask);
+            try {
+                this.channelStateWriter.close();
+            } catch (Throwable t) {
+                ExceptionUtils.rethrowIfFatalError(t);
+                LOG.error(
+                        "Failed to close channelStateWriter for task {}.", taskNameWithSubtask, t);
+            }
+        }
 
         try {
             taskStateManager.close();
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7e02d45846..505e67a30d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -522,6 +522,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                                     CheckpointingOptions
                                                             .UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE))
                             : ChannelStateWriter.NO_OP;
+            environment.setChannelStateWriter(channelStateWriter);
+
             this.subtaskCheckpointCoordinator =
                     new SubtaskCheckpointCoordinatorImpl(
                             checkpointStorageAccess,
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 24c63a7e5ab..811354c47f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -579,7 +579,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
             }
         }
         IOUtils.closeAllQuietly(asyncCheckpointRunnables);
-        channelStateWriter.close();
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 0a6a425c4ac..fe291c1b4e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
@@ -61,6 +62,8 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
 import org.apache.flink.util.UserCodeClassLoader;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -88,6 +91,8 @@ public class DummyEnvironment implements Environment {
 
     private CheckpointStorageAccess checkpointStorageAccess;
 
+    @Nullable private ChannelStateWriter channelStateWriter;
+
     public DummyEnvironment() {
         this("Test Job", 1, 0, 1);
     }
@@ -312,4 +317,15 @@ public class DummyEnvironment implements Environment {
     public CheckpointStorageAccess getCheckpointStorageAccess() {
         return checkNotNull(checkpointStorageAccess);
     }
+
+    @Override
+    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+        this.channelStateWriter = channelStateWriter;
+    }
+
+    @Override
+    @Nullable
+    public ChannelStateWriter getChannelStateWriter() {
+        return this.channelStateWriter;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 1b4a7bb8172..c50eface312 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
@@ -65,6 +66,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.concurrent.Executors;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -149,6 +152,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
     private final ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory;
 
+    @Nullable private ChannelStateWriter channelStateWriter;
+
     public static MockEnvironmentBuilder builder() {
         return new MockEnvironmentBuilder();
     }
@@ -495,4 +500,15 @@ public class MockEnvironment implements Environment, AutoCloseable {
     public void setExternalFailureCauseConsumer(Consumer<Throwable> externalFailureCauseConsumer) {
         this.externalFailureCauseConsumer = Optional.of(externalFailureCauseConsumer);
     }
+
+    @Override
+    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+        this.channelStateWriter = channelStateWriter;
+    }
+
+    @Override
+    @Nullable
+    public ChannelStateWriter getChannelStateWriter() {
+        return this.channelStateWriter;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 65bc95d4b0c..36a248934e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
@@ -77,6 +78,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
@@ -1249,6 +1251,34 @@ public class TaskTest extends TestLogger {
         assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
     }
 
+    private void testChannelStateWriterCloses(Class<? extends TriggerLatchInvokable> invokable)
+            throws Exception {
+        final Task task =
+                createTaskBuilder()
+                        .setInvokable(invokable)
+                        .setTaskManagerActions(new NoOpTaskManagerActions())
+                        .build(Executors.directExecutor());
+
+        task.startTaskThread();
+        awaitInvokableLatch(task);
+        ChannelStateWriterWithCloseTracker channelStateWriter =
+                (ChannelStateWriterWithCloseTracker) task.getChannelStateWriter();
+        assertFalse(channelStateWriter.isClosed());
+        triggerInvokableLatch(task);
+        task.getExecutingThread().join();
+        assertTrue(channelStateWriter.isClosed());
+    }
+
+    @Test
+    public void testChannelStateWriterClosesOnSuccess() throws Exception {
+        testChannelStateWriterCloses(ChannelStateWriterSetterInvokable.class);
+    }
+
+    @Test
+    public void testChannelStateWriterClosesOnFailure() throws Exception {
+        testChannelStateWriterCloses(FailingChannelStateWriterSetterInvokable.class);
+    }
+
     private void assertCheckpointDeclined(
             Task task,
             TestCheckpointResponder testCheckpointResponder,
@@ -1576,7 +1606,7 @@ public class TaskTest extends TestLogger {
     }
 
     /** {@link AbstractInvokable} which throws {@link RuntimeException} on invoke. */
-    public static final class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable {
+    public static class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable {
         public InvokableWithExceptionOnTrigger(Environment environment) {
             super(environment);
         }
@@ -1762,4 +1792,34 @@ public class TaskTest extends TestLogger {
             }
         }
     }
+
+    private static class ChannelStateWriterWithCloseTracker
+            extends ChannelStateWriter.NoOpChannelStateWriter {
+        private final AtomicBoolean closeCalled = new AtomicBoolean(false);
+
+        @Override
+        public void close() {
+            closeCalled.set(true);
+        }
+
+        public boolean isClosed() {
+            return closeCalled.get();
+        }
+    }
+
+    private static class ChannelStateWriterSetterInvokable extends InvokableBlockingWithTrigger {
+
+        public ChannelStateWriterSetterInvokable(Environment environment) {
+            super(environment);
+            environment.setChannelStateWriter(new ChannelStateWriterWithCloseTracker());
+        }
+    }
+
+    private static class FailingChannelStateWriterSetterInvokable
+            extends InvokableWithExceptionOnTrigger {
+        public FailingChannelStateWriterSetterInvokable(Environment environment) {
+            super(environment);
+            environment.setChannelStateWriter(new ChannelStateWriterWithCloseTracker());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9fb6b733762..dd4ef8994c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
@@ -143,6 +144,8 @@ public class StreamMockEnvironment implements Environment {
 
     private CheckpointStorageAccess checkpointStorageAccess;
 
+    @Nullable private ChannelStateWriter channelStateWriter;
+
     public StreamMockEnvironment(
             Configuration jobConfig,
             Configuration taskConfig,
@@ -455,4 +458,15 @@ public class StreamMockEnvironment implements Environment {
     public CheckpointStorageAccess getCheckpointStorageAccess() {
         return checkpointStorageAccess;
     }
+
+    @Override
+    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+        this.channelStateWriter = channelStateWriter;
+    }
+
+    @Override
+    @Nullable
+    public ChannelStateWriter getChannelStateWriter() {
+        return this.channelStateWriter;
+    }
 }

Reply via email to