This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6182cc78bd IGNITE-21068 Ignite node must not communicate with a node removed from the Physical Topology (#2999)
6182cc78bd is described below

commit 6182cc78bdae11b84fc14184a211a84dfd6a5c29
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Fri Dec 29 17:40:34 2023 +0400

    IGNITE-21068 Ignite node must not communicate with a node removed from the Physical Topology (#2999)
---
 ...niteDistributionZoneManagerNodeRestartTest.java |   4 +-
 modules/network/build.gradle                       |   1 +
 .../scalecube/ItScaleCubeNetworkMessagingTest.java | 284 ++++++++++++++++-----
 .../internal/network/netty/ConnectionManager.java  |  29 +++
 .../internal/network/netty/ConnectorKey.java       |   6 +
 .../internal/network/netty/HandshakeHandler.java   |   4 +-
 .../internal/network/netty/InNetworkObject.java    |  18 +-
 .../internal/network/netty/MessageHandler.java     |  16 +-
 .../recovery/RecoveryClientHandshakeManager.java   |   2 +-
 .../recovery/RecoveryServerHandshakeManager.java   |   2 +-
 .../{VaultStateIds.java => VaultStaleIds.java}     |   6 +-
 .../ignite/network/DefaultMessagingService.java    |  27 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  37 ++-
 ...ultStateIdsTest.java => VaultStaleIdsTest.java} |   8 +-
 .../network/DefaultMessagingServiceTest.java       |   5 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 17 files changed, 354 insertions(+), 103 deletions(-)

diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 6ec20de671..79fefe2e28 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -100,7 +100,7 @@ import org.apache.ignite.internal.metastorage.server.If;
 import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
-import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.network.recovery.VaultStaleIds;
 import 
org.apache.ignite.internal.security.authentication.validator.AuthenticationProvidersValidatorImpl;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -193,7 +193,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
                 networkConfiguration,
                 nettyBootstrapFactory,
                 defaultSerializationRegistry(),
-                new VaultStateIds(vault)
+                new VaultStaleIds(vault)
         );
 
         var clusterStateStorage = new TestClusterStateStorage();
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index ff153a7ff3..01d9df2b8c 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -82,4 +82,5 @@ dependencies {
     integrationTestImplementation libs.netty.handler
     integrationTestImplementation libs.scalecube.cluster
     integrationTestImplementation libs.javapoet
+    integrationTestImplementation libs.jetbrains.annotations
 }
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 529169a5a6..874d05921e 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -17,15 +17,20 @@
 
 package org.apache.ignite.network.scalecube;
 
+import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,15 +50,18 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
@@ -61,6 +69,7 @@ import 
org.apache.ignite.internal.testframework.log4j2.LogInspector;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.DefaultMessagingService;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NodeFinder;
@@ -69,8 +78,11 @@ import org.apache.ignite.network.TopologyEventHandler;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.apache.logging.log4j.core.LogEvent;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import reactor.core.publisher.Mono;
 
 /**
@@ -90,9 +102,15 @@ class ItScaleCubeNetworkMessagingTest {
     /** List of test log inspectors. */
     private final List<LogInspector> logInspectors = new ArrayList<>();
 
