This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4c92cf4262c IGNITE-26002 Network thread even distribution (#6370)
4c92cf4262c is described below
commit 4c92cf4262ccad9297196e26da98ab9065ccf961
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Aug 8 19:10:56 2025 +0300
IGNITE-26002 Network thread even distribution (#6370)
---
.../ignite/client/handler/ClientHandlerModule.java | 16 +-
.../handler/ClientInboundMessageHandler.java | 17 +-
.../ignite/client/TestClientHandlerModule.java | 3 +-
.../internal/network/NettyBootstrapFactory.java | 18 +-
.../handshake/HandshakeEventLoopSwitcher.java | 205 +++++++++++++++++++++
.../network/netty/ChannelEventLoopsSource.java | 33 ----
.../ignite/internal/network/netty/ChannelKey.java | 2 +-
.../internal/network/netty/ConnectionManager.java | 13 +-
.../network/recovery/HandshakeManagerUtils.java | 57 ------
.../recovery/RecoveryAcceptorHandshakeManager.java | 12 +-
.../RecoveryInitiatorHandshakeManager.java | 11 +-
.../scalecube/ScaleCubeClusterServiceFactory.java | 3 +-
.../network/DefaultMessagingServiceTest.java | 2 +-
.../handshake/NoOpHandshakeEventLoopSwitcher.java | 56 ++++++
.../network/netty/RecoveryHandshakeTest.java | 6 +-
.../RecoveryAcceptorHandshakeManagerTest.java | 4 +-
.../RecoveryInitiatorHandshakeManagerTest.java | 4 +-
17 files changed, 327 insertions(+), 135 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 52d5f292868..9af9a4f672d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.ssl.SslContextProvider;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.schema.SchemaSyncService;
@@ -346,7 +347,11 @@ public class ClientHandlerModule implements
IgniteComponent, PlatformComputeTran
ch.pipeline().addFirst("ssl",
sslContext.newHandler(ch.alloc()));
}
- ClientInboundMessageHandler messageHandler =
createInboundMessageHandler(configuration, connectionId);
+ ClientInboundMessageHandler messageHandler =
createInboundMessageHandler(
+
bootstrapFactory.handshakeEventLoopSwitcher(),
+ configuration,
+ connectionId
+ );
//noinspection TestOnlyProblems
handler = messageHandler;
@@ -423,7 +428,11 @@ public class ClientHandlerModule implements
IgniteComponent, PlatformComputeTran
return result;
}
- private ClientInboundMessageHandler
createInboundMessageHandler(ClientConnectorView configuration, long
connectionId) {
+ private ClientInboundMessageHandler createInboundMessageHandler(
+ HandshakeEventLoopSwitcher handshakeEventLoopSwitcher,
+ ClientConnectorView configuration,
+ long connectionId
+ ) {
return new ClientInboundMessageHandler(
igniteTables,
txManager,
@@ -442,7 +451,8 @@ public class ClientHandlerModule implements
IgniteComponent, PlatformComputeTran
partitionOperationsExecutor,
SUPPORTED_FEATURES,
Map.of(),
- computeExecutors::remove
+ computeExecutors::remove,
+ handshakeEventLoopSwitcher
);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 89719ac82a5..0851d81dbd1 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -140,6 +140,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.IgniteClusterImpl;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.schema.SchemaVersionMismatchException;
@@ -259,6 +260,8 @@ public class ClientInboundMessageHandler
private final Map<Long, CompletableFuture<ClientMessageUnpacker>>
serverToClientRequests = new ConcurrentHashMap<>();
+ private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
+
/**
* Constructor.
*
@@ -297,7 +300,8 @@ public class ClientInboundMessageHandler
Executor partitionOperationsExecutor,
BitSet features,
Map<HandshakeExtension, Object> extensions,
- Function<String, CompletableFuture<PlatformComputeConnection>>
computeConnectionFunc
+ Function<String, CompletableFuture<PlatformComputeConnection>>
computeConnectionFunc,
+ HandshakeEventLoopSwitcher handshakeEventLoopSwitcher
) {
assert igniteTables != null;
assert txManager != null;
@@ -329,6 +333,7 @@ public class ClientInboundMessageHandler
this.clockService = clockService;
this.primaryReplicaTracker = primaryReplicaTracker;
this.partitionOperationsExecutor = partitionOperationsExecutor;
+ this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(
@@ -460,16 +465,16 @@ public class ClientInboundMessageHandler
return;
}
- authenticationManager
-
.authenticateAsync(createAuthenticationRequest(clientHandshakeExtensions))
- .handleAsync((user, err) -> {
+ AuthenticationRequest<?, ?> authReq =
createAuthenticationRequest(clientHandshakeExtensions);
+
+
handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channelHandlerContext.channel())
+ .thenCompose(unused ->
authenticationManager.authenticateAsync(authReq))
+ .whenCompleteAsync((user, err) -> {
if (err != null) {
handshakeError(ctx, packer, err);
} else {
handshakeSuccess(ctx, packer, user,
clientFeatures, clientVer, clientCode);
}
-
- return null;
}, ctx.executor());
} catch (Throwable t) {
handshakeError(ctx, packer, t);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index f25c24f6381..1164e4e78f6 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -270,7 +270,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
Runnable::run,
features,
randomExtensions(),
- unused -> null
+ unused -> null,
+
bootstrapFactory.handshakeEventLoopSwitcher()
)
);
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
index 257b493ff34..f2bcff10105 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyBootstrapFactory.java
@@ -38,7 +38,7 @@ import
org.apache.ignite.internal.network.configuration.InboundView;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.OutboundView;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
import
org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup.NetworkThread;
import org.jetbrains.annotations.TestOnly;
@@ -46,7 +46,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Netty bootstrap factory. Holds shared {@link EventLoopGroup} instances and
encapsulates common Netty {@link Bootstrap} creation logic.
*/
-public class NettyBootstrapFactory implements IgniteComponent,
ChannelEventLoopsSource {
+public class NettyBootstrapFactory implements IgniteComponent {
/** Network configuration. */
private final NetworkConfiguration networkConfiguration;
@@ -59,8 +59,7 @@ public class NettyBootstrapFactory implements
IgniteComponent, ChannelEventLoops
/** Work socket channel handler event loop group (this group does network
I/O). */
private EventLoopGroup workerGroup;
- /** All event loops with which {@link io.netty.channel.Channel}s might be
registered. */
- private volatile List<EventLoop> channelEventLoops;
+ private volatile HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
/**
* Constructor.
@@ -153,11 +152,15 @@ public class NettyBootstrapFactory implements
IgniteComponent, ChannelEventLoops
bossGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix +
"-network-accept");
workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix +
"-network-worker");
- this.channelEventLoops = List.copyOf(eventLoopsAt(workerGroup));
+ this.handshakeEventLoopSwitcher = new
HandshakeEventLoopSwitcher(eventLoopsAt(workerGroup));
return nullCompletedFuture();
}
+ public HandshakeEventLoopSwitcher handshakeEventLoopSwitcher() {
+ return handshakeEventLoopSwitcher;
+ }
+
private static List<EventLoop> eventLoopsAt(EventLoopGroup ... groups) {
List<EventLoop> channelEventLoops = new ArrayList<>();
@@ -198,11 +201,6 @@ public class NettyBootstrapFactory implements
IgniteComponent, ChannelEventLoops
return nullCompletedFuture();
}
- @Override
- public List<EventLoop> channelEventLoops() {
- return channelEventLoops;
- }
-
/**
* Returns worker event loop group.
*/
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeEventLoopSwitcher.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeEventLoopSwitcher.java
new file mode 100644
index 00000000000..2d0a4c6c726
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeEventLoopSwitcher.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.channel.EventLoop;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.netty.ChannelKey;
+import org.apache.ignite.network.ClusterNode;
+import reactor.util.annotation.Nullable;
+
+/**
+ * A class responsible for managing the assignment of Netty channels to event
loops
+ * and switching channels between event loops when necessary.
+ */
+public class HandshakeEventLoopSwitcher {
+ private static final IgniteLogger LOG =
Loggers.forClass(HandshakeEventLoopSwitcher.class);
+
+ /** List of available event loops. */
+ private final List<EventLoop> executors;
+
+ /** Map to track the number of channels assigned to each event loop. */
+ private final Map<Integer, Set<ChannelId>> activeChannelMap;
+
+ /**
+ * Map to track channel reservations for specific communication
connections.
+ * The map prevents applying different event loops for the same chаnell
key.
+ */
+ private final Map<ChannelKey, Integer> channelReservationMap = new
HashMap<>();
+
+ /**
+ * Constructs a new instance of HandshakeEventLoopSwitcher.
+ *
+ * @param eventLoops The list of event loops to manage.
+ */
+ public HandshakeEventLoopSwitcher(List<EventLoop> eventLoops) {
+ this.executors = eventLoops;
+ this.activeChannelMap = new HashMap<>(eventLoops.size());
+ }
+
+ /**
+ * Switches the event loop of a given channel if needed.
+ *
+ * @param channel The channel to potentially switch to a different event
loop.
+ * @return A CompletableFuture that completes when the event loop is
switched. The future completes in the target event loop.
+ */
+ public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel) {
+ return switchEventLoopIfNeeded(channel, null);
+ }
+
+ /**
+ * Switches the event loop of a given channel if needed.
+ *
+ * @param channel The channel to potentially switch to a different event
loop.
+ * @param channelKey The unique key identifying the channel. That is a
logical identifier of the connection between two nodes
+ * (it would be {@code null} for a client-server connection). The
purpose is to have strict message ordering in a reliable
+ * connection.
+ * @return A CompletableFuture that completes when the event loop is
switched. The future completes in the target event loop.
+ */
+ public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel,
@Nullable ChannelKey channelKey) {
+ ChannelId channelId = channel.id();
+
+ EventLoop targetEventLoop = eventLoopForKey(channelId, channelKey);
+
+ if (targetEventLoop != channel.eventLoop()) {
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+
+ channel.deregister().addListener(deregistrationFuture -> {
+ if (!deregistrationFuture.isSuccess()) {
+ LOG.error("Cannot deregister a channel from an event
loop", deregistrationFuture.cause());
+
+ fut.completeExceptionally(deregistrationFuture.cause());
+
+ channel.close();
+
+ return;
+ }
+
+
targetEventLoop.register(channel).addListener(registrationFuture -> {
+ if (!registrationFuture.isSuccess()) {
+ LOG.error("Cannot register a channel with an event
loop", registrationFuture.cause());
+
+
fut.completeExceptionally(deregistrationFuture.cause());
+
+ channel.close();
+
+ return;
+ }
+
+ channel.closeFuture().addListener(future -> {
+ channelUnregistered(channelId);
+ });
+
+ fut.complete(null);
+ });
+ });
+
+ return fut;
+ }
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Determines the appropriate event loop for a given channel key.
+ *
+ * @param channelId The ID of the channel.
+ * @param channelKey The unique key identifying the channel.
+ * @return The selected event loop for the channel.
+ */
+ private synchronized EventLoop eventLoopForKey(ChannelId channelId,
@Nullable ChannelKey channelKey) {
+ if (channelKey != null) {
+ Integer idx = channelReservationMap.get(channelKey);
+
+ if (idx != null) {
+ return executors.get(idx);
+ }
+ }
+
+ int minCnt = Integer.MAX_VALUE;
+ int index = 0;
+
+ for (int i = 0; i < executors.size(); i++) {
+ Set<ChannelId> channelIds = activeChannelMap.getOrDefault(i,
Set.of());
+
+ if (channelIds.contains(channelId)) {
+ assert channelKey == null : "Channel key should be null if the
channel is already registered in the event loop";
+
+ return executors.get(index);
+ }
+
+ int cnt = channelIds.size();
+
+ if (cnt < minCnt) {
+ minCnt = cnt;
+ index = i;
+
+ if (cnt == 0) {
+ // If we found an event loop with no channels, we can stop
searching.
+ break;
+ }
+ }
+ }
+
+ activeChannelMap.computeIfAbsent(index, key -> new
HashSet<>()).add(channelId);
+
+ if (channelKey != null) {
+ channelReservationMap.put(channelKey, index);
+ }
+
+ EventLoop eventLoop = executors.get(index);
+
+ LOG.debug("Channel mapped to the loop [channelId={}, channelKey={},
eventLoopIndex={}]", channelId, channelKey, index);
+
+ return eventLoop;
+ }
+
+ /**
+ * Removes a channel from the event loop tracking map when it is
unregistered.
+ *
+ * @param channelId The unique ID identifying the channel to unregister.
+ */
+ private synchronized void channelUnregistered(ChannelId channelId) {
+ for (Set<ChannelId> channelKeys : activeChannelMap.values()) {
+ if (channelKeys.remove(channelId)) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Removes all channels associated with a node from the event loop
tracking map
+ * when the node leaves the topology.
+ *
+ * @param node The node that left the topology.
+ */
+ public synchronized void nodeLeftTopology(ClusterNode node) {
+ channelReservationMap.entrySet().removeIf(entry ->
entry.getKey().launchId().equals(node.id()));
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
deleted file mode 100644
index a499eb5843e..00000000000
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelEventLoopsSource.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.netty;
-
-import io.netty.channel.EventLoop;
-import java.util.List;
-
-/**
- * Allows to obtain a list of all event loops with which {@link
io.netty.channel.Channel}s might be registered.
- */
-@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
-public interface ChannelEventLoopsSource {
- /**
- * Returns list of all event loops with which {@link
io.netty.channel.Channel}s might be registered.
- * This must always return the same event loops in the same order.
- */
- List<EventLoop> channelEventLoops();
-}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
index c13252fe78f..fc568a04bc2 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java
@@ -43,7 +43,7 @@ public class ChannelKey {
this.connectionId = connectionId;
}
- UUID launchId() {
+ public UUID launchId() {
return launchId;
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 165cc3c28f2..3f8ca85d15b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -530,7 +530,7 @@ public class ConnectionManager implements
ChannelCreationListener {
localNode,
connectionId,
descriptorProvider,
- bootstrapFactory,
+ bootstrapFactory.handshakeEventLoopSwitcher(),
staleIdDetector,
clusterIdSupplier,
this,
@@ -554,7 +554,7 @@ public class ConnectionManager implements
ChannelCreationListener {
localNodeFuture.join(),
FACTORY,
descriptorProvider,
- bootstrapFactory,
+ bootstrapFactory.handshakeEventLoopSwitcher(),
staleIdDetector,
clusterIdSupplier,
this,
@@ -632,11 +632,14 @@ public class ConnectionManager implements
ChannelCreationListener {
* it's ID that gets regenerated at each node restart) and recovery
descriptors corresponding to it.
*
* @param id ID of the node (it must have already left the topology).
+ * @return Future that completes when all the channels and descriptors are
closed.
*/
- public void handleNodeLeft(UUID id) {
+ public CompletableFuture<Void> handleNodeLeft(UUID id) {
// We rely on the fact that the node with the given ID has already
left the physical topology.
assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
// TODO: IGNITE-21207 - remove descriptors for good.
connectionMaintenanceExecutor.execute(
@@ -644,8 +647,12 @@ public class ConnectionManager implements
ChannelCreationListener {
// Closing descriptors separately (as some of them might
not have an operating channel attached, but they
// still might have unacknowledged messages/futures).
disposeRecoveryDescriptorsOfLeftNode(id);
+
+ future.complete(null);
}, connectionMaintenanceExecutor)
);
+
+ return future;
}
private CompletableFuture<Void> closeChannelsWith(UUID id) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index b205c0010ff..4266f5070ce 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -18,12 +18,9 @@
package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
-import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoop;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -31,8 +28,6 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.message.ClusterNodeMessage;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
-import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -81,58 +76,6 @@ class HandshakeManagerUtils {
});
}
- /**
- * Moves a channel from its current event loop to the event loop
corresponding to the channel key. This is needed
- * because all channels in the same logical connection must be served by
the same thread.
- *
- * @param channel Channel to move.
- * @param channelKey Key of the logical connection.
- * @param eventLoopsSource Used to get all event loops with which a
channel might be registered.
- * @param afterSwitching Action to execute after switching (it will be
executed on the new event loop).
- */
- static void switchEventLoopIfNeeded(
- Channel channel,
- ChannelKey channelKey,
- ChannelEventLoopsSource eventLoopsSource,
- Runnable afterSwitching
- ) {
- EventLoop targetEventLoop = eventLoopForKey(channelKey,
eventLoopsSource);
-
- if (targetEventLoop != channel.eventLoop()) {
- channel.deregister().addListener(deregistrationFuture -> {
- if (!deregistrationFuture.isSuccess()) {
- LOG.error("Cannot deregister a channel from an event
loop", deregistrationFuture.cause());
-
- channel.close();
-
- return;
- }
-
-
targetEventLoop.register(channel).addListener(registrationFuture -> {
- if (!registrationFuture.isSuccess()) {
- LOG.error("Cannot register a channel with an event
loop", registrationFuture.cause());
-
- channel.close();
-
- return;
- }
-
- afterSwitching.run();
- });
- });
- } else {
- afterSwitching.run();
- }
- }
-
- private static EventLoop eventLoopForKey(ChannelKey channelKey,
ChannelEventLoopsSource eventLoopsSource) {
- List<EventLoop> eventLoops = eventLoopsSource.channelEventLoops();
-
- int index = safeAbs(channelKey.hashCode()) % eventLoops.size();
-
- return eventLoops.get(index);
- }
-
static ClusterNodeMessage clusterNodeToMessage(ClusterNode node) {
return MESSAGE_FACTORY.clusterNodeMessage()
.id(node.id())
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
index b66752fc783..c8109e3e8b6 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.switchEventLoopIfNeeded;
import static
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
import io.netty.channel.Channel;
@@ -37,10 +36,10 @@ import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -92,7 +91,7 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
- private final ChannelEventLoopsSource channelEventLoopsSource;
+ private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
@@ -119,7 +118,7 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
ClusterNode localNode,
NetworkMessagesFactory messageFactory,
RecoveryDescriptorProvider recoveryDescriptorProvider,
- ChannelEventLoopsSource channelEventLoopsSource,
+ HandshakeEventLoopSwitcher handshakeEventLoopSwitcher,
StaleIdDetector staleIdDetector,
ClusterIdSupplier clusterIdSupplier,
ChannelCreationListener channelCreationListener,
@@ -129,7 +128,7 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
this.localNode = localNode;
this.messageFactory = messageFactory;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
- this.channelEventLoopsSource = channelEventLoopsSource;
+ this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
this.staleIdDetector = staleIdDetector;
this.clusterIdSupplier = clusterIdSupplier;
this.stopping = stopping;
@@ -229,7 +228,8 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
this.remoteChannelId = message.connectionId();
ChannelKey channelKey = new ChannelKey(remoteNode.name(),
remoteNode.id(), remoteChannelId);
- switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource,
() -> tryAcquireDescriptorAndFinishHandshake(message));
+ handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channel, channelKey)
+ .thenRun(() ->
tryAcquireDescriptorAndFinishHandshake(message));
}
private boolean
possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage message) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index 72e88cf9f0d..8c1e6d14263 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -22,7 +22,6 @@ import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
import static
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.clusterNodeToMessage;
-import static
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.switchEventLoopIfNeeded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -43,10 +42,10 @@ import
org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
+import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
-import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
@@ -77,7 +76,7 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
- private final ChannelEventLoopsSource channelEventLoopsSource;
+ private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
@@ -127,7 +126,7 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
ClusterNode localNode,
short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider,
- ChannelEventLoopsSource channelEventLoopsSource,
+ HandshakeEventLoopSwitcher handshakeEventLoopSwitcher,
StaleIdDetector staleIdDetector,
ClusterIdSupplier clusterIdSupplier,
ChannelCreationListener channelCreationListener,
@@ -137,7 +136,7 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
this.localNode = localNode;
this.connectionId = connectionId;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
- this.channelEventLoopsSource = channelEventLoopsSource;
+ this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
this.staleIdDetector = staleIdDetector;
this.clusterIdSupplier = clusterIdSupplier;
this.stopping = stopping;
@@ -265,7 +264,7 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
this.remoteNode = handshakeStartMessage.serverNode().asClusterNode();
ChannelKey channelKey = new ChannelKey(remoteNode.name(),
remoteNode.id(), connectionId);
- switchEventLoopIfNeeded(channel, channelKey, channelEventLoopsSource,
() -> proceedAfterSavingIds(handshakeStartMessage));
+ handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channel,
channelKey).thenRun(() -> proceedAfterSavingIds(handshakeStartMessage));
}
private void proceedAfterSavingIds(HandshakeStartMessage
handshakeStartMessage) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index 34e5b619599..3f535717ecb 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -174,7 +174,8 @@ public class ScaleCubeClusterServiceFactory {
topologyService.addEventHandler(new TopologyEventHandler() {
@Override
public void onDisappeared(ClusterNode member) {
- connectionMgr.handleNodeLeft(member.id());
+ connectionMgr.handleNodeLeft(member.id()).thenRun(() ->
+
nettyBootstrapFactory.handshakeEventLoopSwitcher().nodeLeftTopology(member));
}
});
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index aac8acee17e..a8690bcd3d4 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -664,7 +664,7 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
localNode,
connectionId,
recoveryDescriptorProvider,
- bootstrapFactory,
+ bootstrapFactory.handshakeEventLoopSwitcher(),
staleIdDetector,
clusterIdSupplier,
channel -> {},
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/handshake/NoOpHandshakeEventLoopSwitcher.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/handshake/NoOpHandshakeEventLoopSwitcher.java
new file mode 100644
index 00000000000..4318eb5ad6a
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/handshake/NoOpHandshakeEventLoopSwitcher.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+import io.netty.channel.Channel;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.netty.ChannelKey;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A no-operation implementation of the HandshakeEventLoopSwitcher.
+ * This class provides empty or default implementations for all methods,
+ * effectively disabling any event loop switching behavior.
+ */
+public class NoOpHandshakeEventLoopSwitcher extends HandshakeEventLoopSwitcher
{
+ /**
+ * Constructs a new instance of NoOpHandshakeEventLoopSwitcher.
+ * This implementation does not require any event loops because it never
switches an event loop,
+ * so an empty list is passed to the superclass.
+ */
+ public NoOpHandshakeEventLoopSwitcher() {
+ super(List.of());
+ }
+
+ @Override
+ public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<Void> switchEventLoopIfNeeded(Channel channel,
ChannelKey channelKey) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ @Override
+ public synchronized void nodeLeftTopology(ClusterNode node) {
+ // No operation for NoOpHandshakeEventLoopSwitcher
+ }
+}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index be8e54a4f0f..a10306d3dac 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -29,7 +29,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Collections;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
@@ -765,7 +765,7 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
new ClusterNodeImpl(launchId, consistentId, new
NetworkAddress(INITIATOR_HOST, PORT)),
CONNECTION_ID,
provider,
- () -> List.of(initiatorSideChannel.eventLoop()),
+ new NoOpHandshakeEventLoopSwitcher(),
staleIdDetector,
clusterIdSupplier,
channel -> {},
@@ -801,7 +801,7 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
new ClusterNodeImpl(launchId, consistentId, new
NetworkAddress(ACCEPTOR_HOST, PORT)),
MESSAGE_FACTORY,
provider,
- () -> List.of(acceptorSideChannel.eventLoop()),
+ new NoOpHandshakeEventLoopSwitcher(),
staleIdDetector,
clusterIdSupplier,
channel -> {},
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
index 9c867e83193..7646fb87060 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
@@ -40,7 +40,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelProgressivePromise;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.EventExecutor;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -52,6 +51,7 @@ import
org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.handshake.HandshakeException;
+import
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.NettySender;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -177,7 +177,7 @@ class RecoveryAcceptorHandshakeManagerTest extends
BaseIgniteAbstractTest {
new ClusterNodeImpl(launchId, ACCEPTOR_CONSISTENT_ID, new
NetworkAddress(ACCEPTOR_HOST, PORT)),
MESSAGE_FACTORY,
recoveryDescriptorProvider,
- () -> List.of(channel.eventLoop()),
+ new NoOpHandshakeEventLoopSwitcher(),
new AllIdsAreFresh(),
new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
channelCreationListener,
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
index 02c4414c92b..91fd7a44507 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
@@ -41,7 +41,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelProgressivePromise;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.EventExecutor;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -54,6 +53,7 @@ import
org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeException;
+import
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.NettySender;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -188,7 +188,7 @@ class RecoveryInitiatorHandshakeManagerTest extends
BaseIgniteAbstractTest {
new ClusterNodeImpl(launchId, INITIATOR_CONSISTENT_ID, new
NetworkAddress(INITIATOR_HOST, PORT)),
CONNECTION_INDEX,
recoveryDescriptorProvider,
- () -> List.of(thisChannel.eventLoop()),
+ new NoOpHandshakeEventLoopSwitcher(),
new AllIdsAreFresh(),
new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
channelCreationListener,