This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 29dedf63f6 IGNITE-20852 Simultaneous incoming and outgoing connection attempts may cause connection failure (#2850) 29dedf63f6 is described below commit 29dedf63f6699bf9705874de7b9e990b4b0ecddc Author: Sergey Chugunov <sergey.chugu...@gmail.com> AuthorDate: Thu Nov 23 18:17:58 2023 +0400 IGNITE-20852 Simultaneous incoming and outgoing connection attempts may cause connection failure (#2850) * Handshake protocol is extended to allow a node that loses a clinch to notify its origin * As a result of a handshake, the caller always gets a NettySender, even if the caller lost the clinch * If, during a handshake, a party cannot obtain a lock at its side, it unconditionally gives way to the competitor (as the competitor has advanced further) Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../network/netty/ItConnectionManagerTest.java | 41 +--- .../network/handshake/HandshakeManager.java | 18 +- .../internal/network/netty/ConnectionManager.java | 49 +++++ .../internal/network/netty/HandshakeHandler.java | 6 +- .../ignite/internal/network/netty/NettyClient.java | 2 +- .../network/recovery/DescriptorAcquiry.java | 67 +++++++ .../network/recovery/HandshakeTieBreaker.java | 13 +- .../recovery/RecoveryClientHandshakeManager.java | 155 ++++++++++----- .../network/recovery/RecoveryDescriptor.java | 35 +++- .../recovery/RecoveryServerHandshakeManager.java | 127 +++++++++---- .../recovery/message/HandshakeRejectedMessage.java | 20 +- ...dMessage.java => HandshakeRejectionReason.java} | 28 ++- .../ignite/network/DefaultMessagingService.java | 23 +-- .../internal/network/netty/NettyClientTest.java | 8 +- .../internal/network/netty/NettyServerTest.java | 5 +- .../network/netty/RecoveryHandshakeTest.java | 108 +++++++++-- .../network/recovery/DescriptorAcquiryTest.java | 56 ++++++ .../RecoveryClientHandshakeManagerTest.java | 207 +++++++++++++++++++++ .../network/recovery/RecoveryDescriptorTest.java | 141
++++++++++++++ .../RecoveryServerHandshakeManagerTest.java | 169 +++++++++++++++++ 20 files changed, 1070 insertions(+), 208 deletions(-) diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java index 9d48f98304..399b216e09 100644 --- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java +++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java @@ -70,6 +70,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; /** @@ -321,6 +322,7 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest { * @throws Exception If failed. */ @RepeatedTest(100) + @Timeout(10) public void testOneChannelLeftIfConnectToEachOther() throws Exception { try ( ConnectionManagerWrapper manager1 = startManager(4000); @@ -329,42 +331,11 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest { CompletableFuture<NettySender> fut1 = manager1.openChannelTo(manager2).toCompletableFuture(); CompletableFuture<NettySender> fut2 = manager2.openChannelTo(manager1).toCompletableFuture(); - NettySender sender1 = null; - NettySender sender2 = null; + NettySender sender1 = fut1.get(1, TimeUnit.SECONDS); + NettySender sender2 = fut2.get(1, TimeUnit.SECONDS); - try { - sender1 = fut1.get(1, TimeUnit.SECONDS); - } catch (Exception ignored) { - // No-op. - } - try { - sender2 = fut2.get(1, TimeUnit.SECONDS); - } catch (Exception ignored) { - // No-op. 
- } - - NettySender highlander = null; - - assertTrue(sender1 != null || sender2 != null); - - if (sender1 != null && sender1.isOpen()) { - highlander = sender1; - - boolean sender2NullOrClosed = sender2 == null || !sender2.isOpen(); - - assertTrue(sender2NullOrClosed); - } - - if (sender2 != null && sender2.isOpen()) { - highlander = sender2; - - boolean sender1NullOrClosed = sender1 == null || !sender1.isOpen(); - - assertTrue(sender1NullOrClosed); - } - - assertNotNull(highlander); - assertTrue(highlander.isOpen()); + assertTrue(sender1.isOpen()); + assertTrue(sender2.isOpen()); assertTrue( waitForCondition( diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java index 30b3278288..c39930e52c 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.handshake; import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.apache.ignite.internal.network.netty.NettySender; import org.apache.ignite.network.NetworkMessage; @@ -48,9 +49,20 @@ public interface HandshakeManager { void onMessage(NetworkMessage message); /** - * Returns future that represents the handshake operation. + * Returns local future that represents the handshake operation. This is the future that + * gets completed when the handshake itself terminates either successfully or with an exception. + * This is used to complete the current handshake; to get the final outcome of the connection attempt + * please use {@link #finalHandshakeFuture()}. * - * @return Future that represents the handshake operation. 
+ * @return Local future that represents the handshake operation. */ - CompletableFuture<NettySender> handshakeFuture(); + CompletableFuture<NettySender> localHandshakeFuture(); + + /** + * Returns final future that represents the handshake operation. This represents completion of either + * current handshake or the inverse handshake if it wins (and the current one loses). + * + * @return Final future that represents the handshake operation. + */ + CompletionStage<NettySender> finalHandshakeFuture(); } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java index aedaa070ea..6587b91bde 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.network.netty; +import static java.util.function.Function.identity; import static org.apache.ignite.network.ChannelType.getChannel; import io.netty.bootstrap.Bootstrap; @@ -40,6 +41,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.NetworkMessagesFactory; import org.apache.ignite.internal.network.configuration.NetworkView; +import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException; import org.apache.ignite.internal.network.handshake.HandshakeManager; import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager; import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory; @@ -65,6 +67,8 @@ public class ConnectionManager implements ChannelCreationListener { /** Latest version of the direct marshalling protocol. 
*/ public static final byte DIRECT_PROTOCOL_VERSION = 1; + private static final int MAX_RETRIES_TO_OPEN_CHANNEL = 10; + /** Client bootstrap. */ private final Bootstrap clientBootstrap; @@ -223,6 +227,51 @@ public class ConnectionManager implements ChannelCreationListener { * @return Sender. */ public OrderingFuture<NettySender> channel(@Nullable String consistentId, ChannelType type, InetSocketAddress address) { + return getChannelWithRetry(consistentId, type, address, 0); + } + + private OrderingFuture<NettySender> getChannelWithRetry( + @Nullable String consistentId, + ChannelType type, + InetSocketAddress address, + int attempt + ) { + if (attempt > MAX_RETRIES_TO_OPEN_CHANNEL) { + return OrderingFuture.failedFuture(new IllegalStateException("Too many attempts to open channel to " + consistentId)); + } + + return doGetChannel(consistentId, type, address) + .handle((res, ex) -> { + if (ex instanceof ChannelAlreadyExistsException) { + return getChannelWithRetry(((ChannelAlreadyExistsException) ex).consistentId(), type, address, attempt + 1); + } + if (ex != null && ex.getCause() instanceof ChannelAlreadyExistsException) { + return getChannelWithRetry( + ((ChannelAlreadyExistsException) ex.getCause()).consistentId(), + type, + address, + attempt + 1 + ); + } + if (ex != null) { + return OrderingFuture.<NettySender>failedFuture(ex); + } + + assert res != null; + if (res.isOpen()) { + return OrderingFuture.completedFuture(res); + } else { + return getChannelWithRetry(res.consistentId(), type, address, attempt + 1); + } + }) + .thenCompose(identity()); + } + + private OrderingFuture<NettySender> doGetChannel( + @Nullable String consistentId, + ChannelType type, + InetSocketAddress address + ) { // Problem is we can't look up a channel by consistent id because consistent id is not known yet. if (consistentId != null) { // If consistent id is known, try looking up a channel by consistent id. 
There can be an outbound connection diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java index 83b60e4834..de57be659d 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java @@ -75,7 +75,7 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter { throw e; } - manager.handshakeFuture().whenComplete((unused, throwable) -> { + manager.localHandshakeFuture().whenComplete((unused, throwable) -> { if (throwable != null) { LOG.debug("Error when performing handshake", throwable); @@ -97,7 +97,7 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter { public void channelInactive(ChannelHandlerContext ctx) { // If this method is called that means channel has been closed before handshake has finished or handshake // has failed. 
- manager.handshakeFuture().completeExceptionally( + manager.localHandshakeFuture().completeExceptionally( new HandshakeException("Channel has been closed before handshake has finished or handshake has failed") ); @@ -107,7 +107,7 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter { /** {@inheritDoc} */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - manager.handshakeFuture().completeExceptionally(cause); + manager.localHandshakeFuture().completeExceptionally(cause); } /** diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java index 34dba15646..2748b3f4ea 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java @@ -146,7 +146,7 @@ public class NettyClient { } else if (throwable != null) { return CompletableFuture.<NettySender>failedFuture(throwable); } else { - return handshakeManager.handshakeFuture(); + return handshakeManager.finalHandshakeFuture(); } } }) diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java new file mode 100644 index 0000000000..0b893eb193 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.recovery; + +import io.netty.channel.Channel; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.apache.ignite.internal.network.netty.NettySender; + +/** + * Context around a fact that a {@link RecoveryDescriptor} is acquired by some channel. + */ +class DescriptorAcquiry { + private final Channel channel; + private final CompletableFuture<NettySender> handshakeCompleteFuture; + + private final CompletableFuture<Void> clinchResolved = new CompletableFuture<>(); + + DescriptorAcquiry(Channel channel, CompletableFuture<NettySender> handshakeCompleteFuture) { + this.channel = channel; + this.handshakeCompleteFuture = handshakeCompleteFuture; + } + + /** + * Returns the channel that owns the descriptor. + */ + Channel channel() { + return channel; + } + + /** + * Returns a completion stage that gets completed when a clinch associated with this acquiry is resolved + * (that is, the owning handshake gave up and released the recovery descriptor). + */ + CompletionStage<Void> clinchResolved() { + return clinchResolved; + } + + /** + * Signals that the owner of this recovery descriptor gave up and, hence, the clinch has been resolved. + */ + void markClinchResolved() { + clinchResolved.complete(null); + } + + /** + * Returns the future that gets completed when the handshake performed by the owner of the descriptor completes. 
+ */ + CompletionStage<NettySender> handshakeCompleteFuture() { + return handshakeCompleteFuture; + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java index e3bf6018ef..6210b5793b 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java @@ -20,15 +20,18 @@ package org.apache.ignite.internal.network.recovery; import java.util.UUID; /** - * The HandshakeTieBreaker class provides a mechanism for determining whether an existing channel should be closed during a handshake - * process. + * The HandshakeTieBreaker class provides a mechanism for determining whether an existing channel should be closed in case of a clinch + * during a handshake process. + * + * <p>A clinch is a situation when two parallel handshakes (one from node A to B, another from B to A) acquire locks (now these are + * recovery descriptors) on different sides, then each of them tries to take a lock on the opposite side, which is impossible as + * it's already held by the corresponding competitor. To resolve such a deadlock, one of the handshakes must be terminated. */ class HandshakeTieBreaker { /** * Determines whether an existing channel should be closed based on the comparison of the server's launch id and the client's launch id. - * If the client's launch id is greater than the server's launch id, the existing channel should be closed in favor of the new one. If - * the server's launch id is less than or equal to the client's launch id, the existing channel should be closed in favor of the new - * one. 
+ * If the client's launch id is greater than the server's launch id, the existing channel should be closed in favor of the new one; + * otherwise, the new channel should be closed. * * @param serverLaunchId Server's launch id. * @param clientLaunchId Client's launch id. diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java index 67f03822d8..040e44ccaa 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.network.recovery; import static java.util.Collections.emptyList; -import static org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel; +import static java.util.function.Function.identity; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -41,6 +42,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils; import org.apache.ignite.internal.network.netty.PipelineUtils; import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage; +import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason; import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage; import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage; import org.apache.ignite.lang.IgniteException; @@ -74,7 +76,13 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { private final short connectionId; /** Handshake completion future. */ - private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>(); + private final CompletableFuture<NettySender> localHandshakeCompleteFuture = new CompletableFuture<>(); + + /** + * Master future used to complete the handshake either with the results of this handshake of the competing one + * (in the opposite direction), if it wins. + */ + private final CompletableFuture<CompletionStage<NettySender>> masterHandshakeCompleteFuture = new CompletableFuture<>(); /** Remote node's launch id. */ private UUID remoteLaunchId; @@ -120,10 +128,13 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { this.staleIdDetector = staleIdDetector; this.stopping = stopping; - this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> { + localHandshakeCompleteFuture.whenComplete((nettySender, throwable) -> { if (throwable != null) { releaseResources(); + // Complete the master future if it has not yet been completed by the competitor. 
+ masterHandshakeCompleteFuture.complete(localHandshakeCompleteFuture); + return; } @@ -159,22 +170,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { } if (message instanceof HandshakeRejectedMessage) { - HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message; - - boolean ignorable = stopping.get() || !msg.critical(); - - if (ignorable) { - LOG.debug("Handshake rejected by server: {}", msg.reason()); - } else { - LOG.warn("Handshake rejected by server: {}", msg.reason()); - } - - handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.reason())); - - if (!ignorable) { - // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler - failureHandler.handleFailure(new IgniteException("Handshake rejected by server: " + msg.reason())); - } + onHandshakeRejectedMessage((HandshakeRejectedMessage) message); return; } @@ -237,24 +233,23 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { connectionId ); - while (!descriptor.acquire(ctx)) { - if (shouldCloseChannel(remoteLaunchId, launchId)) { - Channel holderChannel = descriptor.holderChannel(); - - if (holderChannel == null) { - continue; - } + while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) { + // Don't use the tie-braking logic as this handshake attempt is late: the competitor has already acquired + // recovery descriptors on both sides, so this handshake attempt must fail regardless of the Tie Breaker's opinion. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Failed to acquire recovery descriptor during handshake, it is held by: {}.", descriptor.holderDescription()); + } - holderChannel.close().awaitUninterruptibly(); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("Failed to acquire recovery descriptor during handshake, it is held by: {}", descriptor.holderDescription()); - } + DescriptorAcquiry competitorAcquiry = descriptor.holder(); + if (competitorAcquiry == null) { + continue; + } - handshakeCompleteFuture.completeExceptionally(new ChannelAlreadyExistsException(remoteConsistentId)); + // Complete our master future with the competitor's future. After this our local future has no effect on the final result + // of this handshake. + completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry); - return; - } + return; } this.recoveryDescriptor = descriptor; @@ -262,25 +257,81 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { handshake(this.recoveryDescriptor); } + private void completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry competitorAcquiry) { + masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture()); + localHandshakeCompleteFuture.completeExceptionally( + new HandshakeException("Stepping aside to allow an incoming handshake from " + remoteConsistentId + " finish.") + ); + } + private void handleStaleServerId(HandshakeStartMessage msg) { - String reason = msg.consistentId() + ":" + msg.launchId() + " is stale, server should be restarted so that clients can connect"; + String message = msg.consistentId() + ":" + msg.launchId() + " is stale, server should be restarted so that clients can connect"; HandshakeRejectedMessage rejectionMessage = MESSAGE_FACTORY.handshakeRejectedMessage() - .critical(true) - .reason(reason) + .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name()) + .message(message) .build(); - sendHandshakeRejectedMessage(rejectionMessage, reason); + 
sendHandshakeRejectedMessage(rejectionMessage, message); } private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) { - String reason = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId + String message = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId + ", but it's stopping"; HandshakeRejectedMessage rejectionMessage = MESSAGE_FACTORY.handshakeRejectedMessage() - .critical(false) - .reason(reason) + .reasonString(HandshakeRejectionReason.STOPPING.name()) + .message(message) .build(); - sendHandshakeRejectedMessage(rejectionMessage, reason); + sendHandshakeRejectedMessage(rejectionMessage, message); + } + + private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) { + boolean ignorable = stopping.get() || !msg.reason().critical(); + + if (ignorable) { + LOG.debug("Handshake rejected by server: {}", msg.message()); + } else { + LOG.warn("Handshake rejected by server: {}", msg.message()); + } + + if (msg.reason() == HandshakeRejectionReason.CLINCH) { + giveUpClinch(); + } else { + localHandshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.message())); + } + + if (!ignorable) { + // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler + failureHandler.handleFailure(new IgniteException("Handshake rejected by server: " + msg.message())); + } + } + + private void giveUpClinch() { + RecoveryDescriptor descriptor = recoveryDescriptorProvider.getRecoveryDescriptor( + remoteConsistentId, + remoteLaunchId, + connectionId + ); + + DescriptorAcquiry myAcquiry = descriptor.holder(); + assert myAcquiry != null; + assert myAcquiry.channel() == ctx.channel() : "Expected the descriptor to be held by current channel " + ctx.channel() + + ", but it's held by another channel " + myAcquiry.channel(); + + descriptor.release(ctx); + + // Complete the future to allow the competitor that should wait on it acquire 
the descriptor and finish its handshake. + myAcquiry.markClinchResolved(); + + DescriptorAcquiry competitorAcquiry = descriptor.holder(); + if (competitorAcquiry != null) { + // The competitor is available, so just complete our master future with the competitor future. + completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry); + } else { + // The competitor is not at the lock yet. Maybe it will arrive soon, maybe it will never arrive. + // The safest thing is to just retry the whole handshake procedure. + localHandshakeCompleteFuture.completeExceptionally(new ChannelAlreadyExistsException(remoteConsistentId)); + } } private void sendHandshakeRejectedMessage(HandshakeRejectedMessage rejectionMessage, String reason) { @@ -288,19 +339,25 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> { if (throwable != null) { - handshakeCompleteFuture.completeExceptionally( + localHandshakeCompleteFuture.completeExceptionally( new HandshakeException("Failed to send handshake rejected message: " + throwable.getMessage(), throwable) ); } else { - handshakeCompleteFuture.completeExceptionally(new HandshakeException(reason)); + localHandshakeCompleteFuture.completeExceptionally(new HandshakeException(reason)); } }); } /** {@inheritDoc} */ @Override - public CompletableFuture<NettySender> handshakeFuture() { - return handshakeCompleteFuture; + public CompletableFuture<NettySender> localHandshakeFuture() { + return localHandshakeCompleteFuture; + } + + /** {@inheritDoc} */ + @Override + public CompletionStage<NettySender> finalHandshakeFuture() { + return masterHandshakeCompleteFuture.thenCompose(identity()); } private void handshake(RecoveryDescriptor descriptor) { @@ -317,7 +374,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> { if (throwable != null) { - 
handshakeCompleteFuture.completeExceptionally( + localHandshakeCompleteFuture.completeExceptionally( new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable) ); } @@ -340,6 +397,8 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { // Removes handshake handler from the pipeline as the handshake is finished this.ctx.pipeline().remove(this.handler); - handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, connectionId)); + // Complete the master future with the local future of the current handshake as there was no competitor (or we won the competition). + masterHandshakeCompleteFuture.complete(localHandshakeCompleteFuture); + localHandshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, connectionId)); } } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java index 463f2ed49c..33c03fad2a 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java @@ -23,7 +23,9 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.network.netty.NettySender; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.network.OutNetworkObject; import org.jetbrains.annotations.Nullable; @@ -44,8 +46,8 @@ public class RecoveryDescriptor { /** Count of received messages. */ private long receivedCount; - /** Current owner channel of this descriptor. 
*/ - private final AtomicReference<Channel> channelHolder = new AtomicReference<>(); + /** Some context around current owner channel of this descriptor. */ + private final AtomicReference<DescriptorAcquiry> channelHolder = new AtomicReference<>(); /** * Constructor. @@ -138,22 +140,35 @@ public class RecoveryDescriptor { * @param ctx Channel handler context. */ public void release(ChannelHandlerContext ctx) { - channelHolder.compareAndSet(ctx.channel(), null); + DescriptorAcquiry oldAcquiry = channelHolder.getAndUpdate(acquiry -> { + if (acquiry != null && acquiry.channel() == ctx.channel()) { + return null; + } + + return acquiry; + }); + + if (oldAcquiry != null && oldAcquiry.channel() == ctx.channel()) { + // We have successfully released the descriptor. + // Let's mark the clinch resolved just in case. + oldAcquiry.markClinchResolved(); + } } /** * Acquire this descriptor. * * @param ctx Channel handler context. + * @param handshakeCompleteFuture Future that gets completed when the corresponding handshake completes. */ - public boolean acquire(ChannelHandlerContext ctx) { - return channelHolder.compareAndSet(null, ctx.channel()); + public boolean acquire(ChannelHandlerContext ctx, CompletableFuture<NettySender> handshakeCompleteFuture) { + return channelHolder.compareAndSet(null, new DescriptorAcquiry(ctx.channel(), handshakeCompleteFuture)); } /** - * Returns the channel, that holds this descriptor. + * Returns context around the channel that holds this descriptor. */ - @Nullable Channel holderChannel() { + @Nullable DescriptorAcquiry holder() { return channelHolder.get(); } @@ -161,13 +176,13 @@ public class RecoveryDescriptor { * Returns {@code toString()} representation of a {@link Channel}, that holds this descriptor. 
*/ String holderDescription() { - Channel channel = channelHolder.get(); + DescriptorAcquiry acquiry = channelHolder.get(); - if (channel == null) { + if (acquiry == null) { // This can happen if channel was already closed and it released the descriptor. return "No channel"; } - return channel.toString(); + return acquiry.channel().toString(); } } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java index f65948eed2..36693c9cea 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -40,6 +41,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils; import org.apache.ignite.internal.network.netty.PipelineUtils; import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage; +import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason; import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage; import org.apache.ignite.lang.IgniteException; @@ -52,6 +54,8 @@ import org.apache.ignite.network.OutNetworkObject; public class RecoveryServerHandshakeManager implements HandshakeManager { private static final 
IgniteLogger LOG = Loggers.forClass(RecoveryServerHandshakeManager.class); + private static final int MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS = 1000; + /** Launch id. */ private final UUID launchId; @@ -180,22 +184,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager { } if (message instanceof HandshakeRejectedMessage) { - HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message; - - boolean ignorable = stopping.get() || !msg.critical(); - - if (ignorable) { - LOG.debug("Handshake rejected by client: {}", msg.reason()); - } else { - LOG.warn("Handshake rejected by client: {}", msg.reason()); - } - - handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.reason())); - - if (!ignorable) { - // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler - failureHandler.handleFailure(new IgniteException("Handshake rejected by client: " + msg.reason())); - } + onHandshakeRejectedMessage((HandshakeRejectedMessage) message); return; } @@ -232,27 +221,84 @@ public class RecoveryServerHandshakeManager implements HandshakeManager { this.receivedCount = remoteReceivedCount; this.remoteChannelId = remoteChannelId; + tryAcquireDescriptorAndFinishHandshake(); + } + + private void handleStaleClientId(HandshakeStartResponseMessage msg) { + String message = msg.consistentId() + ":" + msg.launchId() + " is stale, client should be restarted to be allowed to connect"; + HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage() + .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name()) + .message(message) + .build(); + + sendHandshakeRejectedMessage(rejectionMessage, message); + } + + private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage msg) { + String message = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId + + ", but it's stopping"; + HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage() + .reasonString(HandshakeRejectionReason.STOPPING.name()) + .message(message) + .build(); + + sendHandshakeRejectedMessage(rejectionMessage, message); + } + + private void tryAcquireDescriptorAndFinishHandshake() { + tryAcquireDescriptorAndFinishHandshake(0); + } + + private void tryAcquireDescriptorAndFinishHandshake(int attempt) { + if (attempt > MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS) { + throw new IllegalStateException("Too many attempts during handshake from " + remoteConsistentId + "(" + remoteLaunchId + + ":" + remoteChannelId + ") via " + ctx.channel()); + } + RecoveryDescriptor descriptor = recoveryDescriptorProvider.getRecoveryDescriptor( this.remoteConsistentId, this.remoteLaunchId, this.remoteChannelId ); - while (!descriptor.acquire(ctx)) { + while (!descriptor.acquire(ctx, handshakeCompleteFuture)) { if (shouldCloseChannel(launchId, remoteLaunchId)) { - Channel holderChannel = descriptor.holderChannel(); + // A competitor is holding the descriptor and we win the clinch; so we need to wait on the 'clinch resolved' future till + // the competitor realises it should terminate (this realization will happen on the other side of the channel), send + // the corresponding message to this node, terminate its handshake and complete the 'clinch resolved' future. + DescriptorAcquiry competitorAcquiry = descriptor.holder(); - if (holderChannel == null) { + if (competitorAcquiry == null) { continue; } - holderChannel.close().awaitUninterruptibly(); + competitorAcquiry.clinchResolved().whenComplete((res, ex) -> { + // The competitor has finished terminating its handshake, it must've already released the descriptor, + // so let's try again. 
+ if (ctx.executor().inEventLoop()) { + tryAcquireDescriptorAndFinishHandshake(attempt + 1); + } else { + ctx.executor().execute(() -> tryAcquireDescriptorAndFinishHandshake(attempt + 1)); + } + }); + + return; } else { - String err = "Failed to acquire recovery descriptor during handshake, it is held by: " + descriptor.holderDescription(); + // A competitor is holding the descriptor and we lose the clinch; so we need to send the correspnding message + // to the other side, where the code handling the message will terminate our handshake and complete the 'clinch resolved' + // future, making it possible for the competitor to proceed and finish the handshake. + String localErrorMessage = "Failed to acquire recovery descriptor during handshake, it is held by: " + + descriptor.holderDescription(); + + LOG.debug(localErrorMessage); - LOG.info(err); + HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage() + .reasonString(HandshakeRejectionReason.CLINCH.name()) + .message("Handshake clinch detected, this handshake will be terminated, winning channel is " + + descriptor.holderDescription()) + .build(); - handshakeCompleteFuture.completeExceptionally(new HandshakeException(err)); + sendHandshakeRejectedMessage(rejectionMessage, localErrorMessage); return; } @@ -263,25 +309,21 @@ public class RecoveryServerHandshakeManager implements HandshakeManager { handshake(descriptor); } - private void handleStaleClientId(HandshakeStartResponseMessage msg) { - String reason = msg.consistentId() + ":" + msg.launchId() + " is stale, client should be restarted to be allowed to connect"; - HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage() - .critical(true) - .reason(reason) - .build(); + private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) { + boolean ignorable = stopping.get() || !msg.reason().critical(); - sendHandshakeRejectedMessage(rejectionMessage, reason); - } + if (ignorable) { + 
LOG.debug("Handshake rejected by client: {}", msg.message()); + } else { + LOG.warn("Handshake rejected by client: {}", msg.message()); + } - private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage msg) { - String reason = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId - + ", but it's stopping"; - HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage() - .critical(false) - .reason(reason) - .build(); + handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.message())); - sendHandshakeRejectedMessage(rejectionMessage, reason); + if (!ignorable) { + // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler + failureHandler.handleFailure(new IgniteException("Handshake rejected by client: " + msg.message())); + } } private void sendHandshakeRejectedMessage(HandshakeRejectedMessage rejectionMessage, String reason) { @@ -353,7 +395,12 @@ public class RecoveryServerHandshakeManager implements HandshakeManager { /** {@inheritDoc} */ @Override - public CompletableFuture<NettySender> handshakeFuture() { + public CompletableFuture<NettySender> localHandshakeFuture() { + return handshakeCompleteFuture; + } + + @Override + public CompletionStage<NettySender> finalHandshakeFuture() { return handshakeCompleteFuture; } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java index 9871de3597..e83abeb8ef 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java @@ -24,21 +24,25 @@ import org.apache.ignite.network.annotations.Transferable; /** * Handshake rejected message, 
contains the reason for a rejection. * This message is sent from a server to a client or wise versa. - * After this message is received it makes no sense to retry connections with same node identity (launch ID must be changed - * to make a retry). */ @Transferable(HANDSHAKE_REJECTED) public interface HandshakeRejectedMessage extends InternalMessage { /** - * Returns rejection reason. + * Returns rejection message. + */ + String message(); + + /** + * Returns rejection reason (this is the name of a member of {@link HandshakeRejectionReason}). * - * @return Reason of the rejection. + * @see HandshakeRejectionReason */ - String reason(); + String reasonString(); /** - * Returns {@code true} iff the rejection is not expected and should be treated as a critical failure (requiring - * the rejected node to restart). + * Returns rejection reason. */ - boolean critical(); + default HandshakeRejectionReason reason() { + return HandshakeRejectionReason.valueOf(reasonString()); + } } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java similarity index 61% copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java index 9871de3597..b70fea80aa 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java @@ -17,28 +17,26 @@ package org.apache.ignite.internal.network.recovery.message; -import static org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_REJECTED; - -import org.apache.ignite.network.annotations.Transferable; - /** 
- * Handshake rejected message, contains the reason for a rejection. - * This message is sent from a server to a client or wise versa. - * After this message is received it makes no sense to retry connections with same node identity (launch ID must be changed - * to make a retry). + * Reason for handshake rejection. */ -@Transferable(HANDSHAKE_REJECTED) -public interface HandshakeRejectedMessage extends InternalMessage { +public enum HandshakeRejectionReason { + /** The sender is stopping. */ + STOPPING, /** - * Returns rejection reason. - * - * @return Reason of the rejection. + * The sender has detected that the counterpart launch ID is stale (was earlier used to establish a connection). + * After this is received it makes no sense to retry connections with same node identity (launch ID must be changed + * to make a retry). */ - String reason(); + STALE_LAUNCH_ID, + /** The sender has detected a clinch and decided to terminate this handshake in favor of the competitor. */ + CLINCH; /** * Returns {@code true} iff the rejection is not expected and should be treated as a critical failure (requiring * the rejected node to restart). 
*/ - boolean critical(); + public boolean critical() { + return this == STALE_LAUNCH_ID; + } } diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java index 6fb114ae90..b5f52fa336 100644 --- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java +++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java @@ -28,7 +28,6 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -37,12 +36,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiPredicate; import java.util.function.Function; -import org.apache.ignite.internal.future.OrderingFuture; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.NetworkMessagesFactory; -import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException; import org.apache.ignite.internal.network.message.ClassDescriptorMessage; import org.apache.ignite.internal.network.message.InvokeRequest; import org.apache.ignite.internal.network.message.InvokeResponse; @@ -298,24 +295,8 @@ public class DefaultMessagingService extends AbstractMessagingService { return failedFuture(new IgniteException("Failed to marshal message: " + e.getMessage(), e)); } - OrderingFuture<NettySender> channel = connectionManager.channel(consistentId, type, addr); - - return channel.handle((sender, throwable) -> { - if (throwable != null) { - if (throwable instanceof CompletionException 
&& throwable.getCause() instanceof ChannelAlreadyExistsException) { - ChannelAlreadyExistsException e = (ChannelAlreadyExistsException) throwable.getCause(); - - OrderingFuture<NettySender> channelFut = connectionManager.channel(e.consistentId(), type, addr); - - return channelFut.thenComposeToCompletable(nettySender -> { - return nettySender.send(new OutNetworkObject(message, descriptors)); - }); - } - - throw new CompletionException(throwable); - } - return sender.send(new OutNetworkObject(message, descriptors)); - }).thenComposeToCompletable(Function.identity()); + return connectionManager.channel(consistentId, type, addr) + .thenComposeToCompletable(sender -> sender.send(new OutNetworkObject(message, descriptors))); } private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception { diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java index b407968e19..f1f69b9c4f 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java @@ -30,6 +30,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; @@ -267,10 +268,15 @@ public class NettyClientTest extends BaseIgniteAbstractTest { /** {@inheritDoc} */ @Override - public CompletableFuture<NettySender> handshakeFuture() { + public CompletableFuture<NettySender> localHandshakeFuture() { return CompletableFuture.completedFuture(sender); } + @Override + public 
CompletionStage<NettySender> finalHandshakeFuture() { + return localHandshakeFuture(); + } + /** {@inheritDoc} */ @Override public void onInit(ChannelHandlerContext ctx) { diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java index bf58a812d6..a8a8431933 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java @@ -149,7 +149,8 @@ public class NettyServerTest extends BaseIgniteAbstractTest { public void testHandshakeManagerInvoked() throws Exception { HandshakeManager handshakeManager = mock(HandshakeManager.class); - when(handshakeManager.handshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class))); + when(handshakeManager.localHandshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class))); + when(handshakeManager.finalHandshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class))); MessageSerializationRegistry registry = mock(MessageSerializationRegistry.class); @@ -218,7 +219,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest { order.verify(handshakeManager, timeout()).onInit(any()); order.verify(handshakeManager, timeout()).onConnectionOpen(); - order.verify(handshakeManager, timeout()).handshakeFuture(); + order.verify(handshakeManager, timeout()).localHandshakeFuture(); order.verify(handshakeManager, timeout()).onMessage(any()); } diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java index 305a43c2b2..4d4f9cc922 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java +++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java @@ -54,6 +54,8 @@ import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.OutNetworkObject; import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; /** * Recovery protocol handshake flow test. @@ -62,6 +64,9 @@ public class RecoveryHandshakeTest { /** Connection id. */ private static final short CONNECTION_ID = 1337; + private static final UUID LOWER_UUID = new UUID(100, 200); + private static final UUID HIGHER_UUID = new UUID(300, 400); + /** Serialization registry. */ private static final MessageSerializationRegistry MESSAGE_REGISTRY = defaultSerializationRegistry(); @@ -209,12 +214,12 @@ public class RecoveryHandshakeTest { } @Test - public void testPairedRecoveryDescriptors() throws Exception { + public void testPairedRecoveryDescriptorsClinch() throws Exception { RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider(); RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider(); - UUID node1Uuid = new UUID(100, 200); - UUID node2Uuid = new UUID(300, 400); + UUID node1Uuid = LOWER_UUID; + UUID node2Uuid = HIGHER_UUID; RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery); RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery); @@ -222,14 +227,14 @@ public class RecoveryHandshakeTest { RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery); RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery); - // Channel opened from node1 to node2 - channel 1. - // Channel opened from node2 to node1 - channel 2. 
+ // Channel opened from node1 to node2 is channel 1. + // Channel opened from node2 to node1 is channel 2. // Channel 1. EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener); EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener); - // Channel 1. + // Channel 2. EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener); EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener); @@ -239,7 +244,8 @@ public class RecoveryHandshakeTest { exchangeClientToServer(channel2Dst, channel2Src); exchangeClientToServer(channel1Dst, channel1Src); - // 2 -> 1 is alive, while 1 -> 2 closes because of the tie-breaking. + // 2 -> 1 (Channel 2) is alive, while 1 -> 2 (Channel 1) closes because of the tie-breaking. + exchangeServerToClient(channel1Dst, channel1Src); exchangeServerToClient(channel2Dst, channel2Src); assertFalse(channel1Src.isOpen()); assertFalse(channel1Dst.isOpen()); @@ -253,6 +259,60 @@ public class RecoveryHandshakeTest { assertFalse(channel1Dst.finish()); } + /** + * This tests the following scenario: two handshakes in the opposite directions are started, + * Handshake 1 is faster and it takes both client-side and server-side locks (using recovery descriptors + * as locks), and only then Handshake 2 tries to take the first lock (the one on the client side). + * In such a situation, tie-breaking logic should not be applied (as Handshake 1 could have already + * established, or almost established, a logical connection); instead, Handshake 2 must stop + * itself (regardless of what that the Tie Breaker would prescribe). + */ + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testLateHandshakeDoesNotUseTieBreaker(boolean node1LaunchIdIsLower) throws Exception { + RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider(); + RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider(); + + UUID node1Uuid = node1LaunchIdIsLower ? 
LOWER_UUID : HIGHER_UUID; + UUID node2Uuid = node1LaunchIdIsLower ? HIGHER_UUID : LOWER_UUID; + + RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery); + RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery); + + RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery); + RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery); + + // Channel opened from node1 to node2 is channel 1. + // Channel opened from node2 to node1 is channel 2. + + // Channel 1. + EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener); + EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener); + + // Channel 2. + EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener); + EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener); + + // Channel 2's handshake acquires both locks. + exchangeServerToClient(channel2Dst, channel2Src); + exchangeClientToServer(channel2Dst, channel2Src); + + // Now Channel 1's handshake cannot acquire even first lock. + exchangeServerToClient(channel1Dst, channel1Src); + + // 2 -> 1 is alive, while 1 -> 2 closes because it is late. 
+ exchangeServerToClient(channel2Dst, channel2Src); + assertFalse(channel1Src.isOpen()); + + assertTrue(channel2Src.isOpen()); + assertTrue(channel2Dst.isOpen()); + + assertFalse(channel1Src.finish()); + assertFalse(channel2Dst.finish()); + assertFalse(channel2Src.finish()); + assertFalse(channel1Dst.finish()); + } + @Test public void testExactlyOnceServer() throws Exception { testExactlyOnce(true); @@ -464,25 +524,41 @@ public class RecoveryHandshakeTest { } private void checkHandshakeNotCompleted(HandshakeManager manager) { - CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture(); - assertFalse(handshakeFuture.isDone()); - assertFalse(handshakeFuture.isCompletedExceptionally()); - assertFalse(handshakeFuture.isCancelled()); + CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture(); + assertFalse(localHandshakeFuture.isDone()); + assertFalse(localHandshakeFuture.isCompletedExceptionally()); + assertFalse(localHandshakeFuture.isCancelled()); + + CompletableFuture<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture().toCompletableFuture(); + assertFalse(finalHandshakeFuture.isDone()); + assertFalse(finalHandshakeFuture.isCompletedExceptionally()); + assertFalse(finalHandshakeFuture.isCancelled()); } private void checkHandshakeCompleted(HandshakeManager manager) { - CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture(); - assertTrue(handshakeFuture.isDone()); - assertFalse(handshakeFuture.isCompletedExceptionally()); - assertFalse(handshakeFuture.isCancelled()); + CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture(); + assertTrue(localHandshakeFuture.isDone()); + assertFalse(localHandshakeFuture.isCompletedExceptionally()); + assertFalse(localHandshakeFuture.isCancelled()); + + CompletableFuture<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture().toCompletableFuture(); + assertTrue(finalHandshakeFuture.isDone()); + 
assertFalse(finalHandshakeFuture.isCompletedExceptionally()); + assertFalse(finalHandshakeFuture.isCancelled()); } private void checkHandshakeCompletedExceptionally(HandshakeManager manager) { - CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture(); + CompletableFuture<NettySender> handshakeFuture = manager.localHandshakeFuture(); assertTrue(handshakeFuture.isDone()); assertTrue(handshakeFuture.isCompletedExceptionally()); assertFalse(handshakeFuture.isCancelled()); + + CompletableFuture<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture().toCompletableFuture(); + + assertTrue(finalHandshakeFuture.isDone()); + assertTrue(finalHandshakeFuture.isCompletedExceptionally()); + assertFalse(finalHandshakeFuture.isCancelled()); } private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) { diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java new file mode 100644 index 0000000000..349a34957a --- /dev/null +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.recovery; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +import io.netty.channel.Channel; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.network.netty.NettySender; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DescriptorAcquiryTest extends BaseIgniteAbstractTest { + @Mock + private Channel channel; + + private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>(); + + @Test + void clinchResolvedStagedIsInitiallyIncomplete() { + DescriptorAcquiry acquiry = new DescriptorAcquiry(channel, handshakeCompleteFuture); + + assertThat(acquiry.clinchResolved().toCompletableFuture(), is(not(completedFuture()))); + } + + @Test + void clinchGetsResolved() { + DescriptorAcquiry acquiry = new DescriptorAcquiry(channel, handshakeCompleteFuture); + + acquiry.markClinchResolved(); + + assertThat(acquiry.clinchResolved().toCompletableFuture(), is(completedFuture())); + } +} diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java new file mode 100644 index 0000000000..f728b7b94b --- /dev/null +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java @@ -0,0 +1,207 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.recovery; + +import static java.util.UUID.randomUUID; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.EventExecutor; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import 
org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RecoveryClientHandshakeManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
+    private static final UUID LOWER_ID = new UUID(1, 1);
+    private static final UUID HIGHER_ID = new UUID(2, 2);
+
+    private static final String CLIENT_CONSISTENT_ID = "client";
+    private static final String SERVER_CONSISTENT_ID = "server";
+
+    private static final short CONNECTION_INDEX = 0;
+
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    @Mock
+    private Channel thisChannel;
+    @Mock
+    private Channel competitorChannel;
+
+    @Mock
+    private ChannelHandlerContext thisContext;
+    @Mock
+    private ChannelHandlerContext competitorContext;
+
+    @Mock
+    private ChannelCreationListener channelCreationListener;
+
+    @Mock
+    private RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+    @Mock
+    private EventExecutor eventExecutor;
+
+    @Mock
+    private NettySender competitorNettySender;
+
+    private final RecoveryDescriptor recoveryDescriptor = new RecoveryDescriptor(100);
+
+    @BeforeEach
+    void initMocks() {
+        lenient().when(thisContext.channel()).thenReturn(thisChannel);
+        lenient().when(competitorContext.channel()).thenReturn(competitorChannel);
+
+        lenient().when(thisContext.executor()).thenReturn(eventExecutor);
+        lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
+
+        // Every lookup returns the same descriptor, so this test can pre-acquire it to emulate a competitor.
+        lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(any(), any(), anyShort()))
+                .thenReturn(recoveryDescriptor);
+    }
+
+    /**
+     * This tests the following scenario: two handshakes in the opposite directions are started,
+     * Handshake 1 is faster and it takes both client-side and server-side locks (using recovery descriptors
+     * as locks), and only then Handshake 2 tries to take the first lock (the one on the client side).
+     * In such a situation, tie-breaking logic should not be applied (as Handshake 1 could have already
+     * established, or almost established, a logical connection); instead, Handshake 2 must stop
+     * itself (regardless of what the tie breaker would prescribe).
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void terminatesCurrentHandshakeWhenCannotAcquireLockAtClientSide(boolean clientLaunchIdIsLower) {
+        UUID clientLaunchId = clientLaunchIdIsLower ? LOWER_ID : HIGHER_ID;
+        UUID serverLaunchId = clientLaunchIdIsLower ? HIGHER_ID : LOWER_ID;
+
+        RecoveryClientHandshakeManager manager = clientHandshakeManager(clientLaunchId);
+        CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture();
+        CompletionStage<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture();
+
+        recoveryDescriptor.acquire(thisContext, completedFuture(competitorNettySender));
+
+        manager.onMessage(handshakeStartMessageFrom(serverLaunchId));
+
+        verify(thisChannel, never()).close();
+        verify(thisChannel, never()).close(any(ChannelPromise.class));
+
+        HandshakeException ex = assertWillThrowFast(localHandshakeFuture, HandshakeException.class);
+        assertThat(ex.getMessage(), is("Stepping aside to allow an incoming handshake from server finish."));
+
+        assertThat(finalHandshakeFuture.toCompletableFuture(), willCompleteSuccessfully());
+        assertThat(finalHandshakeFuture.toCompletableFuture().join(), is(competitorNettySender));
+    }
+
+    /**
+     * Creates a client handshake manager bound to {@code thisContext}.
+     *
+     * @param launchId Launch ID of the client node.
+     * @return Initialized handshake manager.
+     */
+    private RecoveryClientHandshakeManager clientHandshakeManager(UUID launchId) {
+        RecoveryClientHandshakeManager manager = new RecoveryClientHandshakeManager(
+                launchId,
+                CLIENT_CONSISTENT_ID,
+                CONNECTION_INDEX,
+                recoveryDescriptorProvider,
+                new AllIdsAreFresh(),
+                channelCreationListener,
+                new AtomicBoolean(false)
+        );
+
+        manager.onInit(thisContext);
+
+        return manager;
+    }
+
+    /**
+     * Builds a handshake start message as if it was sent by the server node.
+     *
+     * @param serverLaunchId Launch ID of the server node.
+     * @return Handshake start message.
+     */
+    private static HandshakeStartMessage handshakeStartMessageFrom(UUID serverLaunchId) {
+        return MESSAGE_FACTORY.handshakeStartMessage()
+                .launchId(serverLaunchId)
+                .consistentId(SERVER_CONSISTENT_ID)
+                .build();
+    }
+
+    /**
+     * Checks the following: the handshake is rejected due to a clinch, and a competitor acquires the descriptor
+     * as soon as this handshake releases it. The local handshake must then fail, and the final handshake future
+     * must switch to the competitor's sender.
+     */
+    @Test
+    void switchesToCompetitorFutureWhenRejectedDueToClinchAndCompetitorIsHere() {
+        RecoveryClientHandshakeManager manager = clientHandshakeManager(randomUUID());
+        CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture();
+        CompletionStage<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture();
+
+        recoveryDescriptor.acquire(thisContext, new CompletableFuture<>());
+
+        DescriptorAcquiry thisAcquiry = recoveryDescriptor.holder();
+        assertThat(thisAcquiry, notNullValue());
+
+        // Keep the stage returned by whenComplete(). An assertion failing inside the callback only completes
+        // that stage exceptionally, so it would go unnoticed if the stage were discarded.
+        CompletableFuture<Void> competitorAcquired = thisAcquiry.clinchResolved()
+                .whenComplete((unused, ex) -> assertThat(
+                        recoveryDescriptor.acquire(competitorContext, completedFuture(competitorNettySender)),
+                        is(true)
+                ))
+                .toCompletableFuture();
+
+        manager.onMessage(handshakeRejectedMessageDueToClinchFrom());
+
+        HandshakeException ex = assertWillThrowFast(localHandshakeFuture, HandshakeException.class);
+        assertThat(ex.getMessage(), startsWith("Stepping aside to allow an incoming handshake from "));
+
+        assertThat(finalHandshakeFuture.toCompletableFuture(), willCompleteSuccessfully());
+        assertThat(finalHandshakeFuture.toCompletableFuture().join(), is(competitorNettySender));
+
+        assertThat(competitorAcquired, willCompleteSuccessfully());
+    }
+
+    /**
+     * Checks the following: the handshake is rejected due to a clinch, and nobody takes the descriptor over.
+     * Both the local and the final handshake futures must then fail with {@link ChannelAlreadyExistsException}.
+     */
+    @Test
+    void finishesWithChannelAlreadyExistsExceptionWhenRejectedDueToClinchAndCompetitorIsNotHere() {
+        RecoveryClientHandshakeManager manager = clientHandshakeManager(randomUUID());
+        CompletableFuture<NettySender> localHandshakeFuture = manager.localHandshakeFuture();
+        CompletionStage<NettySender> finalHandshakeFuture = manager.finalHandshakeFuture();
+
+        recoveryDescriptor.acquire(thisContext, new CompletableFuture<>());
+
+        manager.onMessage(handshakeRejectedMessageDueToClinchFrom());
+
+        assertWillThrowFast(localHandshakeFuture, ChannelAlreadyExistsException.class);
+        assertWillThrowFast(finalHandshakeFuture.toCompletableFuture(), ChannelAlreadyExistsException.class);
+    }
+
+    /**
+     * Builds a handshake rejection message with the {@link HandshakeRejectionReason#CLINCH} reason.
+     *
+     * @return Rejection message.
+     */
+    private static HandshakeRejectedMessage handshakeRejectedMessageDueToClinchFrom() {
+        return MESSAGE_FACTORY.handshakeRejectedMessage()
+                .reasonString(HandshakeRejectionReason.CLINCH.name())
+                .message("Rejected")
+                .build();
+    }
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
new file mode 100644
index 0000000000..1261b637f0
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.recovery; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.apache.ignite.internal.network.netty.NettySender; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + 
+/**
+ * Tests for acquiring and releasing a {@link RecoveryDescriptor}.
+ */
+@ExtendWith(MockitoExtension.class)
+class RecoveryDescriptorTest extends BaseIgniteAbstractTest {
+    private final RecoveryDescriptor descriptor = new RecoveryDescriptor(100);
+
+    @Mock
+    private Channel channel1;
+    @Mock
+    private Channel channel2;
+
+    @Mock
+    private ChannelHandlerContext context1;
+    @Mock
+    private ChannelHandlerContext context2;
+
+    private final CompletableFuture<NettySender> handshakeCompleteFuture1 = new CompletableFuture<>();
+    private final CompletableFuture<NettySender> handshakeCompleteFuture2 = new CompletableFuture<>();
+
+    @BeforeEach
+    void setupMocks() {
+        lenient().when(context1.channel()).thenReturn(channel1);
+        lenient().when(context2.channel()).thenReturn(channel2);
+    }
+
+    @Test
+    void acquiresNonAcquiredDescriptor() {
+        assertTrue(descriptor.acquire(context1, handshakeCompleteFuture1));
+    }
+
+    @Test
+    void acquiryIsAbsentOnNonAcquiredDescriptor() {
+        assertThat(descriptor.holder(), is(nullValue()));
+    }
+
+    @Test
+    void acquiryInformationIsAvailableAfterAcquiring() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        DescriptorAcquiry acquiry = assertHeldBy(channel1);
+        assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(), is(false));
+    }
+
+    @Test
+    void cannotAcquireAcquiredDescriptor() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        assertFalse(descriptor.acquire(context2, handshakeCompleteFuture2));
+        assertHeldBy(channel1);
+    }
+
+    @Test
+    void releaseMakesDescriptorAvailable() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        descriptor.release(context1);
+
+        assertTrue(descriptor.acquire(context1, handshakeCompleteFuture1));
+        assertHeldBy(channel1);
+    }
+
+    @Test
+    void releaseRemovesAcquiryInformation() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        descriptor.release(context1);
+
+        assertThat(descriptor.holder(), is(nullValue()));
+    }
+
+    @Test
+    void releaseWithAnotherContextHasNoEffect() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+
+        descriptor.release(context2);
+
+        DescriptorAcquiry acquiry = assertHeldBy(channel1);
+        assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(), is(false));
+
+        assertFalse(descriptor.acquire(context2, handshakeCompleteFuture2));
+
+        acquiry = assertHeldBy(channel1);
+        assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(), is(false));
+    }
+
+    @Test
+    void releaseCompletesClinchResolvedStage() {
+        descriptor.acquire(context1, handshakeCompleteFuture1);
+        CompletionStage<Void> clinchResolved = assertHeldBy(channel1).clinchResolved();
+
+        descriptor.release(context1);
+
+        assertThat(clinchResolved.toCompletableFuture(), is(completedFuture()));
+    }
+
+    /**
+     * Asserts that the descriptor is currently held and that the channel of the holder is the expected one.
+     *
+     * @param expectedChannel Channel expected to hold the descriptor.
+     * @return Current acquiry information (never {@code null} if the assertions pass).
+     */
+    private DescriptorAcquiry assertHeldBy(Channel expectedChannel) {
+        DescriptorAcquiry acquiry = descriptor.holder();
+
+        assertThat(acquiry, is(notNullValue()));
+        assertThat(acquiry.channel(), is(expectedChannel));
+
+        return acquiry;
+    }
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
new file mode 100644
index 0000000000..9ad44b62cf
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.recovery; + +import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelProgressivePromise; +import io.netty.util.concurrent.EventExecutor; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.network.NetworkMessagesFactory; +import org.apache.ignite.internal.network.handshake.HandshakeException; +import org.apache.ignite.internal.network.netty.ChannelCreationListener; +import org.apache.ignite.internal.network.netty.NettySender; +import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage; +import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason; +import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.OutNetworkObject;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RecoveryServerHandshakeManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
+    private static final UUID LOWER_ID = new UUID(1, 1);
+    private static final UUID HIGHER_ID = new UUID(2, 2);
+
+    private static final String CLIENT_CONSISTENT_ID = "client";
+    private static final String SERVER_CONSISTENT_ID = "server";
+
+    private static final short CONNECTION_INDEX = 0;
+
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    @Mock
+    private Channel channel;
+
+    @Mock
+    private ChannelHandlerContext context;
+
+    @Mock
+    private ChannelCreationListener channelCreationListener;
+
+    @Mock
+    private RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+    @Mock
+    private EventExecutor eventExecutor;
+
+    @Captor
+    private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
+
+    private final RecoveryDescriptor recoveryDescriptor = new RecoveryDescriptor(100);
+
+    @BeforeEach
+    void initMocks() {
+        lenient().when(context.channel()).thenReturn(channel);
+        lenient().when(channel.close()).thenAnswer(invocation -> emulateClose());
+        lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(anyString(), any(), anyShort()))
+                .thenReturn(recoveryDescriptor);
+
+        lenient().when(context.executor()).thenReturn(eventExecutor);
+        lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
+
+        lenient().when(channel.writeAndFlush(any())).then(invocation -> alreadySucceededWrite());
+    }
+
+    /**
+     * Stub body for {@code channel.close()}: releases the recovery descriptor on behalf of {@code context}.
+     *
+     * @return A mock channel future.
+     */
+    private ChannelFuture emulateClose() {
+        recoveryDescriptor.release(context);
+
+        return mock(ChannelFuture.class);
+    }
+
+    /**
+     * Stub body for {@code channel.writeAndFlush(...)}: returns a promise that is already marked as successful.
+     *
+     * @return Successful write promise.
+     */
+    private ChannelFuture alreadySucceededWrite() {
+        DefaultChannelProgressivePromise writePromise = new DefaultChannelProgressivePromise(channel, eventExecutor);
+        writePromise.setSuccess();
+
+        return writePromise;
+    }
+
+    @Test
+    @Timeout(10)
+    void terminatesCurrentHandshakeInClinchWhenOngoingHandshakeLosesDueToTieBreaking() {
+        UUID lowerClientLaunchId = LOWER_ID;
+        UUID higherServerLaunchId = HIGHER_ID;
+
+        RecoveryServerHandshakeManager serverManager = serverHandshakeManager(higherServerLaunchId);
+        CompletableFuture<NettySender> localFuture = serverManager.localHandshakeFuture();
+
+        // Occupy the recovery descriptor before the start response arrives.
+        recoveryDescriptor.acquire(context, new CompletableFuture<>());
+
+        serverManager.onMessage(handshakeStartResponseMessageFrom(lowerClientLaunchId));
+
+        // The channel must not be closed.
+        verify(channel, never()).close();
+        verify(channel, never()).close(any(ChannelPromise.class));
+
+        HandshakeException failure = assertWillThrowFast(localFuture, HandshakeException.class);
+        assertThat(
+                failure.getMessage(),
+                startsWith("Failed to acquire recovery descriptor during handshake, it is held by: ")
+        );
+
+        HandshakeRejectedMessage rejection = capturedRejection();
+        assertThat(rejection.reason(), is(HandshakeRejectionReason.CLINCH));
+        assertThat(
+                rejection.message(),
+                startsWith("Handshake clinch detected, this handshake will be terminated, winning channel is ")
+        );
+    }
+
+    /**
+     * Verifies that exactly one message was written to the channel. The message must be non-recoverable
+     * and must be a {@link HandshakeRejectedMessage}.
+     *
+     * @return The written rejection message.
+     */
+    private HandshakeRejectedMessage capturedRejection() {
+        verify(channel).writeAndFlush(sentMessageCaptor.capture());
+
+        OutNetworkObject written = sentMessageCaptor.getValue();
+        assertThat(written.shouldBeSavedForRecovery(), is(false));
+        assertThat(written.networkMessage(), is(instanceOf(HandshakeRejectedMessage.class)));
+
+        return (HandshakeRejectedMessage) written.networkMessage();
+    }
+
+    /**
+     * Creates a server handshake manager bound to {@code context}.
+     *
+     * @param launchId Launch ID of the server node.
+     * @return Initialized handshake manager.
+     */
+    private RecoveryServerHandshakeManager serverHandshakeManager(UUID launchId) {
+        RecoveryServerHandshakeManager serverManager = new RecoveryServerHandshakeManager(
+                launchId,
+                SERVER_CONSISTENT_ID,
+                MESSAGE_FACTORY,
+                recoveryDescriptorProvider,
+                new AllIdsAreFresh(),
+                channelCreationListener,
+                new AtomicBoolean(false)
+        );
+        serverManager.onInit(context);
+
+        return serverManager;
+    }
+
+    /**
+     * Builds a handshake start response as if it was sent by the client node.
+     *
+     * @param clientLaunchId Launch ID of the client node.
+     * @return Handshake start response message.
+     */
+    private static HandshakeStartResponseMessage handshakeStartResponseMessageFrom(UUID clientLaunchId) {
+        return MESSAGE_FACTORY.handshakeStartResponseMessage()
+                .launchId(clientLaunchId)
+                .consistentId(CLIENT_CONSISTENT_ID)
+                .connectionId(CONNECTION_INDEX)
+                .receivedCount(0)
+                .build();
+    }
+}