This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d1bf5c8f69 [FLINK-34975][state/forst] Execute read/write state 
request in different executor (#25360)
1d1bf5c8f69 is described below

commit 1d1bf5c8f69b56831386f52fdd0b5bfdc286c049
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Mon Sep 23 12:01:46 2024 +0800

    [FLINK-34975][state/forst] Execute read/write state request in different 
executor (#25360)
---
 .../state/forst/ForStConfigurableOptions.java      |  5 +++
 .../flink/state/forst/ForStKeyedStateBackend.java  |  7 +++-
 .../org/apache/flink/state/forst/ForStOptions.java | 14 +++++++
 .../flink/state/forst/ForStResourceContainer.java  |  8 ++++
 .../flink/state/forst/ForStStateExecutor.java      | 45 +++++++++++++++++-----
 .../flink/state/forst/ForStStateExecutorTest.java  |  6 ++-
 6 files changed, 71 insertions(+), 14 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java
index bf2b0c9f009..322373e97b7 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java
@@ -39,6 +39,8 @@ import java.util.Set;
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
+import static 
org.apache.flink.state.forst.ForStOptions.EXECUTOR_READ_IO_PARALLELISM;
+import static 
org.apache.flink.state.forst.ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM;
 import static org.rocksdb.CompactionStyle.FIFO;
 import static org.rocksdb.CompactionStyle.LEVEL;
 import static org.rocksdb.CompactionStyle.NONE;
@@ -322,6 +324,9 @@ public class ForStConfigurableOptions implements 
Serializable {
 
     static final ConfigOption<?>[] CANDIDATE_CONFIGS =
             new ConfigOption<?>[] {
+                // configurable forst executor
+                EXECUTOR_WRITE_IO_PARALLELISM,
+                EXECUTOR_READ_IO_PARALLELISM,
                 // configurable DBOptions
                 MAX_BACKGROUND_THREADS,
                 MAX_OPEN_FILES,
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 84f09edd5fe..4140943d989 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -274,9 +274,12 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend {
                 throw new FlinkRuntimeException(
                         "Attempt to create StateExecutor after 
ForStKeyedStateBackend is disposed.");
             }
-            // TODO: Make io parallelism configurable
             StateExecutor stateExecutor =
-                    new ForStStateExecutor(4, db, 
optionsContainer.getWriteOptions());
+                    new ForStStateExecutor(
+                            optionsContainer.getReadIoParallelism(),
+                            optionsContainer.getWriteIoParallelism(),
+                            db,
+                            optionsContainer.getWriteOptions());
             managedStateExecutors.add(stateExecutor);
             return stateExecutor;
         }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
index 7ab0fb23898..7dc55903937 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
@@ -156,4 +156,18 @@ public class ForStOptions {
                                             + "when '%s' is configured to 
'%s'. Increasing this value can improve the performance "
                                             + "of rocksdb timer service, but 
consumes more heap memory at the same time.",
                                     TIMER_SERVICE_FACTORY.key(), 
ForStDB.name()));
+
+    public static final ConfigOption<Integer> EXECUTOR_READ_IO_PARALLELISM =
+            
ConfigOptions.key("state.backend.forst.memory.executor-read-io-parallelism")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The number of threads used for read IO operations 
in the executor.");
+
+    public static final ConfigOption<Integer> EXECUTOR_WRITE_IO_PARALLELISM =
+            
ConfigOptions.key("state.backend.forst.memory.executor-write-io-parallelism")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The number of threads used for write IO 
operations in the executor.");
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 66699923bbe..5b48ef3711b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -277,6 +277,14 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         }
     }
 
+    public int getReadIoParallelism() {
+        return configuration.get(ForStOptions.EXECUTOR_READ_IO_PARALLELISM);
+    }
+
+    public int getWriteIoParallelism() {
+        return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM);
+    }
+
     /**
      * Prepare local and remote directories.
      *
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index e577e9c2d56..b9ef67eb0dd 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -52,8 +52,11 @@ public class ForStStateExecutor implements StateExecutor {
      */
     private final ExecutorService coordinatorThread;
 
-    /** The worker thread that actually executes the {@link StateRequest}s. */
-    private final ExecutorService workerThreads;
+    /** The worker thread that actually executes the read {@link 
StateRequest}s. */
+    private final ExecutorService readThreads;
+
+    /** The worker thread that actually executes the write {@link 
StateRequest}s. */
+    private final ExecutorService writeThreads;
 
     private final RocksDB db;
 
