This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 29dedf63f6 IGNITE-20852 Simultaneous incoming and outgoing connection 
attempts may cause connection failure (#2850)
29dedf63f6 is described below

commit 29dedf63f6699bf9705874de7b9e990b4b0ecddc
Author: Sergey Chugunov <sergey.chugu...@gmail.com>
AuthorDate: Thu Nov 23 18:17:58 2023 +0400

    IGNITE-20852 Simultaneous incoming and outgoing connection attempts may 
cause connection failure (#2850)
    
    * The handshake protocol is extended to allow a node that loses a clinch to notify 
its origin
    * As a result of a handshake, the caller always gets a NettySender, even if 
the caller lost the clinch
    * If, during a handshake, a party cannot obtain a lock on its side, it 
unconditionally gives way to the competitor (as the competitor has advanced 
further)
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../network/netty/ItConnectionManagerTest.java     |  41 +---
 .../network/handshake/HandshakeManager.java        |  18 +-
 .../internal/network/netty/ConnectionManager.java  |  49 +++++
 .../internal/network/netty/HandshakeHandler.java   |   6 +-
 .../ignite/internal/network/netty/NettyClient.java |   2 +-
 .../network/recovery/DescriptorAcquiry.java        |  67 +++++++
 .../network/recovery/HandshakeTieBreaker.java      |  13 +-
 .../recovery/RecoveryClientHandshakeManager.java   | 155 ++++++++++-----
 .../network/recovery/RecoveryDescriptor.java       |  35 +++-
 .../recovery/RecoveryServerHandshakeManager.java   | 127 +++++++++----
 .../recovery/message/HandshakeRejectedMessage.java |  20 +-
 ...dMessage.java => HandshakeRejectionReason.java} |  28 ++-
 .../ignite/network/DefaultMessagingService.java    |  23 +--
 .../internal/network/netty/NettyClientTest.java    |   8 +-
 .../internal/network/netty/NettyServerTest.java    |   5 +-
 .../network/netty/RecoveryHandshakeTest.java       | 108 +++++++++--
 .../network/recovery/DescriptorAcquiryTest.java    |  56 ++++++
 .../RecoveryClientHandshakeManagerTest.java        | 207 +++++++++++++++++++++
 .../network/recovery/RecoveryDescriptorTest.java   | 141 ++++++++++++++
 .../RecoveryServerHandshakeManagerTest.java        | 169 +++++++++++++++++
 20 files changed, 1070 insertions(+), 208 deletions(-)

diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 9d48f98304..399b216e09 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -70,6 +70,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -321,6 +322,7 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
      * @throws Exception If failed.
      */
     @RepeatedTest(100)
+    @Timeout(10)
     public void testOneChannelLeftIfConnectToEachOther() throws Exception {
         try (
                 ConnectionManagerWrapper manager1 = startManager(4000);
@@ -329,42 +331,11 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
             CompletableFuture<NettySender> fut1 = 
manager1.openChannelTo(manager2).toCompletableFuture();
             CompletableFuture<NettySender> fut2 = 
manager2.openChannelTo(manager1).toCompletableFuture();
 
-            NettySender sender1 = null;
-            NettySender sender2 = null;
+            NettySender sender1 = fut1.get(1, TimeUnit.SECONDS);
+            NettySender sender2 = fut2.get(1, TimeUnit.SECONDS);
 
-            try {
-                sender1 = fut1.get(1, TimeUnit.SECONDS);
-            } catch (Exception ignored) {
-                // No-op.
-            }
-            try {
-                sender2 = fut2.get(1, TimeUnit.SECONDS);
-            } catch (Exception ignored) {
-                // No-op.
-            }
-
-            NettySender highlander = null;
-
-            assertTrue(sender1 != null || sender2 != null);
-
-            if (sender1 != null && sender1.isOpen()) {
-                highlander = sender1;
-
-                boolean sender2NullOrClosed = sender2 == null || 
!sender2.isOpen();
-
-                assertTrue(sender2NullOrClosed);
-            }
-
-            if (sender2 != null && sender2.isOpen()) {
-                highlander = sender2;
-
-                boolean sender1NullOrClosed = sender1 == null || 
!sender1.isOpen();
-
-                assertTrue(sender1NullOrClosed);
-            }
-
-            assertNotNull(highlander);
-            assertTrue(highlander.isOpen());
+            assertTrue(sender1.isOpen());
+            assertTrue(sender2.isOpen());
 
             assertTrue(
                     waitForCondition(
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
index 30b3278288..c39930e52c 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.handshake;
 
 import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.network.NetworkMessage;
 
@@ -48,9 +49,20 @@ public interface HandshakeManager {
     void onMessage(NetworkMessage message);
 
     /**
-     * Returns future that represents the handshake operation.
+     * Returns local future that represents the handshake operation. This is 
the future that
+     * gets completed when the handshake itself terminates either successfully 
or with an exception.
+     * This is used to complete the current handshake; to get the final 
outcome of the connection attempt
+     * please use {@link #finalHandshakeFuture()}.
      *
-     * @return Future that represents the handshake operation.
+     * @return Local future that represents the handshake operation.
      */
-    CompletableFuture<NettySender> handshakeFuture();
+    CompletableFuture<NettySender> localHandshakeFuture();
+
+    /**
+     * Returns final future that represents the handshake operation. This 
represents completion of either
+     * current handshake or the inverse handshake if it wins (and the current 
one loses).
+     *
+     * @return Final future that represents the handshake operation.
+     */
+    CompletionStage<NettySender> finalHandshakeFuture();
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index aedaa070ea..6587b91bde 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static java.util.function.Function.identity;
 import static org.apache.ignite.network.ChannelType.getChannel;
 
 import io.netty.bootstrap.Bootstrap;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.configuration.NetworkView;
+import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
@@ -65,6 +67,8 @@ public class ConnectionManager implements 
ChannelCreationListener {
     /** Latest version of the direct marshalling protocol. */
     public static final byte DIRECT_PROTOCOL_VERSION = 1;
 
+    private static final int MAX_RETRIES_TO_OPEN_CHANNEL = 10;
+
     /** Client bootstrap. */
     private final Bootstrap clientBootstrap;
 
@@ -223,6 +227,51 @@ public class ConnectionManager implements 
ChannelCreationListener {
      * @return Sender.
      */
     public OrderingFuture<NettySender> channel(@Nullable String consistentId, 
ChannelType type, InetSocketAddress address) {
+        return getChannelWithRetry(consistentId, type, address, 0);
+    }
+
+    private OrderingFuture<NettySender> getChannelWithRetry(
+            @Nullable String consistentId,
+            ChannelType type,
+            InetSocketAddress address,
+            int attempt
+    ) {
+        if (attempt > MAX_RETRIES_TO_OPEN_CHANNEL) {
+            return OrderingFuture.failedFuture(new IllegalStateException("Too 
many attempts to open channel to " + consistentId));
+        }
+
+        return doGetChannel(consistentId, type, address)
+                .handle((res, ex) -> {
+                    if (ex instanceof ChannelAlreadyExistsException) {
+                        return 
getChannelWithRetry(((ChannelAlreadyExistsException) ex).consistentId(), type, 
address, attempt + 1);
+                    }
+                    if (ex != null && ex.getCause() instanceof 
ChannelAlreadyExistsException) {
+                        return getChannelWithRetry(
+                                ((ChannelAlreadyExistsException) 
ex.getCause()).consistentId(),
+                                type,
+                                address,
+                                attempt + 1
+                        );
+                    }
+                    if (ex != null) {
+                        return OrderingFuture.<NettySender>failedFuture(ex);
+                    }
+
+                    assert res != null;
+                    if (res.isOpen()) {
+                        return OrderingFuture.completedFuture(res);
+                    } else {
+                        return getChannelWithRetry(res.consistentId(), type, 
address, attempt + 1);
+                    }
+                })
+                .thenCompose(identity());
+    }
+
+    private OrderingFuture<NettySender> doGetChannel(
+            @Nullable String consistentId,
+            ChannelType type,
+            InetSocketAddress address
+    ) {
         // Problem is we can't look up a channel by consistent id because 
consistent id is not known yet.
         if (consistentId != null) {
             // If consistent id is known, try looking up a channel by 
consistent id. There can be an outbound connection
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index 83b60e4834..de57be659d 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -75,7 +75,7 @@ public class HandshakeHandler extends 
ChannelInboundHandlerAdapter {
             throw e;
         }
 
-        manager.handshakeFuture().whenComplete((unused, throwable) -> {
+        manager.localHandshakeFuture().whenComplete((unused, throwable) -> {
             if (throwable != null) {
                 LOG.debug("Error when performing handshake", throwable);
 
@@ -97,7 +97,7 @@ public class HandshakeHandler extends 
ChannelInboundHandlerAdapter {
     public void channelInactive(ChannelHandlerContext ctx) {
         // If this method is called that means channel has been closed before 
handshake has finished or handshake
         // has failed.
-        manager.handshakeFuture().completeExceptionally(
+        manager.localHandshakeFuture().completeExceptionally(
                 new HandshakeException("Channel has been closed before 
handshake has finished or handshake has failed")
         );
 
@@ -107,7 +107,7 @@ public class HandshakeHandler extends 
ChannelInboundHandlerAdapter {
     /** {@inheritDoc} */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        manager.handshakeFuture().completeExceptionally(cause);
+        manager.localHandshakeFuture().completeExceptionally(cause);
     }
 
     /**
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 34dba15646..2748b3f4ea 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -146,7 +146,7 @@ public class NettyClient {
                             } else if (throwable != null) {
                                 return 
CompletableFuture.<NettySender>failedFuture(throwable);
                             } else {
-                                return handshakeManager.handshakeFuture();
+                                return handshakeManager.finalHandshakeFuture();
                             }
                         }
                     })
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java
new file mode 100644
index 0000000000..0b893eb193
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.network.netty.NettySender;
+
+/**
+ * Context around a fact that a {@link RecoveryDescriptor} is acquired by some 
channel.
+ */
+class DescriptorAcquiry {
+    private final Channel channel;
+    private final CompletableFuture<NettySender> handshakeCompleteFuture;
+
+    private final CompletableFuture<Void> clinchResolved = new 
CompletableFuture<>();
+
+    DescriptorAcquiry(Channel channel, CompletableFuture<NettySender> 
handshakeCompleteFuture) {
+        this.channel = channel;
+        this.handshakeCompleteFuture = handshakeCompleteFuture;
+    }
+
+    /**
+     * Returns the channel that owns the descriptor.
+     */
+    Channel channel() {
+        return channel;
+    }
+
+    /**
+     * Returns a completion stage that gets completed when a clinch associated 
with this acquiry is resolved
+     * (that is, the owning handshake gave up and released the recovery 
descriptor).
+     */
+    CompletionStage<Void> clinchResolved() {
+        return clinchResolved;
+    }
+
+    /**
+     * Signals that the owner of this recovery descriptor gave up and, hence, 
the clinch has been resolved.
+     */
+    void markClinchResolved() {
+        clinchResolved.complete(null);
+    }
+
+    /**
+     * Returns the future that gets completed when the handshake performed by 
the owner of the descriptor completes.
+     */
+    CompletionStage<NettySender> handshakeCompleteFuture() {
+        return handshakeCompleteFuture;
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
index e3bf6018ef..6210b5793b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
@@ -20,15 +20,18 @@ package org.apache.ignite.internal.network.recovery;
 import java.util.UUID;
 
 /**
- * The HandshakeTieBreaker class provides a mechanism for determining whether 
an existing channel should be closed during a handshake
- * process.
+ * The HandshakeTieBreaker class provides a mechanism for determining whether 
an existing channel should be closed in case of a clinch
+ * during a handshake process.
+ *
+ * <p>A clinch is a situation when two parallel handshakes (one from node A to 
B, another from B to A) acquire locks (now these are
+ * recovery descriptors) on different sides, then each of them tries to take a 
lock on the opposite side, which is impossible as
+ * it's already held by the corresponding competitor. To resolve such a 
deadlock, one of the handshakes must be terminated.
  */
 class HandshakeTieBreaker {
     /**
      * Determines whether an existing channel should be closed based on the 
comparison of the server's launch id and the client's launch id.
-     * If the client's launch id is greater than the server's launch id, the 
existing channel should be closed in favor of the new one. If
-     * the server's launch id is less than or equal to the client's launch id, 
the existing channel should be closed in favor of the new
-     * one.
+     * If the client's launch id is greater than the server's launch id, the 
existing channel should be closed in favor of the new one;
+     * otherwise, the new channel should be closed.
      *
      * @param serverLaunchId Server's launch id.
      * @param clientLaunchId Client's launch id.
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 67f03822d8..040e44ccaa 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
-import static 
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
+import static java.util.function.Function.identity;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.netty.PipelineUtils;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.lang.IgniteException;
@@ -74,7 +76,13 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
     private final short connectionId;
 
     /** Handshake completion future. */
-    private final CompletableFuture<NettySender> handshakeCompleteFuture = new 
CompletableFuture<>();
+    private final CompletableFuture<NettySender> localHandshakeCompleteFuture 
= new CompletableFuture<>();
+
+    /**
+     * Master future used to complete the handshake either with the results of 
this handshake of the competing one
+     * (in the opposite direction), if it wins.
+     */
+    private final CompletableFuture<CompletionStage<NettySender>> 
masterHandshakeCompleteFuture = new CompletableFuture<>();
 
     /** Remote node's launch id. */
     private UUID remoteLaunchId;
@@ -120,10 +128,13 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         this.staleIdDetector = staleIdDetector;
         this.stopping = stopping;
 
-        this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
+        localHandshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
             if (throwable != null) {
                 releaseResources();
 
+                // Complete the master future if it has not yet been completed 
by the competitor.
+                
masterHandshakeCompleteFuture.complete(localHandshakeCompleteFuture);
+
                 return;
             }
 
@@ -159,22 +170,7 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         }
 
         if (message instanceof HandshakeRejectedMessage) {
-            HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
-
-            boolean ignorable = stopping.get() || !msg.critical();
-
-            if (ignorable) {
-                LOG.debug("Handshake rejected by server: {}", msg.reason());
-            } else {
-                LOG.warn("Handshake rejected by server: {}", msg.reason());
-            }
-
-            handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.reason()));
-
-            if (!ignorable) {
-                // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
-                failureHandler.handleFailure(new IgniteException("Handshake 
rejected by server: " + msg.reason()));
-            }
+            onHandshakeRejectedMessage((HandshakeRejectedMessage) message);
 
             return;
         }
@@ -237,24 +233,23 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
                 connectionId
         );
 
-        while (!descriptor.acquire(ctx)) {
-            if (shouldCloseChannel(remoteLaunchId, launchId)) {
-                Channel holderChannel = descriptor.holderChannel();
-
-                if (holderChannel == null) {
-                    continue;
-                }
+        while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+            // Don't use the tie-braking logic as this handshake attempt is 
late: the competitor has already acquired
+            // recovery descriptors on both sides, so this handshake attempt 
must fail regardless of the Tie Breaker's opinion.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to acquire recovery descriptor during 
handshake, it is held by: {}.", descriptor.holderDescription());
+            }
 
-                holderChannel.close().awaitUninterruptibly();
-            } else {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Failed to acquire recovery descriptor during 
handshake, it is held by: {}", descriptor.holderDescription());
-                }
+            DescriptorAcquiry competitorAcquiry = descriptor.holder();
+            if (competitorAcquiry == null) {
+                continue;
+            }
 
-                handshakeCompleteFuture.completeExceptionally(new 
ChannelAlreadyExistsException(remoteConsistentId));
+            // Complete our master future with the competitor's future. After 
this our local future has no effect on the final result
+            // of this handshake.
+            
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
 
-                return;
-            }
+            return;
         }
 
         this.recoveryDescriptor = descriptor;
@@ -262,25 +257,81 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         handshake(this.recoveryDescriptor);
     }
 
+    private void 
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry 
competitorAcquiry) {
+        
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+        localHandshakeCompleteFuture.completeExceptionally(
+                new HandshakeException("Stepping aside to allow an incoming 
handshake from " + remoteConsistentId + " finish.")
+        );
+    }
+
     private void handleStaleServerId(HandshakeStartMessage msg) {
-        String reason = msg.consistentId() + ":" + msg.launchId() + " is 
stale, server should be restarted so that clients can connect";
+        String message = msg.consistentId() + ":" + msg.launchId() + " is 
stale, server should be restarted so that clients can connect";
         HandshakeRejectedMessage rejectionMessage = 
MESSAGE_FACTORY.handshakeRejectedMessage()
-                .critical(true)
-                .reason(reason)
+                .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+                .message(message)
                 .build();
 
-        sendHandshakeRejectedMessage(rejectionMessage, reason);
+        sendHandshakeRejectedMessage(rejectionMessage, message);
     }
 
     private void 
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
-        String reason = msg.consistentId() + ":" + msg.launchId() + " tried to 
establish a connection with " + consistentId
+        String message = msg.consistentId() + ":" + msg.launchId() + " tried 
to establish a connection with " + consistentId
                 + ", but it's stopping";
         HandshakeRejectedMessage rejectionMessage = 
MESSAGE_FACTORY.handshakeRejectedMessage()
-                .critical(false)
-                .reason(reason)
+                .reasonString(HandshakeRejectionReason.STOPPING.name())
+                .message(message)
                 .build();
 
-        sendHandshakeRejectedMessage(rejectionMessage, reason);
+        sendHandshakeRejectedMessage(rejectionMessage, message);
+    }
+
+    private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+        boolean ignorable = stopping.get() || !msg.reason().critical();
+
+        if (ignorable) {
+            LOG.debug("Handshake rejected by server: {}", msg.message());
+        } else {
+            LOG.warn("Handshake rejected by server: {}", msg.message());
+        }
+
+        if (msg.reason() == HandshakeRejectionReason.CLINCH) {
+            giveUpClinch();
+        } else {
+            localHandshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.message()));
+        }
+
+        if (!ignorable) {
+            // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
+            failureHandler.handleFailure(new IgniteException("Handshake 
rejected by server: " + msg.message()));
+        }
+    }
+
+    private void giveUpClinch() {
+        RecoveryDescriptor descriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
+                remoteConsistentId,
+                remoteLaunchId,
+                connectionId
+        );
+
+        DescriptorAcquiry myAcquiry = descriptor.holder();
+        assert myAcquiry != null;
+        assert myAcquiry.channel() == ctx.channel() : "Expected the descriptor 
to be held by current channel " + ctx.channel()
+                + ", but it's held by another channel " + myAcquiry.channel();
+
+        descriptor.release(ctx);
+
+        // Complete the future to allow the competitor that should wait on it 
acquire the descriptor and finish its handshake.
+        myAcquiry.markClinchResolved();
+
+        DescriptorAcquiry competitorAcquiry = descriptor.holder();
+        if (competitorAcquiry != null) {
+            // The competitor is available, so just complete our master future 
with the competitor future.
+            
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
+        } else {
+            // The competitor is not at the lock yet. Maybe it will arrive 
soon, maybe it will never arrive.
+            // The safest thing is to just retry the whole handshake procedure.
+            localHandshakeCompleteFuture.completeExceptionally(new 
ChannelAlreadyExistsException(remoteConsistentId));
+        }
     }
 
     private void sendHandshakeRejectedMessage(HandshakeRejectedMessage 
rejectionMessage, String reason) {
@@ -288,19 +339,25 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, 
throwable) -> {
             if (throwable != null) {
-                handshakeCompleteFuture.completeExceptionally(
+                localHandshakeCompleteFuture.completeExceptionally(
                         new HandshakeException("Failed to send handshake 
rejected message: " + throwable.getMessage(), throwable)
                 );
             } else {
-                handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(reason));
+                localHandshakeCompleteFuture.completeExceptionally(new 
HandshakeException(reason));
             }
         });
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    public CompletableFuture<NettySender> localHandshakeFuture() {
+        return localHandshakeCompleteFuture;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletionStage<NettySender> finalHandshakeFuture() {
+        return masterHandshakeCompleteFuture.thenCompose(identity());
     }
 
     private void handshake(RecoveryDescriptor descriptor) {
@@ -317,7 +374,7 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, 
throwable) -> {
             if (throwable != null) {
-                handshakeCompleteFuture.completeExceptionally(
+                localHandshakeCompleteFuture.completeExceptionally(
                         new HandshakeException("Failed to send handshake 
response: " + throwable.getMessage(), throwable)
                 );
             }
@@ -340,6 +397,8 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         // Removes handshake handler from the pipeline as the handshake is 
finished
         this.ctx.pipeline().remove(this.handler);
 
-        handshakeCompleteFuture.complete(new NettySender(channel, 
remoteLaunchId.toString(), remoteConsistentId, connectionId));
+        // Complete the master future with the local future of the current 
handshake as there was no competitor (or we won the competition).
+        masterHandshakeCompleteFuture.complete(localHandshakeCompleteFuture);
+        localHandshakeCompleteFuture.complete(new NettySender(channel, 
remoteLaunchId.toString(), remoteConsistentId, connectionId));
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
index 463f2ed49c..33c03fad2a 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
@@ -23,7 +23,9 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.OutNetworkObject;
 import org.jetbrains.annotations.Nullable;
@@ -44,8 +46,8 @@ public class RecoveryDescriptor {
     /** Count of received messages. */
     private long receivedCount;
 
-    /** Current owner channel of this descriptor. */
-    private final AtomicReference<Channel> channelHolder = new 
AtomicReference<>();
+    /** Context around the current owner channel of this descriptor. */
+    private final AtomicReference<DescriptorAcquiry> channelHolder = new 
AtomicReference<>();
 
     /**
      * Constructor.
@@ -138,22 +140,35 @@ public class RecoveryDescriptor {
      * @param ctx Channel handler context.
      */
     public void release(ChannelHandlerContext ctx) {
-        channelHolder.compareAndSet(ctx.channel(), null);
+        DescriptorAcquiry oldAcquiry = channelHolder.getAndUpdate(acquiry -> {
+            if (acquiry != null && acquiry.channel() == ctx.channel()) {
+                return null;
+            }
+
+            return acquiry;
+        });
+
+        if (oldAcquiry != null && oldAcquiry.channel() == ctx.channel()) {
+            // We have successfully released the descriptor.
+            // Let's mark the clinch resolved just in case.
+            oldAcquiry.markClinchResolved();
+        }
     }
 
     /**
      * Acquire this descriptor.
      *
      * @param ctx Channel handler context.
+     * @param handshakeCompleteFuture Future that gets completed when the 
corresponding handshake completes.
      */
-    public boolean acquire(ChannelHandlerContext ctx) {
-        return channelHolder.compareAndSet(null, ctx.channel());
+    public boolean acquire(ChannelHandlerContext ctx, 
CompletableFuture<NettySender> handshakeCompleteFuture) {
+        return channelHolder.compareAndSet(null, new 
DescriptorAcquiry(ctx.channel(), handshakeCompleteFuture));
     }
 
     /**
-     * Returns the channel, that holds this descriptor.
+     * Returns context around the channel that holds this descriptor.
      */
-    @Nullable Channel holderChannel() {
+    @Nullable DescriptorAcquiry holder() {
         return channelHolder.get();
     }
 
@@ -161,13 +176,13 @@ public class RecoveryDescriptor {
      * Returns {@code toString()} representation of a {@link Channel}, that 
holds this descriptor.
      */
     String holderDescription() {
-        Channel channel = channelHolder.get();
+        DescriptorAcquiry acquiry = channelHolder.get();
 
-        if (channel == null) {
+        if (acquiry == null) {
             // This can happen if channel was already closed and it released 
the descriptor.
             return "No channel";
         }
 
-        return channel.toString();
+        return acquiry.channel().toString();
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index f65948eed2..36693c9cea 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.netty.PipelineUtils;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.lang.IgniteException;
@@ -52,6 +54,8 @@ import org.apache.ignite.network.OutNetworkObject;
 public class RecoveryServerHandshakeManager implements HandshakeManager {
     private static final IgniteLogger LOG = 
Loggers.forClass(RecoveryServerHandshakeManager.class);
 
+    private static final int MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS = 1000;
+
     /** Launch id. */
     private final UUID launchId;
 
@@ -180,22 +184,7 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
         }
 
         if (message instanceof HandshakeRejectedMessage) {
-            HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
-
-            boolean ignorable = stopping.get() || !msg.critical();
-
-            if (ignorable) {
-                LOG.debug("Handshake rejected by client: {}", msg.reason());
-            } else {
-                LOG.warn("Handshake rejected by client: {}", msg.reason());
-            }
-
-            handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.reason()));
-
-            if (!ignorable) {
-                // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
-                failureHandler.handleFailure(new IgniteException("Handshake 
rejected by client: " + msg.reason()));
-            }
+            onHandshakeRejectedMessage((HandshakeRejectedMessage) message);
 
             return;
         }
@@ -232,27 +221,84 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
         this.receivedCount = remoteReceivedCount;
         this.remoteChannelId = remoteChannelId;
 
+        tryAcquireDescriptorAndFinishHandshake();
+    }
+
+    private void handleStaleClientId(HandshakeStartResponseMessage msg) {
+        String message = msg.consistentId() + ":" + msg.launchId() + " is 
stale, client should be restarted to be allowed to connect";
+        HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage()
+                .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+                .message(message)
+                .build();
+
+        sendHandshakeRejectedMessage(rejectionMessage, message);
+    }
+
+    private void 
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage 
msg) {
+        String message = msg.consistentId() + ":" + msg.launchId() + " tried 
to establish a connection with " + consistentId
+                + ", but it's stopping";
+        HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage()
+                .reasonString(HandshakeRejectionReason.STOPPING.name())
+                .message(message)
+                .build();
+
+        sendHandshakeRejectedMessage(rejectionMessage, message);
+    }
+
+    private void tryAcquireDescriptorAndFinishHandshake() {
+        tryAcquireDescriptorAndFinishHandshake(0);
+    }
+
+    private void tryAcquireDescriptorAndFinishHandshake(int attempt) {
+        if (attempt > MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS) {
+            throw new IllegalStateException("Too many attempts during 
handshake from " + remoteConsistentId + "(" + remoteLaunchId
+                    + ":" + remoteChannelId + ") via " + ctx.channel());
+        }
+
         RecoveryDescriptor descriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
                 this.remoteConsistentId,
                 this.remoteLaunchId,
                 this.remoteChannelId
         );
 
-        while (!descriptor.acquire(ctx)) {
+        while (!descriptor.acquire(ctx, handshakeCompleteFuture)) {
             if (shouldCloseChannel(launchId, remoteLaunchId)) {
-                Channel holderChannel = descriptor.holderChannel();
+                // A competitor is holding the descriptor and we win the 
clinch; so we need to wait on the 'clinch resolved' future till
+                // the competitor realises it should terminate (this 
realization will happen on the other side of the channel), send
+                // the corresponding message to this node, terminate its 
handshake and complete the 'clinch resolved' future.
+                DescriptorAcquiry competitorAcquiry = descriptor.holder();
 
-                if (holderChannel == null) {
+                if (competitorAcquiry == null) {
                     continue;
                 }
 
-                holderChannel.close().awaitUninterruptibly();
+                competitorAcquiry.clinchResolved().whenComplete((res, ex) -> {
+                    // The competitor has finished terminating its handshake; 
it must have already released the descriptor,
+                    // so let's try again.
+                    if (ctx.executor().inEventLoop()) {
+                        tryAcquireDescriptorAndFinishHandshake(attempt + 1);
+                    } else {
+                        ctx.executor().execute(() -> 
tryAcquireDescriptorAndFinishHandshake(attempt + 1));
+                    }
+                });
+
+                return;
             } else {
-                String err = "Failed to acquire recovery descriptor during 
handshake, it is held by: " + descriptor.holderDescription();
+                // A competitor is holding the descriptor and we lose the 
clinch; so we need to send the corresponding message
+                // to the other side, where the code handling the message will 
terminate our handshake and complete the 'clinch resolved'
+                // future, making it possible for the competitor to proceed 
and finish the handshake.
+                String localErrorMessage = "Failed to acquire recovery 
descriptor during handshake, it is held by: "
+                        + descriptor.holderDescription();
+
+                LOG.debug(localErrorMessage);
 
-                LOG.info(err);
+                HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage()
+                        .reasonString(HandshakeRejectionReason.CLINCH.name())
+                        .message("Handshake clinch detected, this handshake 
will be terminated, winning channel is "
+                                + descriptor.holderDescription())
+                        .build();
 
-                handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(err));
+                sendHandshakeRejectedMessage(rejectionMessage, 
localErrorMessage);
 
                 return;
             }
@@ -263,25 +309,21 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
         handshake(descriptor);
     }
 
-    private void handleStaleClientId(HandshakeStartResponseMessage msg) {
-        String reason = msg.consistentId() + ":" + msg.launchId() + " is 
stale, client should be restarted to be allowed to connect";
-        HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage()
-                .critical(true)
-                .reason(reason)
-                .build();
+    private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+        boolean ignorable = stopping.get() || !msg.reason().critical();
 
-        sendHandshakeRejectedMessage(rejectionMessage, reason);
-    }
+        if (ignorable) {
+            LOG.debug("Handshake rejected by client: {}", msg.message());
+        } else {
+            LOG.warn("Handshake rejected by client: {}", msg.message());
+        }
 
-    private void 
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage 
msg) {
-        String reason = msg.consistentId() + ":" + msg.launchId() + " tried to 
establish a connection with " + consistentId
-                + ", but it's stopping";
-        HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage()
-                .critical(false)
-                .reason(reason)
-                .build();
+        handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.message()));
 
-        sendHandshakeRejectedMessage(rejectionMessage, reason);
+        if (!ignorable) {
+            // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
+            failureHandler.handleFailure(new IgniteException("Handshake 
rejected by client: " + msg.message()));
+        }
     }
 
     private void sendHandshakeRejectedMessage(HandshakeRejectedMessage 
rejectionMessage, String reason) {
@@ -353,7 +395,12 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
+    public CompletableFuture<NettySender> localHandshakeFuture() {
+        return handshakeCompleteFuture;
+    }
+
+    @Override
+    public CompletionStage<NettySender> finalHandshakeFuture() {
         return handshakeCompleteFuture;
     }
 
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
index 9871de3597..e83abeb8ef 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
@@ -24,21 +24,25 @@ import org.apache.ignite.network.annotations.Transferable;
 /**
  * Handshake rejected message, contains the reason for a rejection.
  * This message is sent from a server to a client or wise versa.
- * After this message is received it makes no sense to retry connections with 
same node identity (launch ID must be changed
- * to make a retry).
  */
 @Transferable(HANDSHAKE_REJECTED)
 public interface HandshakeRejectedMessage extends InternalMessage {
     /**
-     * Returns rejection reason.
+     * Returns rejection message.
+     */
+    String message();
+
+    /**
+     * Returns rejection reason (this is the name of a member of {@link 
HandshakeRejectionReason}).
      *
-     * @return Reason of the rejection.
+     * @see HandshakeRejectionReason
      */
-    String reason();
+    String reasonString();
 
     /**
-     * Returns {@code true} iff the rejection is not expected and should be 
treated as a critical failure (requiring
-     * the rejected node to restart).
+     * Returns rejection reason.
      */
-    boolean critical();
+    default HandshakeRejectionReason reason() {
+        return HandshakeRejectionReason.valueOf(reasonString());
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
similarity index 61%
copy from 
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
index 9871de3597..b70fea80aa 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
@@ -17,28 +17,26 @@
 
 package org.apache.ignite.internal.network.recovery.message;
 
-import static 
org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_REJECTED;
-
-import org.apache.ignite.network.annotations.Transferable;
-
 /**
- * Handshake rejected message, contains the reason for a rejection.
- * This message is sent from a server to a client or wise versa.
- * After this message is received it makes no sense to retry connections with 
same node identity (launch ID must be changed
- * to make a retry).
+ * Reason for handshake rejection.
  */
-@Transferable(HANDSHAKE_REJECTED)
-public interface HandshakeRejectedMessage extends InternalMessage {
+public enum HandshakeRejectionReason {
+    /** The sender is stopping. */
+    STOPPING,
     /**
-     * Returns rejection reason.
-     *
-     * @return Reason of the rejection.
+     * The sender has detected that the counterpart launch ID is stale (was 
earlier used to establish a connection).
+     * After this is received it makes no sense to retry connections with same 
node identity (launch ID must be changed
+     * to make a retry).
      */
-    String reason();
+    STALE_LAUNCH_ID,
+    /** The sender has detected a clinch and decided to terminate this 
handshake in favor of the competitor. */
+    CLINCH;
 
     /**
      * Returns {@code true} iff the rejection is not expected and should be 
treated as a critical failure (requiring
      * the rejected node to restart).
      */
-    boolean critical();
+    public boolean critical() {
+        return this == STALE_LAUNCH_ID;
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 6fb114ae90..b5f52fa336 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -28,7 +28,6 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -37,12 +36,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
-import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
@@ -298,24 +295,8 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             return failedFuture(new IgniteException("Failed to marshal 
message: " + e.getMessage(), e));
         }
 
-        OrderingFuture<NettySender> channel = 
connectionManager.channel(consistentId, type, addr);
-
-        return channel.handle((sender, throwable) -> {
-            if (throwable != null) {
-                if (throwable instanceof CompletionException && 
throwable.getCause() instanceof ChannelAlreadyExistsException) {
-                    ChannelAlreadyExistsException e = 
(ChannelAlreadyExistsException) throwable.getCause();
-
-                    OrderingFuture<NettySender> channelFut = 
connectionManager.channel(e.consistentId(), type, addr);
-
-                    return channelFut.thenComposeToCompletable(nettySender -> {
-                        return nettySender.send(new OutNetworkObject(message, 
descriptors));
-                    });
-                }
-
-                throw new CompletionException(throwable);
-            }
-            return sender.send(new OutNetworkObject(message, descriptors));
-        }).thenComposeToCompletable(Function.identity());
+        return connectionManager.channel(consistentId, type, addr)
+                .thenComposeToCompletable(sender -> sender.send(new 
OutNetworkObject(message, descriptors)));
     }
 
     private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws 
Exception {
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index b407968e19..f1f69b9c4f 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -30,6 +30,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -267,10 +268,15 @@ public class NettyClientTest extends 
BaseIgniteAbstractTest {
 
         /** {@inheritDoc} */
         @Override
-        public CompletableFuture<NettySender> handshakeFuture() {
+        public CompletableFuture<NettySender> localHandshakeFuture() {
             return CompletableFuture.completedFuture(sender);
         }
 
+        @Override
+        public CompletionStage<NettySender> finalHandshakeFuture() {
+            return localHandshakeFuture();
+        }
+
         /** {@inheritDoc} */
         @Override
         public void onInit(ChannelHandlerContext ctx) {
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index bf58a812d6..a8a8431933 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -149,7 +149,8 @@ public class NettyServerTest extends BaseIgniteAbstractTest 
{
     public void testHandshakeManagerInvoked() throws Exception {
         HandshakeManager handshakeManager = mock(HandshakeManager.class);
 
-        
when(handshakeManager.handshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
+        
when(handshakeManager.localHandshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
+        
when(handshakeManager.finalHandshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
 
         MessageSerializationRegistry registry = 
mock(MessageSerializationRegistry.class);
 
@@ -218,7 +219,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest 
{
 
         order.verify(handshakeManager, timeout()).onInit(any());
         order.verify(handshakeManager, timeout()).onConnectionOpen();
-        order.verify(handshakeManager, timeout()).handshakeFuture();
+        order.verify(handshakeManager, timeout()).localHandshakeFuture();
         order.verify(handshakeManager, timeout()).onMessage(any());
     }
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 305a43c2b2..4d4f9cc922 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -54,6 +54,8 @@ import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Recovery protocol handshake flow test.
@@ -62,6 +64,9 @@ public class RecoveryHandshakeTest {
     /** Connection id. */
     private static final short CONNECTION_ID = 1337;
 
+    private static final UUID LOWER_UUID = new UUID(100, 200);
+    private static final UUID HIGHER_UUID = new UUID(300, 400);
+
     /** Serialization registry. */
     private static final MessageSerializationRegistry MESSAGE_REGISTRY = 
defaultSerializationRegistry();
 
@@ -209,12 +214,12 @@ public class RecoveryHandshakeTest {
     }
 
     @Test
-    public void testPairedRecoveryDescriptors() throws Exception {
+    public void testPairedRecoveryDescriptorsClinch() throws Exception {
         RecoveryDescriptorProvider node1Recovery = 
createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider node2Recovery = 
createRecoveryDescriptorProvider();
 
-        UUID node1Uuid = new UUID(100, 200);
-        UUID node2Uuid = new UUID(300, 400);
+        UUID node1Uuid = LOWER_UUID;
+        UUID node2Uuid = HIGHER_UUID;
 
         RecoveryClientHandshakeManager chm1 = 
createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
         RecoveryServerHandshakeManager shm1 = 
createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
@@ -222,14 +227,14 @@ public class RecoveryHandshakeTest {
         RecoveryClientHandshakeManager chm2 = 
createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
         RecoveryServerHandshakeManager shm2 = 
createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
 
-        // Channel opened from node1 to node2 - channel 1.
-        // Channel opened from node2 to node1 - channel 2.
+        // Channel opened from node1 to node2 is channel 1.
+        // Channel opened from node2 to node1 is channel 2.
 
         // Channel 1.
         EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
         EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
 
-        // Channel 1.
+        // Channel 2.
         EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
         EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
 
@@ -239,7 +244,8 @@ public class RecoveryHandshakeTest {
         exchangeClientToServer(channel2Dst, channel2Src);
         exchangeClientToServer(channel1Dst, channel1Src);
 
-        // 2 -> 1 is alive, while 1 -> 2 closes because of the tie-breaking.
+        // 2 -> 1 (Channel 2) is alive, while 1 -> 2 (Channel 1) closes 
because of the tie-breaking.
+        exchangeServerToClient(channel1Dst, channel1Src);
         exchangeServerToClient(channel2Dst, channel2Src);
         assertFalse(channel1Src.isOpen());
         assertFalse(channel1Dst.isOpen());
@@ -253,6 +259,60 @@ public class RecoveryHandshakeTest {
         assertFalse(channel1Dst.finish());
     }
 
+    /**
+     * This tests the following scenario: two handshakes in opposite 
directions are started;
+     * Handshake 1 is faster and it takes both client-side and server-side 
locks (using recovery descriptors
+     * as locks), and only then Handshake 2 tries to take the first lock (the 
one on the client side).
+     * In such a situation, tie-breaking logic should not be applied (as 
Handshake 1 could have already
+     * established, or almost established, a logical connection); instead, 
Handshake 2 must stop
+     * itself (regardless of what the Tie Breaker would prescribe).
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testLateHandshakeDoesNotUseTieBreaker(boolean 
node1LaunchIdIsLower) throws Exception {
+        RecoveryDescriptorProvider node1Recovery = 
createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider node2Recovery = 
createRecoveryDescriptorProvider();
+
+        UUID node1Uuid = node1LaunchIdIsLower ? LOWER_UUID : HIGHER_UUID;
+        UUID node2Uuid = node1LaunchIdIsLower ? HIGHER_UUID : LOWER_UUID;
+
+        RecoveryClientHandshakeManager chm1 = 
createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+        RecoveryServerHandshakeManager shm1 = 
createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+        RecoveryClientHandshakeManager chm2 = 
createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+        RecoveryServerHandshakeManager shm2 = 
createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+        // Channel opened from node1 to node2 is channel 1.
+        // Channel opened from node2 to node1 is channel 2.
+
+        // Channel 1.
+        EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
+
+        // Channel 2.
+        EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
+
+        // Channel 2's handshake acquires both locks.
+        exchangeServerToClient(channel2Dst, channel2Src);
+        exchangeClientToServer(channel2Dst, channel2Src);
+
+        // Now Channel 1's handshake cannot acquire even the first lock.
+        exchangeServerToClient(channel1Dst, channel1Src);
+
+        // 2 -> 1 is alive, while 1 -> 2 closes because it is late.
+        exchangeServerToClient(channel2Dst, channel2Src);
+        assertFalse(channel1Src.isOpen());
+
+        assertTrue(channel2Src.isOpen());
+        assertTrue(channel2Dst.isOpen());
+
+        assertFalse(channel1Src.finish());
+        assertFalse(channel2Dst.finish());
+        assertFalse(channel2Src.finish());
+        assertFalse(channel1Dst.finish());
+    }
+
     @Test
     public void testExactlyOnceServer() throws Exception {
         testExactlyOnce(true);
@@ -464,25 +524,41 @@ public class RecoveryHandshakeTest {
     }
 
     private void checkHandshakeNotCompleted(HandshakeManager manager) {
-        CompletableFuture<NettySender> handshakeFuture = 
manager.handshakeFuture();
-        assertFalse(handshakeFuture.isDone());
-        assertFalse(handshakeFuture.isCompletedExceptionally());
-        assertFalse(handshakeFuture.isCancelled());
+        // Asserts that both the local and the final handshake futures of the manager are still pending:
+        // not done, not completed exceptionally and not cancelled.
+        CompletableFuture<NettySender> localHandshakeFuture = 
manager.localHandshakeFuture();
+        assertFalse(localHandshakeFuture.isDone());
+        assertFalse(localHandshakeFuture.isCompletedExceptionally());
+        assertFalse(localHandshakeFuture.isCancelled());
+
+        // finalHandshakeFuture() appears to return a CompletionStage; toCompletableFuture() exposes its state.
+        CompletableFuture<NettySender> finalHandshakeFuture = 
manager.finalHandshakeFuture().toCompletableFuture();
+        assertFalse(finalHandshakeFuture.isDone());
+        assertFalse(finalHandshakeFuture.isCompletedExceptionally());
+        assertFalse(finalHandshakeFuture.isCancelled());
     }
 
     private void checkHandshakeCompleted(HandshakeManager manager) {
-        CompletableFuture<NettySender> handshakeFuture = 
manager.handshakeFuture();
-        assertTrue(handshakeFuture.isDone());
-        assertFalse(handshakeFuture.isCompletedExceptionally());
-        assertFalse(handshakeFuture.isCancelled());
+        // Asserts that both the local and the final handshake futures of the manager completed normally:
+        // done, not completed exceptionally and not cancelled.
+        CompletableFuture<NettySender> localHandshakeFuture = 
manager.localHandshakeFuture();
+        assertTrue(localHandshakeFuture.isDone());
+        assertFalse(localHandshakeFuture.isCompletedExceptionally());
+        assertFalse(localHandshakeFuture.isCancelled());
+
+        // finalHandshakeFuture() appears to return a CompletionStage; toCompletableFuture() exposes its state.
+        CompletableFuture<NettySender> finalHandshakeFuture = 
manager.finalHandshakeFuture().toCompletableFuture();
+        assertTrue(finalHandshakeFuture.isDone());
+        assertFalse(finalHandshakeFuture.isCompletedExceptionally());
+        assertFalse(finalHandshakeFuture.isCancelled());
     }
 
     private void checkHandshakeCompletedExceptionally(HandshakeManager 
manager) {
-        CompletableFuture<NettySender> handshakeFuture = 
manager.handshakeFuture();
+        // Asserts that both the local and the final handshake futures of the manager completed exceptionally
+        // and were not cancelled.
+        // NOTE(review): the local future is named handshakeFuture here, unlike localHandshakeFuture in the
+        // sibling check methods; consider renaming for consistency.
+        CompletableFuture<NettySender> handshakeFuture = 
manager.localHandshakeFuture();
 
         assertTrue(handshakeFuture.isDone());
         assertTrue(handshakeFuture.isCompletedExceptionally());
         assertFalse(handshakeFuture.isCancelled());
+
+        // finalHandshakeFuture() appears to return a CompletionStage; toCompletableFuture() exposes its state.
+        CompletableFuture<NettySender> finalHandshakeFuture = 
manager.finalHandshakeFuture().toCompletableFuture();
+
+        assertTrue(finalHandshakeFuture.isDone());
+        assertTrue(finalHandshakeFuture.isCompletedExceptionally());
+        assertFalse(finalHandshakeFuture.isCancelled());
     }
 
     private void addUnacknowledgedMessages(RecoveryDescriptor 
recoveryDescriptor) {
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java
new file mode 100644
index 0000000000..349a34957a
--- /dev/null
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link DescriptorAcquiry}, focusing on the stage returned by {@code clinchResolved()}.
+ */
+@ExtendWith(MockitoExtension.class)
+class DescriptorAcquiryTest extends BaseIgniteAbstractTest {
+    @Mock
+    private Channel channel;
+
+    private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** The stage returned by {@code clinchResolved()} must not be completed for a freshly created acquiry. */
+    @Test
+    void clinchResolvedStageIsInitiallyIncomplete() {
+        DescriptorAcquiry acquiry = new DescriptorAcquiry(channel, handshakeCompleteFuture);
+
+        assertThat(acquiry.clinchResolved().toCompletableFuture(), is(not(completedFuture())));
+    }
+
+    /** Calling {@code markClinchResolved()} must complete the stage returned by {@code clinchResolved()}. */
+    @Test
+    void clinchGetsResolved() {
+        DescriptorAcquiry acquiry = new DescriptorAcquiry(channel, handshakeCompleteFuture);
+
+        acquiry.markClinchResolved();
+
+        assertThat(acquiry.clinchResolved().toCompletableFuture(), is(completedFuture()));
+    }
+}
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
new file mode 100644
index 0000000000..f728b7b94b
--- /dev/null
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.NettySender;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RecoveryClientHandshakeManager} when handshakes in opposite directions compete for the
+ * same recovery descriptor.
+ */
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
+    private static final UUID LOWER_ID = new UUID(1, 1);
+    private static final UUID HIGHER_ID = new UUID(2, 2);
+
+    private static final String CLIENT_CONSISTENT_ID = "client";
+    private static final String SERVER_CONSISTENT_ID = "server";
+
+    private static final short CONNECTION_INDEX = 0;
+
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    @Mock
+    private Channel thisChannel;
+    @Mock
+    private Channel competitorChannel;
+
+    @Mock
+    private ChannelHandlerContext thisContext;
+    @Mock
+    private ChannelHandlerContext competitorContext;
+
+    @Mock
+    private ChannelCreationListener channelCreationListener;
+
+    @Mock
+    private RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+    @Mock
+    private EventExecutor eventExecutor;
+
+    @Mock
+    private NettySender competitorNettySender;
+
+    private final RecoveryDescriptor recoveryDescriptor = new RecoveryDescriptor(100);
+
+    @BeforeEach
+    void initMocks() {
+        lenient().when(thisContext.channel()).thenReturn(thisChannel);
+        lenient().when(competitorContext.channel()).thenReturn(competitorChannel);
+
+        // Report that the caller is always inside the event loop (presumably so the manager acts inline).
+        lenient().when(thisContext.executor()).thenReturn(eventExecutor);
+        lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
+
+        // Every lookup yields the same descriptor, so this handshake and the competitor contend for one lock.
+        lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(any(), any(), anyShort()))
+                .thenReturn(recoveryDescriptor);
+    }
+
+    /**
+     * Tests the following scenario: two handshakes in opposite directions are started.
+     * Handshake 1 is faster and takes both the client-side and the server-side locks (recovery descriptors are
+     * used as locks), and only then Handshake 2 tries to take the first lock (the one on the client side).
+     * In such a situation, tie-breaking logic should not be applied (as Handshake 1 could have already
+     * established, or almost established, a logical connection); instead, Handshake 2 must stop
+     * itself (regardless of what the tie breaker would prescribe).
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void terminatesCurrentHandshakeWhenCannotAcquireLockAtClientSide(boolean clientLaunchIdIsLower) {
+        UUID clientLaunchId = clientLaunchIdIsLower ? LOWER_ID : HIGHER_ID;
+        UUID serverLaunchId = clientLaunchIdIsLower ? HIGHER_ID : LOWER_ID;
+
+        RecoveryClientHandshakeManager manager = clientHandshakeManager(clientLaunchId);
+        CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture();
+        CompletionStage<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture();
+
+        recoveryDescriptor.acquire(thisContext, completedFuture(competitorNettySender));
+
+        manager.onMessage(handshakeStartMessageFrom(serverLaunchId));
+
+        verify(thisChannel, never()).close();
+        verify(thisChannel, never()).close(any(ChannelPromise.class));
+
+        HandshakeException ex = assertWillThrowFast(localHandshakeFuture, HandshakeException.class);
+        assertThat(ex.getMessage(), is("Stepping aside to allow an incoming handshake from server finish."));
+
+        assertThat(finalHandshakeFuture.toCompletableFuture(), willCompleteSuccessfully());
+        assertThat(finalHandshakeFuture.toCompletableFuture().join(), is(competitorNettySender));
+    }
+
+    /** Creates a client handshake manager for the given launch ID and initializes it with {@code thisContext}. */
+    private RecoveryClientHandshakeManager clientHandshakeManager(UUID launchId) {
+        RecoveryClientHandshakeManager manager = new RecoveryClientHandshakeManager(
+                launchId,
+                CLIENT_CONSISTENT_ID,
+                CONNECTION_INDEX,
+                recoveryDescriptorProvider,
+                new AllIdsAreFresh(),
+                channelCreationListener,
+                new AtomicBoolean(false)
+        );
+
+        manager.onInit(thisContext);
+
+        return manager;
+    }
+
+    /** Builds a {@link HandshakeStartMessage} as if sent by the server with the given launch ID. */
+    private static HandshakeStartMessage handshakeStartMessageFrom(UUID serverLaunchId) {
+        return MESSAGE_FACTORY.handshakeStartMessage()
+                .launchId(serverLaunchId)
+                .consistentId(SERVER_CONSISTENT_ID)
+                .build();
+    }
+
+    /**
+     * When the server rejects this handshake due to a clinch and a competitor acquires the descriptor once this
+     * side's clinch is resolved, the local future must fail with a "stepping aside" {@link HandshakeException}
+     * while the final future completes with the competitor's sender.
+     */
+    @Test
+    void switchesToCompetitorFutureWhenRejectedDueToClinchAndCompetitorIsHere() {
+        RecoveryClientHandshakeManager manager = clientHandshakeManager(randomUUID());
+        CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture();
+        CompletionStage<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture();
+
+        recoveryDescriptor.acquire(thisContext, new CompletableFuture<>());
+
+        DescriptorAcquiry thisAcquiry = recoveryDescriptor.holder();
+        assertThat(thisAcquiry, notNullValue());
+
+        // Simulate the competitor grabbing the descriptor as soon as this side's clinch is resolved.
+        // The outcome is recorded and asserted at the end: an AssertionError thrown inside the callback would
+        // only complete the discarded dependent stage exceptionally and would never fail the test.
+        AtomicBoolean competitorAcquired = new AtomicBoolean(false);
+        thisAcquiry.clinchResolved().whenComplete((unused, ex) ->
+                competitorAcquired.set(recoveryDescriptor.acquire(competitorContext, completedFuture(competitorNettySender)))
+        );
+
+        manager.onMessage(handshakeRejectedMessageDueToClinchFrom());
+
+        HandshakeException ex = assertWillThrowFast(localHandshakeFuture, HandshakeException.class);
+        assertThat(ex.getMessage(), startsWith("Stepping aside to allow an incoming handshake from "));
+
+        assertThat(finalHandshakeFuture.toCompletableFuture(), willCompleteSuccessfully());
+        assertThat(finalHandshakeFuture.toCompletableFuture().join(), is(competitorNettySender));
+
+        assertThat(competitorAcquired.get(), is(true));
+    }
+
+    /**
+     * When the server rejects this handshake due to a clinch and no competitor takes the descriptor, both the
+     * local and the final futures must fail with {@link ChannelAlreadyExistsException}.
+     */
+    @Test
+    void finishesWithChannelAlreadyExistsExceptionWhenRejectedDueToClinchAndCompetitorIsNotHere() {
+        RecoveryClientHandshakeManager manager = clientHandshakeManager(randomUUID());
+        CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture();
+        CompletionStage<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture();
+
+        recoveryDescriptor.acquire(thisContext, new CompletableFuture<>());
+
+        manager.onMessage(handshakeRejectedMessageDueToClinchFrom());
+
+        assertWillThrowFast(localHandshakeFuture, ChannelAlreadyExistsException.class);
+        assertWillThrowFast(finalHandshakeFuture.toCompletableFuture(), ChannelAlreadyExistsException.class);
+    }
+
+    /** Builds a {@link HandshakeRejectedMessage} carrying {@link HandshakeRejectionReason#CLINCH}. */
+    private static HandshakeRejectedMessage handshakeRejectedMessageDueToClinchFrom() {
+        return MESSAGE_FACTORY.handshakeRejectedMessage()
+                .reasonString(HandshakeRejectionReason.CLINCH.name())
+                .message("Rejected")
+                .build();
+    }
+}
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
new file mode 100644
index 0000000000..1261b637f0
--- /dev/null
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for the acquire/release behavior of {@link RecoveryDescriptor} and the {@link DescriptorAcquiry}
+ * it exposes via {@code holder()}.
+ */
+@ExtendWith(MockitoExtension.class)
+class RecoveryDescriptorTest extends BaseIgniteAbstractTest {
+    private final RecoveryDescriptor descriptor = new RecoveryDescriptor(100);
+
+    @Mock
+    private Channel channel1;
+    @Mock
+    private Channel channel2;
+
+    @Mock
+    private ChannelHandlerContext context1;
+    @Mock
+    private ChannelHandlerContext context2;
+
+    private final CompletableFuture<NettySender> handshakeCompleteFuture1 = new CompletableFuture<>();
+    private final CompletableFuture<NettySender> handshakeCompleteFuture2 = new CompletableFuture<>();
+
+    @BeforeEach
+    void setupMocks() {
+        // lenient(): several tests never use one or both contexts, which strict stubbing would flag as unnecessary.
+        lenient().when(context1.channel()).thenReturn(channel1);
+        lenient().when(context2.channel()).thenReturn(channel2);
+    }
+
+    /** A descriptor that nobody holds can be acquired. */
+    @Test
+    void acquiresNonAcquiredDescriptor() {
+        assertTrue(descriptor.acquire(context1, handshakeCompleteFuture1));
+    }
+
+    /** A descriptor that nobody holds has no holder. */
+    @Test
+    void acquiryIsAbsentOnNonAcquiredDescriptor() {
+        assertThat(descriptor.holder(), is(nullValue()));
+    }
+
+    /** After acquiring, the holder reports the acquiring channel and an unresolved clinch. */
+    @Test
+    void acquiryInformationIsAvailableAfterAcquiring() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        DescriptorAcquiry acquiry = descriptor.holder();
+        assertThat(acquiry, is(notNullValue()));
+        assertThat(acquiry.channel(), is(channel1));
+        assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(), is(false));
+    }
+
+    /** A second context cannot acquire an already held descriptor, and the original holder stays in place. */
+    @Test
+    void cannotAcquireAcquiredDescriptor() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        assertFalse(descriptor.acquire(context2, handshakeCompleteFuture2));
+        assertThat(descriptor.holder().channel(), is(channel1));
+    }
+
+    /** Releasing by the holding context allows the descriptor to be acquired again. */
+    @Test
+    void releaseMakesDescriptorAvailable() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        descriptor.release(context1);
+
+        assertTrue(descriptor.acquire(context1, handshakeCompleteFuture1));
+        DescriptorAcquiry acquiry = descriptor.holder();
+        assertThat(acquiry, is(notNullValue()));
+        assertThat(acquiry.channel(), is(channel1));
+    }
+
+    /** Releasing by the holding context clears the holder. */
+    @Test
+    void releaseRemovesAcquiryInformation() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        descriptor.release(context1);
+
+        assertThat(descriptor.holder(), is(nullValue()));
+    }
+
+    /** A release attempted by a context that does not hold the descriptor leaves the holder untouched. */
+    @Test
+    void releaseWithAnotherContextHasNoEffect() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        descriptor.release(context2);
+
+        DescriptorAcquiry acquiry = descriptor.holder();
+        assertThat(acquiry, is(notNullValue()));
+        assertThat(acquiry.channel(), is(channel1));
+        assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(), is(false));
+
+        assertFalse(descriptor.acquire(context2, handshakeCompleteFuture2));
+
+        acquiry = descriptor.holder();
+        assertThat(acquiry, is(notNullValue()));
+        assertThat(acquiry.channel(), is(channel1));
+        assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(), is(false));
+    }
+
+    /** Releasing by the holding context completes the holder's {@code clinchResolved()} stage. */
+    @Test
+    void releaseCompletesClinchResolvedStage() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+        CompletionStage<Void> clinchResolved = descriptor.holder().clinchResolved();
+
+        descriptor.release(context1);
+
+        assertThat(clinchResolved.toCompletableFuture(), is(completedFuture()));
+    }
+}
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
new file mode 100644
index 0000000000..9ad44b62cf
--- /dev/null
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelProgressivePromise;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.NettySender;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.OutNetworkObject;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RecoveryServerHandshakeManager} when the recovery descriptor is already held at the moment
+ * the client's handshake response arrives.
+ */
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
+    private static final UUID LOWER_ID = new UUID(1, 1);
+    private static final UUID HIGHER_ID = new UUID(2, 2);
+
+    private static final String CLIENT_CONSISTENT_ID = "client";
+    private static final String SERVER_CONSISTENT_ID = "server";
+
+    private static final short CONNECTION_INDEX = 0;
+
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    @Mock
+    private Channel channel;
+
+    @Mock
+    private ChannelHandlerContext context;
+
+    @Mock
+    private ChannelCreationListener channelCreationListener;
+
+    @Mock
+    private RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+    @Mock
+    private EventExecutor eventExecutor;
+
+    @Captor
+    private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
+
+    private final RecoveryDescriptor recoveryDescriptor = new RecoveryDescriptor(100);
+
+    @BeforeEach
+    void initMocks() {
+        lenient().when(context.channel()).thenReturn(channel);
+
+        // Closing the channel releases this context's hold on the recovery descriptor.
+        lenient().when(channel.close()).thenAnswer(invocation -> {
+            recoveryDescriptor.release(context);
+            return mock(ChannelFuture.class);
+        });
+
+        // Every lookup yields the same descriptor instance.
+        lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(anyString(), any(), anyShort()))
+                .thenReturn(recoveryDescriptor);
+
+        // Report that the caller is always inside the event loop.
+        lenient().when(context.executor()).thenReturn(eventExecutor);
+        lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
+
+        // Every write succeeds immediately.
+        lenient().when(channel.writeAndFlush(any())).then(invocation -> {
+            DefaultChannelProgressivePromise future = new DefaultChannelProgressivePromise(channel, eventExecutor);
+            future.setSuccess();
+            return future;
+        });
+    }
+
+    /**
+     * The descriptor is already held when a handshake start response from a client with the lower launch ID
+     * arrives. The server-side handshake must then:
+     * <ul>
+     *     <li>fail its local future with a {@link HandshakeException} without closing the channel, and</li>
+     *     <li>send a {@link HandshakeRejectedMessage} with reason {@link HandshakeRejectionReason#CLINCH} that is
+     *     not saved for recovery.</li>
+     * </ul>
+     */
+    @Test
+    void terminatesCurrentHandshakeInClinchWhenOngoingHandshakeLosesDueToTieBreaking() {
+        UUID clientLaunchId = LOWER_ID;
+        UUID serverLaunchId = HIGHER_ID;
+
+        RecoveryServerHandshakeManager manager = serverHandshakeManager(serverLaunchId);
+        CompletableFuture<NettySender> handshakeFuture = manager.localHandshakeFuture();
+
+        recoveryDescriptor.acquire(context, new CompletableFuture<>());
+
+        manager.onMessage(handshakeStartResponseMessageFrom(clientLaunchId));
+
+        verify(channel, never()).close();
+        verify(channel, never()).close(any(ChannelPromise.class));
+
+        HandshakeException ex = assertWillThrowFast(handshakeFuture, HandshakeException.class);
+        assertThat(ex.getMessage(), startsWith("Failed to acquire recovery descriptor during handshake, it is held by: "));
+
+        verify(channel).writeAndFlush(sentMessageCaptor.capture());
+
+        OutNetworkObject outObject = sentMessageCaptor.getValue();
+        assertThat(outObject.shouldBeSavedForRecovery(), is(false));
+        assertThat(outObject.networkMessage(), is(instanceOf(HandshakeRejectedMessage.class)));
+
+        HandshakeRejectedMessage rejectedMessage = (HandshakeRejectedMessage) outObject.networkMessage();
+        assertThat(rejectedMessage.reason(), is(HandshakeRejectionReason.CLINCH));
+        assertThat(
+                rejectedMessage.message(),
+                startsWith("Handshake clinch detected, this handshake will be terminated, winning channel is ")
+        );
+    }
+
+    /** Creates a server handshake manager for the given launch ID and initializes it with {@code context}. */
+    private RecoveryServerHandshakeManager serverHandshakeManager(UUID launchId) {
+        RecoveryServerHandshakeManager manager = new RecoveryServerHandshakeManager(
+                launchId,
+                SERVER_CONSISTENT_ID,
+                MESSAGE_FACTORY,
+                recoveryDescriptorProvider,
+                new AllIdsAreFresh(),
+                channelCreationListener,
+                new AtomicBoolean(false)
+        );
+
+        manager.onInit(context);
+
+        return manager;
+    }
+
+    /** Builds a {@link HandshakeStartResponseMessage} as if sent by the client with the given launch ID. */
+    private static HandshakeStartResponseMessage handshakeStartResponseMessageFrom(UUID clientLaunchId) {
+        return MESSAGE_FACTORY.handshakeStartResponseMessage()
+                .launchId(clientLaunchId)
+                .consistentId(CLIENT_CONSISTENT_ID)
+                .connectionId(CONNECTION_INDEX)
+                .receivedCount(0)
+                .build();
+    }
+}


Reply via email to