This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c92cf4262c IGNITE-26002 Network thread even distribution (#6370)
4c92cf4262c is described below

commit 4c92cf4262ccad9297196e26da98ab9065ccf961
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Aug 8 19:10:56 2025 +0300

    IGNITE-26002 Network thread even distribution (#6370)
---
 .../ignite/client/handler/ClientHandlerModule.java |  16 +-
 .../handler/ClientInboundMessageHandler.java       |  17 +-
 .../ignite/client/TestClientHandlerModule.java     |   3 +-
 .../internal/network/NettyBootstrapFactory.java    |  18 +-
 .../handshake/HandshakeEventLoopSwitcher.java      | 205 +++++++++++++++++++++
 .../network/netty/ChannelEventLoopsSource.java     |  33 ----
 .../ignite/internal/network/netty/ChannelKey.java  |   2 +-
 .../internal/network/netty/ConnectionManager.java  |  13 +-
 .../network/recovery/HandshakeManagerUtils.java    |  57 ------
 .../recovery/RecoveryAcceptorHandshakeManager.java |  12 +-
 .../RecoveryInitiatorHandshakeManager.java         |  11 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   3 +-
 .../network/DefaultMessagingServiceTest.java       |   2 +-
 .../handshake/NoOpHandshakeEventLoopSwitcher.java  |  56 ++++++
 .../network/netty/RecoveryHandshakeTest.java       |   6 +-
 .../RecoveryAcceptorHandshakeManagerTest.java      |   4 +-
 .../RecoveryInitiatorHandshakeManagerTest.java     |   4 +-
 17 files changed, 327 insertions(+), 135 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 52d5f292868..9af9a4f672d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.ssl.SslContextProvider;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.schema.SchemaSyncService;
@@ -346,7 +347,11 @@ public class ClientHandlerModule implements 
IgniteComponent, PlatformComputeTran
                                 ch.pipeline().addFirst("ssl", 
sslContext.newHandler(ch.alloc()));
                             }
 
-                            ClientInboundMessageHandler messageHandler = 
createInboundMessageHandler(configuration, connectionId);
+                            ClientInboundMessageHandler messageHandler = 
createInboundMessageHandler(
+                                    
bootstrapFactory.handshakeEventLoopSwitcher(),
+                                    configuration,
+                                    connectionId
+                            );
 
                             //noinspection TestOnlyProblems
                             handler = messageHandler;
@@ -423,7 +428,11 @@ public class ClientHandlerModule implements 
IgniteComponent, PlatformComputeTran
         return result;
     }
 