@@ -61,13 +64,29 @@ public class ForStStateExecutor implements StateExecutor {
 
     private Throwable executionError;
 
-    public ForStStateExecutor(int ioParallelism, RocksDB db, WriteOptions 
writeOptions) {
+    public ForStStateExecutor(
+            int readIoParallelism, int writeIoParallelism, RocksDB db, 
WriteOptions writeOptions) {
+        Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 
0);
         this.coordinatorThread =
                 Executors.newSingleThreadScheduledExecutor(
                         new 
ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
-        this.workerThreads =
-                Executors.newFixedThreadPool(
-                        ioParallelism, new 
ExecutorThreadFactory("ForSt-StateExecutor-IO"));
+        if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
+            this.readThreads =
+                    Executors.newFixedThreadPool(
+                            Math.max(readIoParallelism, writeIoParallelism),
+                            new 
ExecutorThreadFactory("ForSt-StateExecutor-IO"));
+            this.writeThreads = null;
+        } else {
+            this.readThreads =
+                    Executors.newFixedThreadPool(
+                            readIoParallelism,
+                            new 
ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
+
+            this.writeThreads =
+                    Executors.newFixedThreadPool(
+                            writeIoParallelism,
+                            new 
ExecutorThreadFactory("ForSt-StateExecutor-write-IO"));
+        }
         this.db = db;
         this.writeOptions = writeOptions;
     }
@@ -89,7 +108,10 @@ public class ForStStateExecutor implements StateExecutor {
                     if (!putRequests.isEmpty()) {
                         ForStWriteBatchOperation writeOperations =
                                 new ForStWriteBatchOperation(
-                                        db, putRequests, writeOptions, 
workerThreads);
+                                        db,
+                                        putRequests,
+                                        writeOptions,
+                                        writeThreads == null ? readThreads : 
writeThreads);
                         futures.add(writeOperations.process());
                     }
 
@@ -97,7 +119,7 @@ public class ForStStateExecutor implements StateExecutor {
                             stateRequestClassifier.pollDbGetRequests();
                     if (!getRequests.isEmpty()) {
                         ForStGeneralMultiGetOperation getOperations =
-                                new ForStGeneralMultiGetOperation(db, 
getRequests, workerThreads);
+                                new ForStGeneralMultiGetOperation(db, 
getRequests, readThreads);
                         futures.add(getOperations.process());
                     }
 
@@ -105,7 +127,7 @@ public class ForStStateExecutor implements StateExecutor {
                             stateRequestClassifier.pollDbIterRequests();
                     if (!iterRequests.isEmpty()) {
                         ForStIterateOperation iterOperations =
-                                new ForStIterateOperation(db, iterRequests, 
workerThreads);
+                                new ForStIterateOperation(db, iterRequests, 
readThreads);
                         futures.add(iterOperations.process());
                     }
 
@@ -155,7 +177,10 @@ public class ForStStateExecutor implements StateExecutor {
 
     @Override
     public void shutdown() {
-        workerThreads.shutdown();
+        readThreads.shutdown();
+        if (writeThreads != null) {
+            writeThreads.shutdown();
+        }
         coordinatorThread.shutdown();
         LOG.info("Shutting down the ForStStateExecutor.");
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
index 00248ed4c7f..df8170962a4 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java
@@ -46,7 +46,8 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase 
{
     @Test
     @SuppressWarnings("unchecked")
     void testExecuteValueStateRequest() throws Exception {
-        ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, 
new WriteOptions());
+        ForStStateExecutor forStStateExecutor =
+                new ForStStateExecutor(3, 1, db, new WriteOptions());
         ForStValueState<Integer, VoidNamespace, String> state1 =
                 buildForStValueState("value-state-1");
         ForStValueState<Integer, VoidNamespace, String> state2 =
@@ -129,7 +130,8 @@ class ForStStateExecutorTest extends 
ForStDBOperationTestBase {
 
     @Test
     void testExecuteMapStateRequest() throws Exception {
-        ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, 
new WriteOptions());
+        ForStStateExecutor forStStateExecutor =
+                new ForStStateExecutor(3, 1, db, new WriteOptions());
         ForStMapState<Integer, VoidNamespace, String, String> state =
                 buildForStMapState("map-state");
         StateRequestContainer stateRequestContainer =

Reply via email to