This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 513d54d637 IGNITE-22837 Invocation of the local raft client happens in 
a different pool (#4209)
513d54d637 is described below

commit 513d54d637b64215b7fc04e3992aeb80faee9f66
Author: Vladislav Pyatkov <vldpyat...@gmail.com>
AuthorDate: Fri Aug 9 21:43:14 2024 +0300

    IGNITE-22837 Invocation of the local raft client happens in a different 
pool (#4209)
---
 .../ignite/internal/thread/ThreadOperation.java    |  4 ++-
 .../apache/ignite/internal/util/IgniteUtils.java   | 37 ++++++++++++++++++++++
 .../internal/raft/server/impl/JraftServerImpl.java | 11 +++++--
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |  8 ++++-
 .../ignite/internal/replicator/ReplicaManager.java | 25 ++-------------
 .../ignite/internal/app/ThreadPoolsManager.java    | 16 +++++++---
 6 files changed, 69 insertions(+), 32 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
index e1bd09ec7a..9cef2d122e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
@@ -28,7 +28,9 @@ public enum ThreadOperation {
     /** Access TX State storage. */
     TX_STATE_STORAGE_ACCESS,
     /** Make a blocking wait (involving taking a lock or waiting on a 
conditional variable or waiting for time to pass. */
-    WAIT;
+    WAIT,
+    /** This permission allows tread process RAFT action request. */
+    PROCESS_RAFT_REQ;
 
     /**
      * Empty list of operations denoting that no potentially blocking/time 
consuming operations are allowed
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 2f9eed2924..9871464625 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -75,6 +75,9 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
 import org.apache.ignite.internal.util.worker.IgniteWorker;
 import org.jetbrains.annotations.Nullable;
 
@@ -1260,4 +1263,38 @@ public class IgniteUtils {
     public static CompletableFuture<Void> stopAsync(ComponentContext 
componentContext, Collection<? extends IgniteComponent> components) {
         return stopAsync(componentContext, components.stream());
     }
+
+    /**
+     * The method checks the list of allowed operations in the current thread 
and returns false if the thread is fit to continue or true if
+     * we must switch to another.
+     *
+     * @param requiredOperationPermissions Set of thread operations that have 
to be supported by the current thread.
+     * @return True if we have to switch to a specific pool, otherwise we can 
continue in the current thread.
+     */
+    public static boolean shouldSwitchToRequestsExecutor(ThreadOperation... 
requiredOperationPermissions) {
+        if (Thread.currentThread() instanceof ThreadAttributes) {
+            ThreadAttributes thread = (ThreadAttributes) 
Thread.currentThread();
+
+            for (ThreadOperation op : requiredOperationPermissions) {
+                if (!thread.allows(op)) {
+                    return true;
+                }
+            }
+
+            return false;
+        } else {
+            if (PublicApiThreading.executingSyncPublicApi()) {
+                // It's a user thread, it executes a sync public API call, so 
it can do anything, no switch is needed.
+                return false;
+            }
+            if (PublicApiThreading.executingAsyncPublicApi()) {
+                // It's a user thread, it executes an async public API call, 
so it cannot do anything, a switch is needed.
+                return true;
+            }
+
+            // It's something else: either a JRE thread or an Ignite thread 
not marked with ThreadAttributes. As we are not sure,
+            // let's switch: false negative can produce assertion errors.
+            return true;
+        }
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 2c0ee7a116..54a6c9982f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.raft.server.impl;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static 
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -39,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.stream.IntStream;
@@ -65,6 +68,7 @@ import 
org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
 import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.Iterator;
 import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -264,7 +268,10 @@ public class JraftServerImpl implements RaftServer {
             opts.setSnapshotTimer(JRaftUtils.createTimer(opts, 
"JRaft-SnapshotTimer"));
         }
 
-        requestExecutor = JRaftUtils.createRequestExecutor(opts);
+        requestExecutor = Executors.newFixedThreadPool(
+                opts.getRaftRpcThreadPoolSize(),
+                IgniteThreadFactory.create(opts.getServerName(), 
"JRaft-Request-Processor", LOG, PROCESS_RAFT_REQ)
+        );
 
         rpcServer = new IgniteRpcServer(
                 service,
@@ -397,7 +404,7 @@ public class JraftServerImpl implements RaftServer {
             
ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
         }
 
-        ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
+        IgniteUtils.shutdownAndAwaitTermination(requestExecutor, 10, SECONDS);
 
         return nullCompletedFuture();
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 6a676223d0..f5b397deea 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.raft.jraft.rpc.impl;
 
+import static 
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -175,7 +177,11 @@ public class IgniteRpcServer implements RpcServer<Void> {
             RpcProcessor<NetworkMessage> finalPrc = prc;
 
             try {
-                executor.execute(() -> finalPrc.handleRequest(new 
NetworkRpcContext(executor, sender, correlationId), message));
+                if (shouldSwitchToRequestsExecutor(PROCESS_RAFT_REQ)) {
+                    executor.execute(() -> finalPrc.handleRequest(new 
NetworkRpcContext(executor, sender, correlationId), message));
+                } else {
+                    finalPrc.handleRequest(new NetworkRpcContext(executor, 
sender, correlationId), message);
+                }
             } catch (RejectedExecutionException e) {
                 // The rejection is ok if an executor has been stopped, 
otherwise it shouldn't happen.
                 LOG.warn("A request execution was rejected [sender={} req={} 
reason={}]", sender, S.toString(message), e.getMessage());
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b54ed44ff1..9ca9bb1edd 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -34,6 +34,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSucc
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.io.IOException;
@@ -111,8 +112,6 @@ import 
org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.PublicApiThreading;
-import org.apache.ignite.internal.thread.ThreadAttributes;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
@@ -355,33 +354,13 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         // If the request actually came from the network, we are already in 
the correct thread that has permissions to do storage reads
         // and writes.
         // But if this is a local call (in the same Ignite instance), we might 
still be in a thread that does not have those permissions.
-        if (shouldSwitchToRequestsExecutor()) {
+        if (shouldSwitchToRequestsExecutor(STORAGE_READ, STORAGE_WRITE, 
TX_STATE_STORAGE_ACCESS)) {
             requestsExecutor.execute(() -> handleReplicaRequest(request, 
sender, correlationId));
         } else {
             handleReplicaRequest(request, sender, correlationId);
         }
     }
 
-    private static boolean shouldSwitchToRequestsExecutor() {
-        if (Thread.currentThread() instanceof ThreadAttributes) {
-            ThreadAttributes thread = (ThreadAttributes) 
Thread.currentThread();
-            return !thread.allows(STORAGE_READ) || 
!thread.allows(STORAGE_WRITE) || !thread.allows(TX_STATE_STORAGE_ACCESS);
-        } else {
-            if (PublicApiThreading.executingSyncPublicApi()) {
-                // It's a user thread, it executes a sync public API call, so 
it can do anything, no switch is needed.
-                return false;
-            }
-            if (PublicApiThreading.executingAsyncPublicApi()) {
-                // It's a user thread, it executes an async public API call, 
so it cannot do anything, a switch is needed.
-                return true;
-            }
-
-            // It's something else: either a JRE thread or an Ignite thread 
not marked with ThreadAttributes. As we are not sure,
-            // let's switch: false negative can produce assertion errors.
-            return true;
-        }
-    }
-
     private void handleReplicaRequest(ReplicaRequest request, ClusterNode 
sender, @Nullable Long correlationId) {
         if (!busyLock.enterBusy()) {
             if (LOG.isInfoEnabled()) {
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index 0831a23d3a..ce11befebe 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.app;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static 
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
@@ -72,12 +73,17 @@ public class ThreadPoolsManager implements IgniteComponent {
                 IgniteThreadFactory.create(nodeName, "tableManager-io", LOG, 
STORAGE_READ, STORAGE_WRITE));
 
         int partitionsOperationsThreads = Math.min(cpus * 3, 25);
-        partitionOperationsExecutor = new ThreadPoolExecutor(
+        partitionOperationsExecutor = Executors.newFixedThreadPool(
                 partitionsOperationsThreads,
-                partitionsOperationsThreads,
-                0, SECONDS,
-                new LinkedBlockingQueue<>(),
-                IgniteThreadFactory.create(nodeName, "partition-operations", 
LOG, STORAGE_READ, STORAGE_WRITE, TX_STATE_STORAGE_ACCESS)
+                IgniteThreadFactory.create(
+                        nodeName,
+                        "partition-operations",
+                        LOG,
+                        STORAGE_READ,
+                        STORAGE_WRITE,
+                        TX_STATE_STORAGE_ACCESS,
+                        PROCESS_RAFT_REQ
+                )
         );
 
         commonScheduler = 
Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, 
"common-scheduler", LOG));

Reply via email to