-    private ClientInboundMessageHandler 
createInboundMessageHandler(ClientConnectorView configuration, long 
connectionId) {
+    private ClientInboundMessageHandler createInboundMessageHandler(
+            HandshakeEventLoopSwitcher handshakeEventLoopSwitcher,
+            ClientConnectorView configuration,
+            long connectionId
+    ) {
         return new ClientInboundMessageHandler(
                 igniteTables,
                 txManager,
@@ -442,7 +451,8 @@ public class ClientHandlerModule implements 
IgniteComponent, PlatformComputeTran
                 partitionOperationsExecutor,
                 SUPPORTED_FEATURES,
                 Map.of(),
-                computeExecutors::remove
+                computeExecutors::remove,
+                handshakeEventLoopSwitcher
         );
     }
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 89719ac82a5..0851d81dbd1 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -140,6 +140,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.IgniteClusterImpl;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.schema.SchemaVersionMismatchException;
@@ -259,6 +260,8 @@ public class ClientInboundMessageHandler
 
     private final Map<Long, CompletableFuture<ClientMessageUnpacker>> 
serverToClientRequests = new ConcurrentHashMap<>();
 
+    private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
+
     /**
      * Constructor.
      *
@@ -297,7 +300,8 @@ public class ClientInboundMessageHandler
             Executor partitionOperationsExecutor,
             BitSet features,
             Map<HandshakeExtension, Object> extensions,
-            Function<String, CompletableFuture<PlatformComputeConnection>> 
computeConnectionFunc
+            Function<String, CompletableFuture<PlatformComputeConnection>> 
computeConnectionFunc,
+            HandshakeEventLoopSwitcher handshakeEventLoopSwitcher
     ) {
         assert igniteTables != null;
         assert txManager != null;
@@ -329,6 +333,7 @@ public class ClientInboundMessageHandler
         this.clockService = clockService;
         this.primaryReplicaTracker = primaryReplicaTracker;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
+        this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
 
         jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
         jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(
@@ -460,16 +465,16 @@ public class ClientInboundMessageHandler
                 return;
             }
 
-            authenticationManager
-                    
.authenticateAsync(createAuthenticationRequest(clientHandshakeExtensions))
-                    .handleAsync((user, err) -> {
+            AuthenticationRequest<?, ?> authReq = 
createAuthenticationRequest(clientHandshakeExtensions);
+
+            
handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channelHandlerContext.channel())
+                    .thenCompose(unused -> 
authenticationManager.authenticateAsync(authReq))
+                    .whenCompleteAsync((user, err) -> {
                         if (err != null) {
                             handshakeError(ctx, packer, err);
                         } else {
                             handshakeSuccess(ctx, packer, user, 
clientFeatures, clientVer, clientCode);
                         }
-
-                        return null;
                     }, ctx.executor());
         } catch (Throwable t) {
             handshakeError(ctx, packer, t);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index f25c24f6381..1164e4e78f6 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -270,7 +270,8 @@ public class TestClientHandlerModule implements 
IgniteComponent {
                                         Runnable::run,
                                         features,
                                         randomExtensions(),
-                                        unused -> null
+                                        unused -> null,
+                                        
bootstrapFactory.handshakeEventLoopSwitcher()
                                 )
                         );
                     }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
index 257b493ff34..f2bcff10105 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
@@ -38,7 +38,7 @@ import 
org.apache.ignite.internal.network.configuration.InboundView;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.configuration.OutboundView;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
 import 
org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup.NetworkThread;
 import org.jetbrains.annotations.TestOnly;
@@ -46,7 +46,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Netty bootstrap factory. Holds shared {@link EventLoopGroup} instances and 
encapsulates common Netty {@link Bootstrap} creation logic.
  */
-public class NettyBootstrapFactory implements IgniteComponent, 
ChannelEventLoopsSource {
+public class NettyBootstrapFactory implements IgniteComponent {
     /** Network configuration. */
     private final NetworkConfiguration networkConfiguration;
 
@@ -59,8 +59,7 @@ public class NettyBootstrapFactory implements 
IgniteComponent, ChannelEventLoops
     /** Work socket channel handler event loop group (this group does network 
I/O). */
     private EventLoopGroup workerGroup;
 
-    /** All event loops with which {@link io.netty.channel.Channel}s might be 
registered. */
-    private volatile List<EventLoop> channelEventLoops;
+    private volatile HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
 
     /**
      * Constructor.
@@ -153,11 +152,15 @@ public class NettyBootstrapFactory implements 
IgniteComponent, ChannelEventLoops
         bossGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + 
"-network-accept");
         workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + 
"-network-worker");
 
-        this.channelEventLoops = List.copyOf(eventLoopsAt(workerGroup));
+        this.handshakeEventLoopSwitcher = new 
HandshakeEventLoopSwitcher(eventLoopsAt(workerGroup));
 
         return nullCompletedFuture();
     }
 
+    public HandshakeEventLoopSwitcher handshakeEventLoopSwitcher() {
+        return handshakeEventLoopSwitcher;
+    }
+
     private static List<EventLoop> eventLoopsAt(EventLoopGroup ... groups) {
         List<EventLoop> channelEventLoops = new ArrayList<>();
 
@@ -198,11 +201,6 @@ public class NettyBootstrapFactory implements 
IgniteComponent, ChannelEventLoops
         return nullCompletedFuture();
     }
 
-    @Override
-    public List<EventLoop> channelEventLoops() {
-        return channelEventLoops;
-    }
-
     /**
      * Returns worker event loop group.
      */
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeEventLoopSwitcher.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeEventLoopSwitcher.java
new file mode 100644
index 00000000000..2d0a4c6c726
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeEventLoopSwitcher.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.channel.EventLoop;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.netty.ChannelKey;
+import org.apache.ignite.network.ClusterNode;
+import reactor.util.annotation.Nullable;
+
+/**
+ * A class responsible for managing the assignment of Netty channels to event 
loops
+ * and switching channels between event loops when necessary.
+ */
+public class HandshakeEventLoopSwitcher {
+    private static final IgniteLogger LOG = 
Loggers.forClass(HandshakeEventLoopSwitcher.class);
+
+    /** List of available event loops. */
+    private final List<EventLoop> executors;
+
+    /** Map to track the number of channels assigned to each event loop. */
+    private final Map<Integer, Set<ChannelId>> activeChannelMap;
+
+    /**
+     * Map to track channel reservations for specific communication 
connections.
+     * The map prevents applying different event loops for the same chаnell  
key.
+     */
+    private final Map<ChannelKey, Integer> channelReservationMap = new 
HashMap<>();
+
+    /**
+     * Constructs a new instance of HandshakeEventLoopSwitcher.
+     *
+     * @param eventLoops The list of event loops to manage.
+     */
+    public HandshakeEventLoopSwitcher(List<EventLoop> eventLoops) {
+        this.executors = eventLoops;
+        this.activeChannelMap = new HashMap<>(eventLoops.size());
+    }
+
+    /**
+     * Switches the event loop of a given channel if needed.
+     *
+     * @param channel The channel to potentially switch to a different event 
loop.
+     * @return A CompletableFuture that completes when the event loop is 
switched. The future completes in the target event loop.
+     */
+    public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel) {
+        return switchEventLoopIfNeeded(channel, null);
+    }
+
+    /**
+     * Switches the event loop of a given channel if needed.
+     *
+     * @param channel The channel to potentially switch to a different event 
loop.
+     * @param channelKey The unique key identifying the channel. That is a 
logical identifier of the connection between two nodes
+     *         (it would be {@code null} for a client-server connection). The 
purpose is to have strict message ordering in a reliable
+     *         connection.
+     * @return A CompletableFuture that completes when the event loop is 
switched. The future completes in the target event loop.
+     */
+    public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel, 
@Nullable ChannelKey channelKey) {
+        ChannelId channelId = channel.id();
+
+        EventLoop targetEventLoop = eventLoopForKey(channelId, channelKey);
+
+        if (targetEventLoop != channel.eventLoop()) {
+            CompletableFuture<Void> fut = new CompletableFuture<>();
+
+            channel.deregister().addListener(deregistrationFuture -> {
+                if (!deregistrationFuture.isSuccess()) {
+                    LOG.error("Cannot deregister a channel from an event 
loop", deregistrationFuture.cause());
+
+                    fut.completeExceptionally(deregistrationFuture.cause());
+
+                    channel.close();
+
+                    return;
+                }
+
+                
targetEventLoop.register(channel).addListener(registrationFuture -> {
+                    if (!registrationFuture.isSuccess()) {
+                        LOG.error("Cannot register a channel with an event 
loop", registrationFuture.cause());
+
+                        
fut.completeExceptionally(deregistrationFuture.cause());
+
+                        channel.close();
+
+                        return;
+                    }
+
+                    channel.closeFuture().addListener(future -> {
+                        channelUnregistered(channelId);
+                    });
+
+                    fut.complete(null);
+                });
+            });
+
+            return fut;
+        }
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Determines the appropriate event loop for a given channel key.
+     *
+     * @param channelId The ID of the channel.
+     * @param channelKey The unique key identifying the channel.
+     * @return The selected event loop for the channel.
+     */
+    private synchronized EventLoop eventLoopForKey(ChannelId channelId, 
@Nullable ChannelKey channelKey) {
+        if (channelKey != null) {
+            Integer idx = channelReservationMap.get(channelKey);
+
+            if (idx != null) {
+                return executors.get(idx);
+            }
+        }
+
+        int minCnt = Integer.MAX_VALUE;
+        int index = 0;
+
+        for (int i = 0; i < executors.size(); i++) {
+            Set<ChannelId> channelIds = activeChannelMap.getOrDefault(i, 
Set.of());
+
+            if (channelIds.contains(channelId)) {
+                assert channelKey == null : "Channel key should be null if the 
channel is already registered in the event loop";
+
+                return executors.get(index);
+            }
+
+            int cnt = channelIds.size();
+
+            if (cnt < minCnt) {
+                minCnt = cnt;
+                index = i;
+
+                if (cnt == 0) {
+                    // If we found an event loop with no channels, we can stop 
searching.
+                    break;
+                }
+            }
+        }
+
+        activeChannelMap.computeIfAbsent(index, key -> new 
HashSet<>()).add(channelId);
+
+        if (channelKey != null) {
+            channelReservationMap.put(channelKey, index);
+        }
+
+        EventLoop eventLoop = executors.get(index);
+
+        LOG.debug("Channel mapped to the loop [channelId={}, channelKey={}, 
eventLoopIndex={}]", channelId, channelKey, index);
+
+        return eventLoop;
+    }
+
+    /**
+     * Removes a channel from the event loop tracking map when it is 
unregistered.
+     *
+     * @param channelId The unique ID identifying the channel to unregister.
+     */
+    private synchronized void channelUnregistered(ChannelId channelId) {
+        for (Set<ChannelId> channelKeys : activeChannelMap.values()) {
+            if (channelKeys.remove(channelId)) {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Removes all channels associated with a node from the event loop 
tracking map
+     * when the node leaves the topology.
+     *
+     * @param node The node that left the topology.
+     */
+    public synchronized void nodeLeftTopology(ClusterNode node) {
+        channelReservationMap.entrySet().removeIf(entry -> 
entry.getKey().launchId().equals(node.id()));
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
deleted file mode 100644
index a499eb5843e..00000000000
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.netty;
-
-import io.netty.channel.EventLoop;
-import java.util.List;
-
-/**
- * Allows to obtain a list of all event loops with which {@link 
io.netty.channel.Channel}s might be registered.
- */
-@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
-public interface ChannelEventLoopsSource {
-    /**
-     * Returns list of all event loops with which {@link 
io.netty.channel.Channel}s might be registered.
-     * This must always return the same event loops in the same order.
-     */
-    List<EventLoop> channelEventLoops();
-}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
index c13252fe78f..fc568a04bc2 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
@@ -43,7 +43,7 @@ public class ChannelKey {
         this.connectionId = connectionId;
     }
 
-    UUID launchId() {
+    public UUID launchId() {
         return launchId;
     }
 
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 165cc3c28f2..3f8ca85d15b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -530,7 +530,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
                     localNode,
                     connectionId,
                     descriptorProvider,
-                    bootstrapFactory,
+                    bootstrapFactory.handshakeEventLoopSwitcher(),
                     staleIdDetector,
                     clusterIdSupplier,
                     this,
@@ -554,7 +554,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
                 localNodeFuture.join(),
                 FACTORY,
                 descriptorProvider,
-                bootstrapFactory,
+                bootstrapFactory.handshakeEventLoopSwitcher(),
                 staleIdDetector,
                 clusterIdSupplier,
                 this,
@@ -632,11 +632,14 @@ public class ConnectionManager implements 
ChannelCreationListener {
      * it's ID that gets regenerated at each node restart) and recovery 
descriptors corresponding to it.
      *
      * @param id ID of the node (it must have already left the topology).
+     * @return Future that completes when all the channels and descriptors are 
closed.
      */
-    public void handleNodeLeft(UUID id) {
+    public CompletableFuture<Void> handleNodeLeft(UUID id) {
         // We rely on the fact that the node with the given ID has already 
left the physical topology.
         assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
 
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
         // TODO: IGNITE-21207 - remove descriptors for good.
 
         connectionMaintenanceExecutor.execute(
@@ -644,8 +647,12 @@ public class ConnectionManager implements 
ChannelCreationListener {
                     // Closing descriptors separately (as some of them might 
not have an operating channel attached, but they
                     // still might have unacknowledged messages/futures).
                     disposeRecoveryDescriptorsOfLeftNode(id);
+
+                    future.complete(null);
                 }, connectionMaintenanceExecutor)
         );
+
+        return future;
     }
 
     private CompletableFuture<Void> closeChannelsWith(UUID id) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index b205c0010ff..4266f5070ce 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -18,12 +18,9 @@
 package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
-import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoop;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -31,8 +28,6 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.message.ClusterNodeMessage;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
-import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -81,58 +76,6 @@ class HandshakeManagerUtils {
         });
     }
 
-    /**
-     * Moves a channel from its current event loop to the event loop 
corresponding to the channel key. This is needed
-     * because all channels in the same logical connection must be served by 
the same thread.
-     *
-     * @param channel Channel to move.
-     * @param channelKey Key of the logical connection.
-     * @param eventLoopsSource Used to get all event loops with which a 
channel might be registered.
-     * @param afterSwitching Action to execute after switching (it will be 
executed on the new event loop).
-     */
-    static void switchEventLoopIfNeeded(
-            Channel channel,
-            ChannelKey channelKey,
-            ChannelEventLoopsSource eventLoopsSource,
-            Runnable afterSwitching
-    ) {
-        EventLoop targetEventLoop = eventLoopForKey(channelKey, 
eventLoopsSource);
-
-        if (targetEventLoop != channel.eventLoop()) {
-            channel.deregister().addListener(deregistrationFuture -> {
-                if (!deregistrationFuture.isSuccess()) {
-                    LOG.error("Cannot deregister a channel from an event 
loop", deregistrationFuture.cause());
-
-                    channel.close();
-
-                    return;
-                }
-
-                
targetEventLoop.register(channel).addListener(registrationFuture -> {
-                    if (!registrationFuture.isSuccess()) {
-                        LOG.error("Cannot register a channel with an event 
loop", registrationFuture.cause());
-
-                        channel.close();
-
-                        return;
-                    }
-
-                    afterSwitching.run();
-                });
-            });
-        } else {
-            afterSwitching.run();
-        }
-    }
-
-    private static EventLoop eventLoopForKey(ChannelKey channelKey, 
ChannelEventLoopsSource eventLoopsSource) {
-        List<EventLoop> eventLoops = eventLoopsSource.channelEventLoops();
-
-        int index = safeAbs(channelKey.hashCode()) % eventLoops.size();
-
-        return eventLoops.get(index);
-    }
-
     static ClusterNodeMessage clusterNodeToMessage(ClusterNode node) {
         return MESSAGE_FACTORY.clusterNodeMessage()
                 .id(node.id())
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
index b66752fc783..c8109e3e8b6 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.switchEventLoopIfNeeded;
 import static 
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
 
 import io.netty.channel.Channel;
@@ -37,10 +36,10 @@ import org.apache.ignite.internal.network.ClusterIdSupplier;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
 import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -92,7 +91,7 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
-    private final ChannelEventLoopsSource channelEventLoopsSource;
+    private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
 
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
@@ -119,7 +118,7 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
             ClusterNode localNode,
             NetworkMessagesFactory messageFactory,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
-            ChannelEventLoopsSource channelEventLoopsSource,
+            HandshakeEventLoopSwitcher handshakeEventLoopSwitcher,
             StaleIdDetector staleIdDetector,
             ClusterIdSupplier clusterIdSupplier,
             ChannelCreationListener channelCreationListener,
@@ -129,7 +128,7 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
         this.localNode = localNode;
         this.messageFactory = messageFactory;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
-        this.channelEventLoopsSource = channelEventLoopsSource;
+        this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
         this.staleIdDetector = staleIdDetector;
         this.clusterIdSupplier = clusterIdSupplier;
         this.stopping = stopping;
@@ -229,7 +228,8 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
         this.remoteChannelId = message.connectionId();
 
         ChannelKey channelKey = new ChannelKey(remoteNode.name(), 
remoteNode.id(), remoteChannelId);
-        switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource, 
() -> tryAcquireDescriptorAndFinishHandshake(message));
+        handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channel, channelKey)
+                .thenRun(() -> 
tryAcquireDescriptorAndFinishHandshake(message));
     }
 
     private boolean 
possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage message) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index 72e88cf9f0d..8c1e6d14263 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -22,7 +22,6 @@ import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
 import static 
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.clusterNodeToMessage;
-import static 
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.switchEventLoopIfNeeded;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -43,10 +42,10 @@ import 
org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
 import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -77,7 +76,7 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
-    private final ChannelEventLoopsSource channelEventLoopsSource;
+    private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
 
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
@@ -127,7 +126,7 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
             ClusterNode localNode,
             short connectionId,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
-            ChannelEventLoopsSource channelEventLoopsSource,
+            HandshakeEventLoopSwitcher handshakeEventLoopSwitcher,
             StaleIdDetector staleIdDetector,
             ClusterIdSupplier clusterIdSupplier,
             ChannelCreationListener channelCreationListener,
@@ -137,7 +136,7 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
         this.localNode = localNode;
         this.connectionId = connectionId;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
-        this.channelEventLoopsSource = channelEventLoopsSource;
+        this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
         this.staleIdDetector = staleIdDetector;
         this.clusterIdSupplier = clusterIdSupplier;
         this.stopping = stopping;
@@ -265,7 +264,7 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
         this.remoteNode = handshakeStartMessage.serverNode().asClusterNode();
 
         ChannelKey channelKey = new ChannelKey(remoteNode.name(), 
remoteNode.id(), connectionId);
-        switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource, 
() -> proceedAfterSavingIds(handshakeStartMessage));
+        handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channel, 
channelKey).thenRun(() -> proceedAfterSavingIds(handshakeStartMessage));
     }
 
     private void proceedAfterSavingIds(HandshakeStartMessage 
handshakeStartMessage) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index 34e5b619599..3f535717ecb 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -174,7 +174,8 @@ public class ScaleCubeClusterServiceFactory {
                 topologyService.addEventHandler(new TopologyEventHandler() {
                     @Override
                     public void onDisappeared(ClusterNode member) {
-                        connectionMgr.handleNodeLeft(member.id());
+                        connectionMgr.handleNodeLeft(member.id()).thenRun(() ->
+                                
nettyBootstrapFactory.handshakeEventLoopSwitcher().nodeLeftTopology(member));
                     }
                 });
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index aac8acee17e..a8690bcd3d4 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -664,7 +664,7 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
                         localNode,
                         connectionId,
                         recoveryDescriptorProvider,
-                        bootstrapFactory,
+                        bootstrapFactory.handshakeEventLoopSwitcher(),
                         staleIdDetector,
                         clusterIdSupplier,
                         channel -> {},
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/handshake/NoOpHandshakeEventLoopSwitcher.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/handshake/NoOpHandshakeEventLoopSwitcher.java
new file mode 100644
index 00000000000..4318eb5ad6a
--- /dev/null
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/handshake/NoOpHandshakeEventLoopSwitcher.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+import io.netty.channel.Channel;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.netty.ChannelKey;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A no-operation implementation of the HandshakeEventLoopSwitcher.
+ * This class provides empty or default implementations for all methods,
+ * effectively disabling any event loop switching behavior.
+ */
+public class NoOpHandshakeEventLoopSwitcher extends HandshakeEventLoopSwitcher 
{
+    /**
+     * Constructs a new instance of NoOpHandshakeEventLoopSwitcher.
+     * This implementation does not require any event loops because it never 
switches an event loop,
+     * so an empty list is passed to the superclass.
+     */
+    public NoOpHandshakeEventLoopSwitcher() {
+        super(List.of());
+    }
+
+    @Override
+    public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel) {
+        return CompletableFutures.nullCompletedFuture();
+    }
+
+    @Override
+    public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel, 
ChannelKey channelKey) {
+        return CompletableFutures.nullCompletedFuture();
+    }
+
+    @Override
+    public synchronized void nodeLeftTopology(ClusterNode node) {
+        // No operation for NoOpHandshakeEventLoopSwitcher
+    }
+}
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index be8e54a4f0f..a10306d3dac 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -29,7 +29,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.embedded.EmbeddedChannel;
 import java.util.Collections;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import 
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
@@ -765,7 +765,7 @@ public class RecoveryHandshakeTest extends 
BaseIgniteAbstractTest {
                 new ClusterNodeImpl(launchId, consistentId, new 
NetworkAddress(INITIATOR_HOST, PORT)),
                 CONNECTION_ID,
                 provider,
-                () -> List.of(initiatorSideChannel.eventLoop()),
+                new NoOpHandshakeEventLoopSwitcher(),
                 staleIdDetector,
                 clusterIdSupplier,
                 channel -> {},
@@ -801,7 +801,7 @@ public class RecoveryHandshakeTest extends 
BaseIgniteAbstractTest {
                 new ClusterNodeImpl(launchId, consistentId, new 
NetworkAddress(ACCEPTOR_HOST, PORT)),
                 MESSAGE_FACTORY,
                 provider,
-                () -> List.of(acceptorSideChannel.eventLoop()),
+                new NoOpHandshakeEventLoopSwitcher(),
                 staleIdDetector,
                 clusterIdSupplier,
                 channel -> {},
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
index 9c867e83193..7646fb87060 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
@@ -40,7 +40,6 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelProgressivePromise;
 import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.EventExecutor;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -52,6 +51,7 @@ import 
org.apache.ignite.internal.network.ConstantClusterIdSupplier;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
+import 
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
 import org.apache.ignite.internal.network.netty.NettySender;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -177,7 +177,7 @@ class RecoveryAcceptorHandshakeManagerTest extends 
BaseIgniteAbstractTest {
                 new ClusterNodeImpl(launchId, ACCEPTOR_CONSISTENT_ID, new 
NetworkAddress(ACCEPTOR_HOST, PORT)),
                 MESSAGE_FACTORY,
                 recoveryDescriptorProvider,
-                () -> List.of(channel.eventLoop()),
+                new NoOpHandshakeEventLoopSwitcher(),
                 new AllIdsAreFresh(),
                 new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
                 channelCreationListener,
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
index 02c4414c92b..91fd7a44507 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
@@ -41,7 +41,6 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelProgressivePromise;
 import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.EventExecutor;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -54,6 +53,7 @@ import 
org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
+import 
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
 import org.apache.ignite.internal.network.netty.NettySender;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -188,7 +188,7 @@ class RecoveryInitiatorHandshakeManagerTest extends 
BaseIgniteAbstractTest {
                 new ClusterNodeImpl(launchId, INITIATOR_CONSISTENT_ID, new 
NetworkAddress(INITIATOR_HOST, PORT)),
                 CONNECTION_INDEX,
                 recoveryDescriptorProvider,
-                () -> List.of(thisChannel.eventLoop()),
+                new NoOpHandshakeEventLoopSwitcher(),
                 new AllIdsAreFresh(),
                 new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
                 channelCreationListener,


Reply via email to