-    /** Tear down method. */
+    private TestInfo testInfo;
+
+    @BeforeEach
+    void saveTestInfo(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
     @AfterEach
-    public void tearDown() {
+    void tearDown() {
         testCluster.shutdown();
         logInspectors.forEach(LogInspector::stop);
         logInspectors.clear();
@@ -104,7 +122,7 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception in case of errors.
      */
     @Test
-    public void messageWasSentToAllMembersSuccessfully(TestInfo testInfo) 
throws Exception {
+    public void messageWasSentToAllMembersSuccessfully() throws Exception {
         Map<String, TestMessage> messageStorage = new ConcurrentHashMap<>();
 
         var messageReceivedLatch = new CountDownLatch(3);
@@ -146,8 +164,8 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testShutdown(TestInfo testInfo) throws Exception {
-        testShutdown0(testInfo, false);
+    public void testShutdown() throws Exception {
+        testShutdown0(false);
     }
 
     /**
@@ -156,8 +174,8 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testForcefulShutdown(TestInfo testInfo) throws Exception {
-        testShutdown0(testInfo, true);
+    public void testForcefulShutdown() throws Exception {
+        testShutdown0(true);
     }
 
     /**
@@ -166,7 +184,7 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception in case of errors.
      */
     @Test
-    public void testSendMessageToSelf(TestInfo testInfo) throws Exception {
+    public void testSendMessageToSelf() throws Exception {
         testCluster = new Cluster(1, testInfo);
         testCluster.startAwait();
 
@@ -213,7 +231,7 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception in case of errors.
      */
     @Test
-    public void testInvokeMessageToSelf(TestInfo testInfo) throws Exception {
+    public void testInvokeMessageToSelf() throws Exception {
         testCluster = new Cluster(1, testInfo);
         testCluster.startAwait();
 
@@ -246,7 +264,7 @@ class ItScaleCubeNetworkMessagingTest {
      * exceptionally.
      */
     @Test
-    public void testInvokeAfterStop(TestInfo testInfo) throws 
InterruptedException {
+    public void testInvokeAfterStop() throws InterruptedException {
         testCluster = new Cluster(2, testInfo);
         testCluster.startAwait();
 
@@ -282,7 +300,7 @@ class ItScaleCubeNetworkMessagingTest {
      * exceptionally.
      */
     @Test
-    public void testInvokeDuringStop(TestInfo testInfo) throws 
InterruptedException {
+    public void testInvokeDuringStop() throws InterruptedException {
         testCluster = new Cluster(2, testInfo);
         testCluster.startAwait();
 
@@ -320,7 +338,7 @@ class ItScaleCubeNetworkMessagingTest {
      * exceptionally.
      */
     @Test
-    public void testStopDuringAwaitingForInvokeResponse(TestInfo testInfo) 
throws InterruptedException {
+    public void testStopDuringAwaitingForInvokeResponse() throws 
InterruptedException {
         testCluster = new Cluster(2, testInfo);
         testCluster.startAwait();
 
@@ -366,7 +384,7 @@ class ItScaleCubeNetworkMessagingTest {
      * Tests that Scalecube messages are not blocked if some message handler 
blocks handling of 'normal' messages.
      */
     @Test
-    public void scalecubeMessagesAreSentSeparatelyFromOtherMessages(TestInfo 
testInfo) throws InterruptedException {
+    public void scalecubeMessagesAreSentSeparatelyFromOtherMessages() throws 
InterruptedException {
         testCluster = new Cluster(2, testInfo);
         testCluster.startAwait();
 
@@ -420,7 +438,7 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception in case of errors.
      */
     @Test
-    public void testMessageGroupsHandlers(TestInfo testInfo) throws Exception {
+    public void testMessageGroupsHandlers() throws Exception {
         testCluster = new Cluster(2, testInfo);
         testCluster.startAwait();
 
@@ -477,27 +495,183 @@ class ItScaleCubeNetworkMessagingTest {
      * @throws Exception in case of errors.
      */
     @SuppressWarnings("ConstantConditions")
-    @Test
-    public void nodeCannotReuseOldId(TestInfo testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void nodeCannotReuseOldId(boolean keepPreExistingConnections) 
throws Exception {
         testCluster = new Cluster(3, testInfo);
 
         testCluster.startAwait();
 
         String outcastName = 
testCluster.members.get(testCluster.members.size() - 1).nodeName();
 
-        knockOutNode(outcastName);
+        knockOutNode(outcastName, !keepPreExistingConnections);
 
         IgniteBiTuple<CountDownLatch, AtomicBoolean> pair = 
reanimateNode(outcastName);
         CountDownLatch ready = pair.get1();
         AtomicBoolean reappeared = pair.get2();
 
-        assertTrue(ready.await(10, TimeUnit.SECONDS), "Node neither 
reappeared, not was rejected");
+        assertTrue(ready.await(10, TimeUnit.SECONDS), "Node neither 
reappeared, nor was rejected");
 
         assertThat(reappeared.get(), is(false));
     }
 
-    private void knockOutNode(String outcastName) throws InterruptedException {
-        CountDownLatch disappeared = new CountDownLatch(2);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void nodeCannotCommunicateAfterLeavingPhysicalTopology(boolean 
keepPreExistingConnections) throws Exception {
+        testCluster = new Cluster(3, testInfo);
+
+        testCluster.startAwait();
+
+        ClusterService notOutcast = testCluster.members.get(0);
+        ClusterService outcast = 
testCluster.members.get(testCluster.members.size() - 1);
+
+        ClusterNode outcastNode = 
notOutcast.topologyService().getByConsistentId(outcast.nodeName());
+        ClusterNode notOutcastNode = 
outcast.topologyService().getByConsistentId(notOutcast.nodeName());
+        assertNotNull(outcastNode);
+        assertNotNull(notOutcastNode);
+
+        if (keepPreExistingConnections) {
+            assertThat(notOutcast.messagingService().send(outcastNode, 
messageFactory.testMessage().build()), willCompleteSuccessfully());
+            assertThat(outcast.messagingService().send(notOutcastNode, 
messageFactory.testMessage().build()), willCompleteSuccessfully());
+        }
+
+        knockOutNode(outcast.nodeName(), !keepPreExistingConnections);
+
+        stopDroppingMessagesTo(outcast.nodeName());
+
+        CompletableFuture<Void> sendFromOutcast = 
outcast.messagingService().send(notOutcastNode, 
messageFactory.testMessage().build());
+        assertThat(sendFromOutcast, willThrow(HandshakeException.class));
+
+        CompletableFuture<?> invokeFromOutcast = 
outcast.messagingService().invoke(
+                notOutcastNode,
+                messageFactory.testMessage().build(),
+                10_000
+        );
+        assertThat(invokeFromOutcast, willThrow(HandshakeException.class));
+
+        CompletableFuture<Void> sendToOutcast = 
notOutcast.messagingService().send(outcastNode, 
messageFactory.testMessage().build());
+        assertThat(sendToOutcast, willThrow(HandshakeException.class));
+
+        CompletableFuture<?> invokeToOutcast = 
notOutcast.messagingService().invoke(
+                outcastNode,
+                messageFactory.testMessage().build(),
+                10_000
+        );
+        assertThat(invokeToOutcast, willThrow(HandshakeException.class));
+    }
+
+    @Test
+    public void reconnectsAfterConnectionDrop() throws Exception {
+        testCluster = new Cluster(2, testInfo);
+
+        testCluster.startAwait();
+
+        ClusterService sender = testCluster.members.get(0);
+        ClusterService receiver = testCluster.members.get(1);
+
+        receiver.messagingService().addMessageHandler(
+                TestMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+                    receiver.messagingService().respond(
+                            sender.topologyService().localMember(),
+                            message,
+                            requireNonNull(correlationId)
+                    );
+                }
+        );
+
+        // Make an invocation to establish a connection.
+        CompletableFuture<?> firstInvoke = sender.messagingService().invoke(
+                receiver.topologyService().localMember(),
+                messageFactory.testMessage().build(),
+                10_000
+        );
+        assertThat(firstInvoke, willCompleteSuccessfully());
+
+        closeAllChannels(sender.messagingService());
+
+        // Now try again.
+        CompletableFuture<?> secondInvoke = sender.messagingService().invoke(
+                receiver.topologyService().localMember(),
+                messageFactory.testMessage().build(),
+                10_000
+        );
+        assertThat(secondInvoke, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void doesNotDeliverMessagesWhoseSenderLeftPhysicalTopology() throws 
Exception {
+        testCluster = new Cluster(2, testInfo);
+
+        testCluster.startAwait();
+
+        ClusterService sender = testCluster.members.get(0);
+        ClusterService receiver = testCluster.members.get(1);
+
+        // We are going to send 3 messages, of which 2 will arrive after the 
sender has been removed from the physical topology,
+        // so we expect 2 messages to be 'skipped' and not delivered on the 
receiver.
+        CountDownLatch messagesSkipped = new CountDownLatch(2);
+        logInspectors.add(
+                new LogInspector(
+                        DefaultMessagingService.class.getName(),
+                        event -> 
event.getMessage().getFormattedMessage().contains("is stale, so skipping 
message handling"),
+                        messagesSkipped::countDown
+                )
+        );
+        logInspectors.forEach(LogInspector::start);
+
+        AtomicBoolean first = new AtomicBoolean(true);
+        CountDownLatch canProceed = new CountDownLatch(1);
+        CountDownLatch blockingStarted = new CountDownLatch(1);
+        AtomicInteger messagesDelivered = new AtomicInteger();
+
+        receiver.messagingService().addMessageHandler(
+                TestMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (first.compareAndSet(true, false)) {
+                        blockingStarted.countDown();
+
+                        try {
+                            assertTrue(canProceed.await(10, TimeUnit.SECONDS));
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    messagesDelivered.incrementAndGet();
+                }
+        );
+
+        TestMessage message = messageFactory.testMessage().build();
+
+        // This message will get stuck.
+        
sender.messagingService().send(receiver.topologyService().localMember(), 
message);
+
+        // These 2 will be handled after the sender has left the physical 
topology.
+        
sender.messagingService().send(receiver.topologyService().localMember(), 
message);
+        
sender.messagingService().invoke(receiver.topologyService().localMember(), 
message, 10_000);
+
+        assertTrue(blockingStarted.await(10, TimeUnit.SECONDS));
+
+        knockOutNode(sender.nodeName(), false);
+
+        canProceed.countDown();
+
+        assertTrue(messagesSkipped.await(10, TimeUnit.SECONDS), "Messages were 
not skipped");
+        assertThat(messagesDelivered.get(), is(1));
+    }
+
+    private static void closeAllChannels(MessagingService messagingService) 
throws InterruptedException {
+        ConnectionManager connectionManager = ((DefaultMessagingService) 
messagingService).connectionManager();
+
+        for (NettySender sender : connectionManager.channels().values()) {
+            sender.close().await(10, TimeUnit.SECONDS);
+        }
+    }
+
+    private void knockOutNode(String outcastName, boolean 
closeConnectionsForcibly) throws InterruptedException {
+        CountDownLatch disappeared = new 
CountDownLatch(testCluster.members.size() - 1);
 
         TopologyEventHandler disappearListener = new TopologyEventHandler() {
             @Override
@@ -524,17 +698,15 @@ class ItScaleCubeNetworkMessagingTest {
         // Wait until all nodes see disappearance of the outcast.
         assertTrue(disappeared.await(10, TimeUnit.SECONDS), "Node did not 
disappear in time");
 
-        DefaultMessagingService messagingService = (DefaultMessagingService) 
testCluster.members.stream()
-                .filter(service -> outcastName.equals(service.nodeName()))
-                .findFirst()
-                .get().messagingService();
-
-        ConnectionManager cm = messagingService.connectionManager();
+        if (closeConnectionsForcibly) {
+            MessagingService messagingService = testCluster.members.stream()
+                    .filter(service -> outcastName.equals(service.nodeName()))
+                    .findFirst().orElseThrow()
+                    .messagingService();
 
-        // Forcefully close channels, so that nodes will create new channels 
on reanimation of the outcast.
-        cm.channels().forEach((stringConnectorKey, nettySender) -> {
-            nettySender.close().awaitUninterruptibly();
-        });
+            // Forcefully close channels, so that nodes will create new 
channels on reanimation of the outcast.
+            closeAllChannels(messagingService);
+        }
     }
 
     private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String 
outcastName) {
@@ -557,33 +729,36 @@ class ItScaleCubeNetworkMessagingTest {
         logInspectors.add(new LogInspector(
                 RecoveryClientHandshakeManager.class.getName(),
                 matcher,
-                ready::countDown));
+                () -> ready.countDown()));
 
         logInspectors.add(new LogInspector(
                 RecoveryServerHandshakeManager.class.getName(),
                 matcher,
-                ready::countDown));
+                () -> ready.countDown()));
 
         logInspectors.forEach(LogInspector::start);
 
+        stopDroppingMessagesTo(outcastName);
+
+        return new IgniteBiTuple<>(ready, reappeared);
+    }
+
+    private void stopDroppingMessagesTo(String outcastName) {
         testCluster.members.stream()
                 .filter(service -> !outcastName.equals(service.nodeName()))
                 .forEach(service -> {
                     DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
                     messagingService.stopDroppingMessages();
                 });
-
-        return new IgniteBiTuple<>(ready, reappeared);
     }
 
     /**
      * Tests shutdown.
      *
-     * @param testInfo Test info.
      * @param forceful Whether shutdown should be forceful.
      * @throws Exception If failed.
      */
-    private void testShutdown0(TestInfo testInfo, boolean forceful) throws 
Exception {
+    private void testShutdown0(boolean forceful) throws Exception {
         testCluster = new Cluster(2, testInfo);
         testCluster.startAwait();
 
@@ -651,9 +826,6 @@ class ItScaleCubeNetworkMessagingTest {
         /** Members of the cluster. */
         final List<ClusterService> members;
 
-        /** Latch that is locked until all members are visible in the 
topology. */
-        private final CountDownLatch startupLatch;
-
         /** Node finder. */
         private final NodeFinder nodeFinder;
 
@@ -664,18 +836,14 @@ class ItScaleCubeNetworkMessagingTest {
          * @param testInfo   Test info.
          */
         Cluster(int numOfNodes, TestInfo testInfo) {
-            startupLatch = new CountDownLatch(numOfNodes - 1);
-
             int initialPort = 3344;
 
             List<NetworkAddress> addresses = findLocalAddresses(initialPort, 
initialPort + numOfNodes);
 
             this.nodeFinder = new StaticNodeFinder(addresses);
 
-            var isInitial = new AtomicBoolean(true);
-
             members = addresses.stream()
-                    .map(addr -> startNode(testInfo, addr, 
isInitial.getAndSet(false)))
+                    .map(addr -> startNode(testInfo, addr))
                     .collect(toUnmodifiableList());
         }
 
@@ -684,25 +852,10 @@ class ItScaleCubeNetworkMessagingTest {
          *
          * @param testInfo Test info.
          * @param addr     Node address.
-         * @param initial  Whether this node is the first one.
          * @return Started cluster node.
          */
-        private ClusterService startNode(
-                TestInfo testInfo, NetworkAddress addr, boolean initial
-        ) {
-            ClusterService clusterSvc = 
ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder);
-
-            if (initial) {
-                clusterSvc.topologyService().addEventHandler(new 
TopologyEventHandler() {
-                    /** {@inheritDoc} */
-                    @Override
-                    public void onAppeared(ClusterNode member) {
-                        startupLatch.countDown();
-                    }
-                });
-            }
-
-            return clusterSvc;
+        private ClusterService startNode(TestInfo testInfo, NetworkAddress 
addr) {
+            return ClusterServiceTestUtils.clusterService(testInfo, 
addr.port(), nodeFinder);
         }
 
         /**
@@ -714,11 +867,18 @@ class ItScaleCubeNetworkMessagingTest {
         void startAwait() throws InterruptedException {
             members.forEach(ClusterService::start);
 
-            if (!startupLatch.await(3, TimeUnit.SECONDS)) {
+            if (!waitForCondition(this::allMembersSeeEachOther, 
TimeUnit.SECONDS.toMillis(3))) {
                 throw new AssertionError();
             }
         }
 
+        private boolean allMembersSeeEachOther() {
+            int totalMembersSeen = members.stream()
+                    .mapToInt(m -> m.topologyService().allMembers().size())
+                    .sum();
+            return totalMembersSeen == members.size() * members.size();
+        }
+
         /**
          * Stops the cluster.
          */
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 6587b91bde..4651b4557c 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.network.netty;
 
 import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.network.ChannelType.getChannel;
 
 import io.netty.bootstrap.Bootstrap;
@@ -26,6 +27,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -327,6 +329,15 @@ public class ConnectionManager implements 
ChannelCreationListener {
         // Old channel can still be in the map, but it must be closed already 
by the tie breaker in the
         // handshake manager.
         assert oldChannel == null || !oldChannel.isOpen() : "Incorrect channel 
creation flow";
+
+        // Preventing a race between calling closeConnectionsWith() and 
putting a new channel that was just opened (with the node
+        // which is already stale). If it's stale, then the stale detector 
already knows it (and it knows it before
+        // closeConnectionsWith() gets called as it subscribes first).
+        // This is the only place where a new sender might be added to the map.
+        if (staleIdDetector.isIdStale(channel.launchId())) {
+            channel.close();
+            channels.remove(key, channel);
+        }
     }
 
     /**
@@ -486,4 +497,22 @@ public class ConnectionManager implements 
ChannelCreationListener {
     public void initiateStopping() {
         stopping.set(true);
     }
+
+    /**
+     * Closes physical connections with an Ignite node identified by the given 
ID (it's not consistentId,
+     * it's ID that gets regenerated at each node restart).
+     *
+     * @param id ID of the node.
+     */
+    public void closeConnectionsWith(String id) {
+        List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = 
channels.entrySet().stream()
+                .filter(entry -> entry.getValue().launchId().equals(id))
+                .collect(toList());
+
+        for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) 
{
+            entry.getValue().close();
+
+            channels.remove(entry.getKey());
+        }
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
index 9b0f39e4a9..8ec2a86411 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectorKey.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.ChannelType;
 
 /**
@@ -69,4 +70,9 @@ public class ConnectorKey<T> {
         result = 31 * result + (type != null ? type.hashCode() : 0);
         return result;
     }
+
+    @Override
+    public String toString() {
+        return S.toString(ConnectorKey.class, this);
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index aebd477a27..6ab57d9e2f 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -117,7 +117,7 @@ public class HandshakeHandler extends 
ChannelInboundHandlerAdapter {
      * @param connectionIndex Index of the connection (corresponds to the 
channel ID).
      * @return Message handler.
      */
-    public MessageHandler createMessageHandler(String remoteConsistentId, 
short connectionIndex) {
-        return new MessageHandler(messageListener, remoteConsistentId, 
connectionIndex, serializationService);
+    public MessageHandler createMessageHandler(String remoteLaunchId, String 
remoteConsistentId, short connectionIndex) {
+        return new MessageHandler(messageListener, remoteLaunchId, 
remoteConsistentId, connectionIndex, serializationService);
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
index 739fd07e5d..f7693685fe 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
@@ -28,6 +28,8 @@ public class InNetworkObject {
     /** Message. */
     private final NetworkMessage message;
 
+    private final String launchId;
+
     /** Sender's consistent id. */
     private final String consistentId;
 
@@ -37,8 +39,15 @@ public class InNetworkObject {
     private final DescriptorRegistry registry;
 
     /** Constructor. */
-    public InNetworkObject(NetworkMessage message, String consistentId, short 
connectionIndex, DescriptorRegistry registry) {
+    public InNetworkObject(
+            NetworkMessage message,
+            String launchId,
+            String consistentId,
+            short connectionIndex,
+            DescriptorRegistry registry
+    ) {
         this.message = message;
+        this.launchId = launchId;
         this.consistentId = consistentId;
         this.connectionIndex = connectionIndex;
         this.registry = registry;
@@ -53,6 +62,13 @@ public class InNetworkObject {
         return message;
     }
 
+    /**
+     * Returns node ID of the sender that does not survive node restart (aka 
launch ID).
+     */
+    public String launchId() {
+        return launchId;
+    }
+
     /**
      * Returns consistent id.
      *
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index e07633270f..80b0e6ec6b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -34,6 +34,8 @@ public class MessageHandler extends 
ChannelInboundHandlerAdapter {
     /** Message listener. */
     private final Consumer<InNetworkObject> messageListener;
 
+    private final String launchId;
+
     /** Consistent id of the remote node. */
     private final String consistentId;
 
@@ -45,12 +47,20 @@ public class MessageHandler extends 
ChannelInboundHandlerAdapter {
      * Constructor.
      *
      * @param messageListener Message listener.
+     * @param launchId Launch ID (this is the ID that does not survive a 
restart) of the remote node.
      * @param consistentId Consistent id of the remote node.
+     * @param connectionIndex Connection index (aka channel ID).
      * @param serializationService Serialization service.
      */
-    public MessageHandler(Consumer<InNetworkObject> messageListener, String 
consistentId, short connectionIndex,
-            PerSessionSerializationService serializationService) {
+    public MessageHandler(
+            Consumer<InNetworkObject> messageListener,
+            String launchId,
+            String consistentId,
+            short connectionIndex,
+            PerSessionSerializationService serializationService
+    ) {
         this.messageListener = messageListener;
+        this.launchId = launchId;
         this.consistentId = consistentId;
         this.connectionIndex = connectionIndex;
         this.serializationService = serializationService;
@@ -66,7 +76,7 @@ public class MessageHandler extends 
ChannelInboundHandlerAdapter {
         }
 
         messageListener.accept(
-                new InNetworkObject(message, consistentId, connectionIndex, 
serializationService.compositeDescriptorRegistry())
+                new InNetworkObject(message, launchId, consistentId, 
connectionIndex, serializationService.compositeDescriptorRegistry())
         );
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index d44f082c00..9fd6fd6696 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -381,7 +381,7 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
      * @return New message handler.
      */
     private MessageHandler createMessageHandler() {
-        return handler.createMessageHandler(remoteConsistentId, connectionId);
+        return handler.createMessageHandler(remoteLaunchId.toString(), 
remoteConsistentId, connectionId);
     }
 
     /**
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index af5aa45b4c..54edec00d3 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -389,7 +389,7 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
      * @return New message handler.
      */
     private MessageHandler createMessageHandler() {
-        return handler.createMessageHandler(remoteConsistentId, 
remoteChannelId);
+        return handler.createMessageHandler(remoteLaunchId.toString(), 
remoteConsistentId, remoteChannelId);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
similarity index 94%
rename from 
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
rename to 
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
index e011b30978..2ad20206e4 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStaleIds.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.vault.VaultManager;
 /**
  * {@link StaleIds} implementation using Vault as a persistent storage.
  */
-public class VaultStateIds implements StaleIds {
+public class VaultStaleIds implements StaleIds {
     private static final ByteArray STALE_IDS_KEY = new 
ByteArray("network.staleIds");
 
     private static final int DEFAULT_MAX_IDS_TO_REMEMBER = 10_000;
@@ -41,11 +41,11 @@ public class VaultStateIds implements StaleIds {
 
     private Set<String> staleIds;
 
-    public VaultStateIds(VaultManager vaultManager) {
+    public VaultStaleIds(VaultManager vaultManager) {
         this(vaultManager, DEFAULT_MAX_IDS_TO_REMEMBER);
     }
 
-    public VaultStateIds(VaultManager vaultManager, int maxIdsToRemember) {
+    public VaultStaleIds(VaultManager vaultManager, int maxIdsToRemember) {
         this.vaultManager = vaultManager;
         this.maxIdsToRemember = maxIdsToRemember;
     }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 22115aa56f..63c0ac9b62 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.netty.InNetworkObject;
 import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
 import 
org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
@@ -67,6 +68,8 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     /** Topology service. */
     private final TopologyService topologyService;
 
+    private final StaleIdDetector staleIdDetector;
+
     /** User object marshaller. */
     private final UserObjectMarshaller marshaller;
 
@@ -97,6 +100,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      *
      * @param factory Network messages factory.
      * @param topologyService Topology service.
+     * @param staleIdDetector Used to detect stale node IDs.
      * @param classDescriptorRegistry Descriptor registry.
      * @param marshaller Marshaller.
      */
@@ -104,11 +108,13 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             String nodeName,
             NetworkMessagesFactory factory,
             TopologyService topologyService,
+            StaleIdDetector staleIdDetector,
             ClassDescriptorRegistry classDescriptorRegistry,
             UserObjectMarshaller marshaller
     ) {
         this.factory = factory;
         this.topologyService = topologyService;
+        this.staleIdDetector = staleIdDetector;
         this.classDescriptorRegistry = classDescriptorRegistry;
         this.marshaller = marshaller;
 
@@ -220,7 +226,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         NetworkMessage message = correlationId != null ? 
responseFromMessage(msg, correlationId) : msg;
 
-        return sendMessage0(recipient.name(), type, recipientAddress, message);
+        return sendViaNetwork(recipient.name(), type, recipientAddress, 
message);
     }
 
     private boolean shouldDropMessage(ClusterNode recipient, NetworkMessage 
msg) {
@@ -264,7 +270,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
 
         InvokeRequest message = requestFromMessage(msg, correlationId);
 
-        return sendMessage0(recipient.name(), type, recipientAddress, 
message).thenCompose(unused -> responseFuture);
+        return sendViaNetwork(recipient.name(), type, recipientAddress, 
message).thenCompose(unused -> responseFuture);
     }
 
     /**
@@ -277,21 +283,21 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
      *
      * @return Future of the send operation.
      */
-    private CompletableFuture<Void> sendMessage0(
+    private CompletableFuture<Void> sendViaNetwork(
             @Nullable String consistentId,
             ChannelType type,
             InetSocketAddress addr,
             NetworkMessage message
     ) {
         if (isInNetworkThread()) {
-            return CompletableFuture.supplyAsync(() -> 
sendMessage0(consistentId, type, addr, message), outboundExecutor)
+            return CompletableFuture.supplyAsync(() -> 
sendViaNetwork(consistentId, type, addr, message), outboundExecutor)
                     .thenCompose(Function.identity());
         }
 
         List<ClassDescriptorMessage> descriptors;
 
         try {
-            descriptors = beforeRead(message);
+            descriptors = prepareMarshal(message);
         } catch (Exception e) {
             return failedFuture(new IgniteException("Failed to marshal 
message: " + e.getMessage(), e));
         }
@@ -300,7 +306,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
                 .thenComposeToCompletable(sender -> sender.send(new 
OutNetworkObject(message, descriptors)));
     }
 
-    private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws 
Exception {
+    private List<ClassDescriptorMessage> prepareMarshal(NetworkMessage msg) 
throws Exception {
         IntSet ids = new IntOpenHashSet();
 
         msg.prepareMarshal(ids, marshaller);
@@ -345,6 +351,11 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     }
 
     private void handleIncomingMessage(InNetworkObject obj) {
+        if (senderIdIsStale(obj)) {
+            LOG.info("Sender ID {} ({}) is stale, so skipping message 
handling: {}", obj.launchId(), obj.consistentId(), obj.message());
+            return;
+        }
+
         NetworkMessage msg = obj.message();
         DescriptorRegistry registry = obj.registry();
         try {
@@ -380,6 +391,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         }
     }
 
+    private boolean senderIdIsStale(InNetworkObject obj) {
+        return staleIdDetector.isIdStale(obj.launchId());
+    }
+
     private static void logAndRethrowIfError(InNetworkObject obj, Throwable e) 
{
         if (e instanceof UnresolvableConsistentIdException && obj.message() 
instanceof InvokeRequest) {
             LOG.info("onMessage() failed while processing {} from {} as the 
sender has left the topology",
diff --git 
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
 
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 68e0de4097..f8124e835b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -83,16 +83,26 @@ public class ScaleCubeClusterServiceFactory {
             MessageSerializationRegistry serializationRegistry,
             StaleIds staleIds
     ) {
-        var messageFactory = new NetworkMessagesFactory();
-
         var topologyService = new ScaleCubeTopologyService();
 
+        // Adding this handler as the first handler to make sure that StaleIds 
is at least as up-to-date as any
+        // other component that watches topology events.
+        topologyService.addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                staleIds.markAsStale(member.id());
+            }
+        });
+
+        var messageFactory = new NetworkMessagesFactory();
+
         UserObjectSerializationContext userObjectSerialization = 
createUserObjectSerializationContext();
 
         var messagingService = new DefaultMessagingService(
                 consistentId,
                 messageFactory,
                 topologyService,
+                staleIds,
                 userObjectSerialization.descriptorRegistry(),
                 userObjectSerialization.marshaller()
         );
@@ -114,7 +124,7 @@ public class ScaleCubeClusterServiceFactory {
 
                 NetworkView configView = networkConfiguration.value();
 
-                connectionMgr = new ConnectionManager(
+                ConnectionManager connectionMgr = new ConnectionManager(
                         configView,
                         serializationService,
                         launchId,
@@ -125,6 +135,13 @@ public class ScaleCubeClusterServiceFactory {
 
                 connectionMgr.start();
 
+                topologyService.addEventHandler(new TopologyEventHandler() {
+                    @Override
+                    public void onDisappeared(ClusterNode member) {
+                        connectionMgr.closeConnectionsWith(member.id());
+                    }
+                });
+
                 var transport = new ScaleCubeDirectMarshallerTransport(
                         connectionMgr.localAddress(),
                         messagingService,
@@ -133,7 +150,7 @@ public class ScaleCubeClusterServiceFactory {
                 );
 
                 NodeFinder finder = 
NodeFinderFactory.createNodeFinder(configView.nodeFinder());
-                cluster = new 
ClusterImpl(clusterConfig(configView.membership()))
+                ClusterImpl cluster = new 
ClusterImpl(clusterConfig(configView.membership()))
                         .handler(cl -> new ClusterMessageHandler() {
                             /** {@inheritDoc} */
                             @Override
@@ -149,24 +166,20 @@ public class ScaleCubeClusterServiceFactory {
                         .transport(opts -> 
opts.transportFactory(transportConfig -> transport))
                         .membership(opts -> 
opts.seedMembers(parseAddresses(finder.findNodes())));
 
-                shutdownFuture = cluster.onShutdown().toFuture();
+                this.shutdownFuture = cluster.onShutdown().toFuture();
 
                 // resolve cyclic dependencies
                 topologyService.setCluster(cluster);
                 messagingService.setConnectionManager(connectionMgr);
 
-                topologyService.addEventHandler(new TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        staleIds.markAsStale(member.id());
-                    }
-                });
-
                 cluster.startAwait();
 
                 // emit an artificial event as if the local member has joined 
the topology (ScaleCube doesn't do that)
                 var localMembershipEvent = createAdded(cluster.member(), null, 
System.currentTimeMillis());
                 topologyService.onMembershipEvent(localMembershipEvent);
+
+                this.cluster = cluster;
+                this.connectionMgr = connectionMgr;
             }
 
             /** {@inheritDoc} */
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
similarity index 95%
rename from 
modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
rename to 
modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
index e0ed786b8a..c3c4b9b38c 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStaleIdsTest.java
@@ -42,17 +42,17 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
-class VaultStateIdsTest extends BaseIgniteAbstractTest {
+class VaultStaleIdsTest extends BaseIgniteAbstractTest {
     @Mock
     private VaultManager vaultManager;
 
     private final ByteArray staleIdsKey = new ByteArray("network.staleIds");
 
-    private VaultStateIds staleIds;
+    private VaultStaleIds staleIds;
 
     @BeforeEach
     void createObjectToTest() {
-        staleIds = new VaultStateIds(vaultManager);
+        staleIds = new VaultStaleIds(vaultManager);
     }
 
     @Test
@@ -92,7 +92,7 @@ class VaultStateIdsTest extends BaseIgniteAbstractTest {
 
     @Test
     void respectsMaxIdsLimit() {
-        staleIds = new VaultStateIds(vaultManager, 2);
+        staleIds = new VaultStaleIds(vaultManager, 2);
 
         doReturn(nullCompletedFuture()).when(vaultManager).get(staleIdsKey);
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index c72990c4c9..c5e722155e 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -436,6 +436,8 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
             Runnable beforeHandshake,
             MessageSerializationRegistry registry
     ) {
+        StaleIdDetector staleIdDetector = new AllIdsAreFresh();
+
         ClassDescriptorRegistry classDescriptorRegistry = new 
ClassDescriptorRegistry();
         ClassDescriptorFactory classDescriptorFactory = new 
ClassDescriptorFactory(classDescriptorRegistry);
         UserObjectMarshaller marshaller = new 
DefaultUserObjectMarshaller(classDescriptorRegistry, classDescriptorFactory);
@@ -444,6 +446,7 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
                 node.name(),
                 networkMessagesFactory,
                 topologyService,
+                staleIdDetector,
                 classDescriptorRegistry,
                 marshaller
         );
@@ -458,8 +461,6 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         NettyBootstrapFactory bootstrapFactory = new 
NettyBootstrapFactory(networkConfig, eventLoopGroupNamePrefix);
         bootstrapFactory.start();
 
-        StaleIdDetector staleIdDetector = new AllIdsAreFresh();
-
         UUID launchId = UUID.randomUUID();
         ConnectionManager connectionManager = new ConnectionManager(
                 networkConfig.value(),
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 6cbfcb7eea..5fce588f45 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -106,7 +106,7 @@ import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
-import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.network.recovery.VaultStaleIds;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
@@ -250,7 +250,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 networkConfiguration,
                 nettyBootstrapFactory,
                 defaultSerializationRegistry(),
-                new VaultStateIds(vault)
+                new VaultStaleIds(vault)
         );
 
         var hybridClock = new HybridClockImpl();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fe896756fa..882429c919 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -109,7 +109,7 @@ import 
org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
 import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import 
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
-import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.network.recovery.VaultStaleIds;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.raft.Loza;
@@ -380,7 +380,7 @@ public class IgniteImpl implements Ignite {
                 networkConfiguration,
                 nettyBootstrapFactory,
                 serializationRegistry,
-                new VaultStateIds(vaultMgr)
+                new VaultStaleIds(vaultMgr)
         );
 
         clock = new HybridClockImpl();

Reply via email to