This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 513d54d637 IGNITE-22837 Invocation of the local raft client happens in a different pool (#4209) 513d54d637 is described below commit 513d54d637b64215b7fc04e3992aeb80faee9f66 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Fri Aug 9 21:43:14 2024 +0300 IGNITE-22837 Invocation of the local raft client happens in a different pool (#4209) --- .../ignite/internal/thread/ThreadOperation.java | 4 ++- .../apache/ignite/internal/util/IgniteUtils.java | 37 ++++++++++++++++++++++ .../internal/raft/server/impl/JraftServerImpl.java | 11 +++++-- .../raft/jraft/rpc/impl/IgniteRpcServer.java | 8 ++++- .../ignite/internal/replicator/ReplicaManager.java | 25 ++------------- .../ignite/internal/app/ThreadPoolsManager.java | 16 +++++++--- 6 files changed, 69 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java index e1bd09ec7a..9cef2d122e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java @@ -28,7 +28,9 @@ public enum ThreadOperation { /** Access TX State storage. */ TX_STATE_STORAGE_ACCESS, /** Make a blocking wait (involving taking a lock or waiting on a conditional variable or waiting for time to pass. */ - WAIT; + WAIT, + /** This permission allows a thread to process RAFT action requests. 
*/ + PROCESS_RAFT_REQ; /** * Empty list of operations denoting that no potentially blocking/time consuming operations are allowed diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 2f9eed2924..9871464625 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -75,6 +75,9 @@ import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.thread.PublicApiThreading; +import org.apache.ignite.internal.thread.ThreadAttributes; +import org.apache.ignite.internal.thread.ThreadOperation; import org.apache.ignite.internal.util.worker.IgniteWorker; import org.jetbrains.annotations.Nullable; @@ -1260,4 +1263,38 @@ public class IgniteUtils { public static CompletableFuture<Void> stopAsync(ComponentContext componentContext, Collection<? extends IgniteComponent> components) { return stopAsync(componentContext, components.stream()); } + + /** + * The method checks the list of allowed operations in the current thread and returns false if the thread is fit to continue or true if + * we must switch to another. + * + * @param requiredOperationPermissions Set of thread operations that have to be supported by the current thread. + * @return True if we have to switch to a specific pool, otherwise we can continue in the current thread. + */ + public static boolean shouldSwitchToRequestsExecutor(ThreadOperation... 
requiredOperationPermissions) { + if (Thread.currentThread() instanceof ThreadAttributes) { + ThreadAttributes thread = (ThreadAttributes) Thread.currentThread(); + + for (ThreadOperation op : requiredOperationPermissions) { + if (!thread.allows(op)) { + return true; + } + } + + return false; + } else { + if (PublicApiThreading.executingSyncPublicApi()) { + // It's a user thread, it executes a sync public API call, so it can do anything, no switch is needed. + return false; + } + if (PublicApiThreading.executingAsyncPublicApi()) { + // It's a user thread, it executes an async public API call, so it cannot do anything, a switch is needed. + return true; + } + + // It's something else: either a JRE thread or an Ignite thread not marked with ThreadAttributes. As we are not sure, + // let's switch: false negative can produce assertion errors. + return true; + } + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 2c0ee7a116..54a6c9982f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.raft.server.impl; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -39,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.stream.IntStream; @@ -65,6 +68,7 @@ import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.IgniteThreadFactory; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.raft.jraft.Closure; import org.apache.ignite.raft.jraft.Iterator; import org.apache.ignite.raft.jraft.JRaftUtils; @@ -264,7 +268,10 @@ public class JraftServerImpl implements RaftServer { opts.setSnapshotTimer(JRaftUtils.createTimer(opts, "JRaft-SnapshotTimer")); } - requestExecutor = JRaftUtils.createRequestExecutor(opts); + requestExecutor = Executors.newFixedThreadPool( + opts.getRaftRpcThreadPoolSize(), + IgniteThreadFactory.create(opts.getServerName(), "JRaft-Request-Processor", LOG, PROCESS_RAFT_REQ) + ); rpcServer = new IgniteRpcServer( service, @@ -397,7 +404,7 @@ public class JraftServerImpl implements RaftServer { ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor()); } - ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor); + IgniteUtils.shutdownAndAwaitTermination(requestExecutor, 10, SECONDS); return nullCompletedFuture(); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java index 6a676223d0..f5b397deea 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java @@ -16,6 +16,8 @@ */ package org.apache.ignite.raft.jraft.rpc.impl; +import static 
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ; +import static org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -175,7 +177,11 @@ public class IgniteRpcServer implements RpcServer<Void> { RpcProcessor<NetworkMessage> finalPrc = prc; try { - executor.execute(() -> finalPrc.handleRequest(new NetworkRpcContext(executor, sender, correlationId), message)); + if (shouldSwitchToRequestsExecutor(PROCESS_RAFT_REQ)) { + executor.execute(() -> finalPrc.handleRequest(new NetworkRpcContext(executor, sender, correlationId), message)); + } else { + finalPrc.handleRequest(new NetworkRpcContext(executor, sender, correlationId), message); + } } catch (RejectedExecutionException e) { // The rejection is ok if an executor has been stopped, otherwise it shouldn't happen. LOG.warn("A request execution was rejected [sender={} req={} reason={}]", sender, S.toString(message), e.getMessage()); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index b54ed44ff1..9ca9bb1edd 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -34,6 +34,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.isCompletedSucc import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; +import static org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import 
java.io.IOException; @@ -111,8 +112,6 @@ import org.apache.ignite.internal.replicator.message.TimestampAware; import org.apache.ignite.internal.thread.ExecutorChooser; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.thread.NamedThreadFactory; -import org.apache.ignite.internal.thread.PublicApiThreading; -import org.apache.ignite.internal.thread.ThreadAttributes; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.ClusterNode; @@ -355,33 +354,13 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc // If the request actually came from the network, we are already in the correct thread that has permissions to do storage reads // and writes. // But if this is a local call (in the same Ignite instance), we might still be in a thread that does not have those permissions. - if (shouldSwitchToRequestsExecutor()) { + if (shouldSwitchToRequestsExecutor(STORAGE_READ, STORAGE_WRITE, TX_STATE_STORAGE_ACCESS)) { requestsExecutor.execute(() -> handleReplicaRequest(request, sender, correlationId)); } else { handleReplicaRequest(request, sender, correlationId); } } - private static boolean shouldSwitchToRequestsExecutor() { - if (Thread.currentThread() instanceof ThreadAttributes) { - ThreadAttributes thread = (ThreadAttributes) Thread.currentThread(); - return !thread.allows(STORAGE_READ) || !thread.allows(STORAGE_WRITE) || !thread.allows(TX_STATE_STORAGE_ACCESS); - } else { - if (PublicApiThreading.executingSyncPublicApi()) { - // It's a user thread, it executes a sync public API call, so it can do anything, no switch is needed. - return false; - } - if (PublicApiThreading.executingAsyncPublicApi()) { - // It's a user thread, it executes an async public API call, so it cannot do anything, a switch is needed. 
- return true; - } - - // It's something else: either a JRE thread or an Ignite thread not marked with ThreadAttributes. As we are not sure, - // let's switch: false negative can produce assertion errors. - return true; - } - } - private void handleReplicaRequest(ReplicaRequest request, ClusterNode sender, @Nullable Long correlationId) { if (!busyLock.enterBusy()) { if (LOG.isInfoEnabled()) { diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java index 0831a23d3a..ce11befebe 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.app; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; import static org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS; @@ -72,12 +73,17 @@ public class ThreadPoolsManager implements IgniteComponent { IgniteThreadFactory.create(nodeName, "tableManager-io", LOG, STORAGE_READ, STORAGE_WRITE)); int partitionsOperationsThreads = Math.min(cpus * 3, 25); - partitionOperationsExecutor = new ThreadPoolExecutor( + partitionOperationsExecutor = Executors.newFixedThreadPool( partitionsOperationsThreads, - partitionsOperationsThreads, - 0, SECONDS, - new LinkedBlockingQueue<>(), - IgniteThreadFactory.create(nodeName, "partition-operations", LOG, STORAGE_READ, STORAGE_WRITE, TX_STATE_STORAGE_ACCESS) + IgniteThreadFactory.create( + nodeName, + "partition-operations", + LOG, + STORAGE_READ, + STORAGE_WRITE, + 
TX_STATE_STORAGE_ACCESS, + PROCESS_RAFT_REQ + ) ); commonScheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, "common-scheduler", LOG));