This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1d1bf5c8f69 [FLINK-34975][state/forst] Execute read/write state request in different executor (#25360) 1d1bf5c8f69 is described below commit 1d1bf5c8f69b56831386f52fdd0b5bfdc286c049 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Mon Sep 23 12:01:46 2024 +0800 [FLINK-34975][state/forst] Execute read/write state request in different executor (#25360) --- .../state/forst/ForStConfigurableOptions.java | 5 +++ .../flink/state/forst/ForStKeyedStateBackend.java | 7 +++- .../org/apache/flink/state/forst/ForStOptions.java | 14 +++++++ .../flink/state/forst/ForStResourceContainer.java | 8 ++++ .../flink/state/forst/ForStStateExecutor.java | 45 +++++++++++++++++----- .../flink/state/forst/ForStStateExecutorTest.java | 6 ++- 6 files changed, 71 insertions(+), 14 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java index bf2b0c9f009..322373e97b7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java @@ -39,6 +39,8 @@ import java.util.Set; import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.configuration.description.LinkElement.link; import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.state.forst.ForStOptions.EXECUTOR_READ_IO_PARALLELISM; +import static org.apache.flink.state.forst.ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM; import static org.rocksdb.CompactionStyle.FIFO; import static org.rocksdb.CompactionStyle.LEVEL; import static org.rocksdb.CompactionStyle.NONE; @@ -322,6 +324,9 @@ public 
class ForStConfigurableOptions implements Serializable { static final ConfigOption<?>[] CANDIDATE_CONFIGS = new ConfigOption<?>[] { + // configurable forst executor + EXECUTOR_WRITE_IO_PARALLELISM, + EXECUTOR_READ_IO_PARALLELISM, // configurable DBOptions MAX_BACKGROUND_THREADS, MAX_OPEN_FILES, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 84f09edd5fe..4140943d989 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -274,9 +274,12 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend { throw new FlinkRuntimeException( "Attempt to create StateExecutor after ForStKeyedStateBackend is disposed."); } - // TODO: Make io parallelism configurable StateExecutor stateExecutor = - new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); + new ForStStateExecutor( + optionsContainer.getReadIoParallelism(), + optionsContainer.getWriteIoParallelism(), + db, + optionsContainer.getWriteOptions()); managedStateExecutors.add(stateExecutor); return stateExecutor; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java index 7ab0fb23898..7dc55903937 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java @@ -156,4 +156,18 @@ public class ForStOptions { + "when '%s' is configured to '%s'. 
Increasing this value can improve the performance " + "of rocksdb timer service, but consumes more heap memory at the same time.", TIMER_SERVICE_FACTORY.key(), ForStDB.name())); + + public static final ConfigOption<Integer> EXECUTOR_READ_IO_PARALLELISM = + ConfigOptions.key("state.backend.forst.memory.executor-read-io-parallelism") + .intType() + .defaultValue(3) + .withDescription( + "The number of threads used for read IO operations in the executor."); + + public static final ConfigOption<Integer> EXECUTOR_WRITE_IO_PARALLELISM = + ConfigOptions.key("state.backend.forst.memory.executor-write-io-parallelism") + .intType() + .defaultValue(1) + .withDescription( + "The number of threads used for write IO operations in the executor."); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 66699923bbe..5b48ef3711b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -277,6 +277,14 @@ public final class ForStResourceContainer implements AutoCloseable { } } + public int getReadIoParallelism() { + return configuration.get(ForStOptions.EXECUTOR_READ_IO_PARALLELISM); + } + + public int getWriteIoParallelism() { + return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM); + } + /** * Prepare local and remote directories. 
* diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index e577e9c2d56..b9ef67eb0dd 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -52,8 +52,11 @@ public class ForStStateExecutor implements StateExecutor { */ private final ExecutorService coordinatorThread; - /** The worker thread that actually executes the {@link StateRequest}s. */ - private final ExecutorService workerThreads; + /** The worker thread that actually executes the read {@link StateRequest}s. */ + private final ExecutorService readThreads; + + /** The worker thread that actually executes the write {@link StateRequest}s. */ + private final ExecutorService writeThreads; private final RocksDB db; @@ -61,13 +64,29 @@ public class ForStStateExecutor implements StateExecutor { private Throwable executionError; - public ForStStateExecutor(int ioParallelism, RocksDB db, WriteOptions writeOptions) { + public ForStStateExecutor( + int readIoParallelism, int writeIoParallelism, RocksDB db, WriteOptions writeOptions) { + Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0); this.coordinatorThread = Executors.newSingleThreadScheduledExecutor( new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator")); - this.workerThreads = - Executors.newFixedThreadPool( - ioParallelism, new ExecutorThreadFactory("ForSt-StateExecutor-IO")); + if (readIoParallelism <= 0 || writeIoParallelism <= 0) { + this.readThreads = + Executors.newFixedThreadPool( + Math.max(readIoParallelism, writeIoParallelism), + new ExecutorThreadFactory("ForSt-StateExecutor-IO")); + this.writeThreads = null; + } else { + this.readThreads = + 
Executors.newFixedThreadPool( + readIoParallelism, + new ExecutorThreadFactory("ForSt-StateExecutor-read-IO")); + + this.writeThreads = + Executors.newFixedThreadPool( + writeIoParallelism, + new ExecutorThreadFactory("ForSt-StateExecutor-write-IO")); + } this.db = db; this.writeOptions = writeOptions; } @@ -89,7 +108,10 @@ public class ForStStateExecutor implements StateExecutor { if (!putRequests.isEmpty()) { ForStWriteBatchOperation writeOperations = new ForStWriteBatchOperation( - db, putRequests, writeOptions, workerThreads); + db, + putRequests, + writeOptions, + writeThreads == null ? readThreads : writeThreads); futures.add(writeOperations.process()); } @@ -97,7 +119,7 @@ public class ForStStateExecutor implements StateExecutor { stateRequestClassifier.pollDbGetRequests(); if (!getRequests.isEmpty()) { ForStGeneralMultiGetOperation getOperations = - new ForStGeneralMultiGetOperation(db, getRequests, workerThreads); + new ForStGeneralMultiGetOperation(db, getRequests, readThreads); futures.add(getOperations.process()); } @@ -105,7 +127,7 @@ public class ForStStateExecutor implements StateExecutor { stateRequestClassifier.pollDbIterRequests(); if (!iterRequests.isEmpty()) { ForStIterateOperation iterOperations = - new ForStIterateOperation(db, iterRequests, workerThreads); + new ForStIterateOperation(db, iterRequests, readThreads); futures.add(iterOperations.process()); } @@ -155,7 +177,10 @@ public class ForStStateExecutor implements StateExecutor { @Override public void shutdown() { - workerThreads.shutdown(); + readThreads.shutdown(); + if (writeThreads != null) { + writeThreads.shutdown(); + } coordinatorThread.shutdown(); LOG.info("Shutting down the ForStStateExecutor."); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index 00248ed4c7f..df8170962a4 
100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -46,7 +46,8 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { @Test @SuppressWarnings("unchecked") void testExecuteValueStateRequest() throws Exception { - ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, new WriteOptions()); + ForStStateExecutor forStStateExecutor = + new ForStStateExecutor(3, 1, db, new WriteOptions()); ForStValueState<Integer, VoidNamespace, String> state1 = buildForStValueState("value-state-1"); ForStValueState<Integer, VoidNamespace, String> state2 = @@ -129,7 +130,8 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { @Test void testExecuteMapStateRequest() throws Exception { - ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, new WriteOptions()); + ForStStateExecutor forStStateExecutor = + new ForStStateExecutor(3, 1, db, new WriteOptions()); ForStMapState<Integer, VoidNamespace, String, String> state = buildForStMapState("map-state"); StateRequestContainer stateRequestContainer =