This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d43aeafa5a5 [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed (#24768)
d43aeafa5a5 is described below

commit d43aeafa5a59a53887526a7464fd3ba2d2802895
Author: Jinzhong Li <lijinzhong2...@gmail.com>
AuthorDate: Fri May 10 12:53:49 2024 +0800

    [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed (#24768)
---
 .../asyncprocessing/AsyncExecutionController.java  |  2 +-
 .../runtime/asyncprocessing/StateExecutor.java     |  7 +-
 .../runtime/state/AsyncKeyedStateBackend.java      |  3 +
 .../AsyncExecutionControllerTest.java              | 86 ++++++++++++++++------
 .../flink/state/forst/ForStKeyedStateBackend.java  | 44 ++++++++++-
 5 files changed, 116 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index 693ad8753f1..d06888ab6c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -90,7 +90,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
     private final StateFutureFactory<K> stateFutureFactory;
 
     /** The state executor where the {@link StateRequest} is actually executed. */
-    final StateExecutor stateExecutor;
+    private final StateExecutor stateExecutor;
 
     /** The corresponding context that currently runs in task thread. */
     RecordContext<K> currentContext;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
index caf0f504d94..bbc506e1b1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java
@@ -22,7 +22,12 @@ import org.apache.flink.annotation.Internal;
 
 import java.util.concurrent.CompletableFuture;
 
-/** Executor for executing batch {@link StateRequest}s. */
+/**
+ * Executor for executing batch {@link StateRequest}s.
+ *
+ * <p>Notice that the owner who creates the {@code StateExecutor} is responsible for shutting it down
+ * when it is no longer in use.
+ */
 @Internal
 public interface StateExecutor {
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
index 70cdfbef767..3f49984bd7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java
@@ -58,6 +58,9 @@ public interface AsyncKeyedStateBackend extends Disposable, Closeable {
      * Creates a {@code StateExecutor} which supports to execute a batch of state requests
      * asynchronously.
      *
+     * <p>Notice that the {@code AsyncKeyedStateBackend} is responsible for shutting down the
+     * StateExecutors created by itself when they are no longer in use.
+     *
      * @return a {@code StateExecutor} which supports to execute a batch of state requests
      *     asynchronously.
      */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index c40edbf3940..de2bbc73294 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
@@ -39,6 +40,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
@@ -73,7 +75,9 @@ class AsyncExecutionControllerTest {
             long timeout,
             int maxInFlight,
             MailboxExecutor mailboxExecutor,
-            AsyncFrameworkExceptionHandler exceptionHandler) {
+            AsyncFrameworkExceptionHandler exceptionHandler,
+            CloseableRegistry closeableRegistry)
+            throws IOException {
         StateExecutor stateExecutor = new TestStateExecutor();
         ValueStateDescriptor<Integer> stateDescriptor =
                 new ValueStateDescriptor<>("test-value-state", BasicTypeInfo.INT_TYPE_INFO);
@@ -88,6 +92,8 @@ class AsyncExecutionControllerTest {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        closeableRegistry.registerCloseable(asyncKeyedStateBackend);
+        closeableRegistry.registerCloseable(asyncKeyedStateBackend::dispose);
         aec =
                 new AsyncExecutionController<>(
                         mailboxExecutor,
@@ -109,13 +115,15 @@ class AsyncExecutionControllerTest {
     }
 
     @Test
-    void testBasicRun() {
+    void testBasicRun() throws IOException {
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 100,
                 10000L,
                 1000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         // ============================ element1 ============================
         String record1 = "key1-r1";
         String key1 = "key1";
@@ -220,16 +228,20 @@ class AsyncExecutionControllerTest {
         assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
         assertThat(output.get()).isEqualTo(1);
         assertThat(recordContext4.getReferenceCount()).isEqualTo(0);
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testRecordsRunInOrder() {
+    void testRecordsRunInOrder() throws IOException {
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 100,
                 10000L,
                 1000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         // Record1 and record3 have the same key, record2 has a different key.
         // Record2 should be processed before record3.
 
@@ -284,18 +296,22 @@ class AsyncExecutionControllerTest {
         assertThat(output.get()).isEqualTo(2);
         assertThat(recordContext3.getReferenceCount()).isEqualTo(0);
         assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testInFlightRecordControl() {
+    void testInFlightRecordControl() throws IOException {
         int batchSize = 5;
         int maxInFlight = 10;
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 batchSize,
                 10000L,
                 maxInFlight,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         // For records with different keys, the in-flight records is controlled by batch size.
         for (int round = 0; round < 10; round++) {
             for (int i = 0; i < batchSize; i++) {
@@ -334,16 +350,20 @@ class AsyncExecutionControllerTest {
             assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
             assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight);
         }
+
+        resourceRegistry.close();
     }
 
     @Test
-    public void testSyncPoint() {
+    public void testSyncPoint() throws IOException {
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 1000,
                 10000L,
                 6000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         AtomicInteger counter = new AtomicInteger(0);
 
         // Test the sync point processing without a key occupied.
@@ -384,18 +404,22 @@ class AsyncExecutionControllerTest {
         assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
         assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
         recordContext2.release();
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testBufferTimeout() {
+    void testBufferTimeout() throws IOException {
         int batchSize = 5;
         int timeout = 1000;
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 batchSize,
                 timeout,
                 1000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         ManuallyTriggeredScheduledExecutorService scheduledExecutor =
                 new ManuallyTriggeredScheduledExecutorService();
         aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor;
@@ -457,18 +481,22 @@ class AsyncExecutionControllerTest {
         assertThat(scheduledFuture.isDone()).isTrue();
         assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2);
         assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1);
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testBufferTimeoutSkip() {
+    void testBufferTimeoutSkip() throws IOException {
         int batchSize = 3;
         int timeout = 1000;
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 batchSize,
                 timeout,
                 1000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         ManuallyTriggeredScheduledExecutorService scheduledExecutor =
                 new ManuallyTriggeredScheduledExecutorService();
         aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor;
@@ -526,14 +554,17 @@ class AsyncExecutionControllerTest {
         assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2);
         assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1);
         assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue();
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testUserCodeException() {
+    void testUserCodeException() throws IOException {
         TestAsyncFrameworkExceptionHandler exceptionHandler =
                 new TestAsyncFrameworkExceptionHandler();
         TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(false);
-        setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler);
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
+        setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry);
         Runnable userCode =
                 () -> {
                     valueState
@@ -562,14 +593,17 @@ class AsyncExecutionControllerTest {
                 .isEqualTo("Artificial exception in user code");
         assertThat(exceptionHandler.exception).isNull();
         assertThat(exceptionHandler.message).isNull();
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testFrameworkException() {
+    void testFrameworkException() throws IOException {
         TestAsyncFrameworkExceptionHandler exceptionHandler =
                 new TestAsyncFrameworkExceptionHandler();
         TestMailboxExecutor testMailboxExecutor = new TestMailboxExecutor(true);
-        setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler);
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
+        setup(1000, 10000, 6000, testMailboxExecutor, exceptionHandler, resourceRegistry);
         Runnable userCode = () -> valueState.asyncValue().thenAccept(val -> {});
         String record = "record";
         String key = "key";
@@ -588,16 +622,20 @@ class AsyncExecutionControllerTest {
                 .isEqualTo("java.lang.RuntimeException: Fail to execute.");
         assertThat(exceptionHandler.message)
                 .isEqualTo("Caught exception when submitting StateFuture's callback.");
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testEpochManager() {
+    void testEpochManager() throws Exception {
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         setup(
                 1000,
                 10000,
                 6000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         AtomicInteger output = new AtomicInteger(0);
         Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet());
 
@@ -622,10 +660,13 @@ class AsyncExecutionControllerTest {
         assertThat(output.get()).isEqualTo(3);
         // SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving.
         assertThat(epoch1.ongoingRecordCount).isEqualTo(0);
+
+        resourceRegistry.close();
     }
 
     @Test
-    void testMixEpochMode() {
+    void testMixEpochMode() throws Exception {
+        CloseableRegistry resourceRegistry = new CloseableRegistry();
         // epoch1(parallel mode) -> epoch2(parallel mode) -> epoch3(serial mode),
         // when epoch2 close, epoch1 is still in-flight.
         // when epoch3 close, all in-flight records should drain, epoch1 and epoch2 should finish.
@@ -634,7 +675,8 @@ class AsyncExecutionControllerTest {
                 10000,
                 6000,
                 new SyncMailboxExecutor(),
-                new TestAsyncFrameworkExceptionHandler());
+                new TestAsyncFrameworkExceptionHandler(),
+                resourceRegistry);
         AtomicInteger output = new AtomicInteger(0);
         Runnable userCode = () -> valueState.asyncValue().thenAccept(v -> output.incrementAndGet());
 
@@ -678,6 +720,8 @@ class AsyncExecutionControllerTest {
         assertThat(epoch2.ongoingRecordCount).isEqualTo(0);
         assertThat(epoch3.ongoingRecordCount).isEqualTo(0);
         assertThat(output.get()).isEqualTo(6);
+
+        resourceRegistry.close();
     }
 
     /** Simulate the underlying state that is actually used to execute the request. */
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 292cbd835fc..5192536a8dc 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
 import org.apache.flink.runtime.state.v2.StateDescriptor;
 import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -38,10 +39,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -91,6 +95,17 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend {
     /** Handler to handle state request. */
     private StateRequestHandler stateRequestHandler;
 
+    /** Lock guarding the {@code managedStateExecutors} and {@code closed}. */
+    private final Object lock = new Object();
+
+    /** The StateExecutors which are managed by this ForStKeyedStateBackend. */
+    @GuardedBy("lock")
+    private final Set<StateExecutor> managedStateExecutors;
+
+    /** The flag indicating whether ForStKeyedStateBackend is closed. */
+    @GuardedBy("lock")
+    private boolean closed = false;
+
     // mark whether this backend is already disposed and prevent duplicate disposing
     private boolean disposed = false;
 
@@ -113,6 +128,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend {
         this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
         this.defaultColumnFamily = defaultColumnFamilyHandle;
         this.nativeMetricMonitor = nativeMetricMonitor;
+        this.managedStateExecutors = new HashSet<>(1);
     }
 
     @Override
@@ -147,8 +163,17 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend {
     @Override
     @Nonnull
     public StateExecutor createStateExecutor() {
-        // TODO: Make io parallelism configurable
-        return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+        synchronized (lock) {
+            if (closed) {
+                throw new FlinkRuntimeException(
+                        "Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
+            }
+            // TODO: Make io parallelism configurable
+            StateExecutor stateExecutor =
+                    new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
+            managedStateExecutors.add(stateExecutor);
+            return stateExecutor;
+        }
     }
 
     /** Should only be called by one thread, and only after all accesses to the DB happened. */
@@ -157,6 +182,11 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend {
         if (this.disposed) {
             return;
         }
+        synchronized (lock) {
+            if (!closed) {
+                IOUtils.closeQuietly(this);
+            }
+        }
 
         // IMPORTANT: null reference to signal potential async checkpoint workers that the db was
         // disposed, as
@@ -207,6 +237,14 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend {
 
     @Override
     public void close() throws IOException {
-        // do nothing currently, native resources will be release in dispose method
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            for (StateExecutor executor : managedStateExecutors) {
+                executor.shutdown();
+            }
+        }
     }
 }

Reply via email to