sashapolo commented on code in PR #3103: URL: https://github.com/apache/ignite-3/pull/3103#discussion_r1467796323
########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java: ########## @@ -214,7 +220,16 @@ private void onHandshakeStartResponseMessage(HandshakeStartResponseMessage messa this.receivedCount = message.receivedCount(); this.remoteChannelId = message.connectionId(); - tryAcquireDescriptorAndFinishHandshake(message); + eventLoopSwitchPropagator.set(false); + + ChannelKey channelKey = new ChannelKey(remoteConsistentId, remoteLaunchId, remoteChannelId); + switchEventLoopIfNeeded(channel, channelKey, () -> { + // Doing this to make sure we have a heppens-before between all writes in old event loop and all reads in the new one. Review Comment: ```suggestion // Doing this to make sure we have a happens-before between all writes in old event loop and all reads in the new one. ``` ########## modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java: ########## @@ -310,7 +311,14 @@ private CompletableFuture<Void> sendViaNetwork( } return connectionManager.channel(consistentId, type, addr) - .thenComposeToCompletable(sender -> sender.send(new OutNetworkObject(message, descriptors))); + .thenComposeToCompletable(sender -> sender.send( + new OutNetworkObject(message, descriptors), + () -> triggerChannelCreation(consistentId, type, addr) + )); + } + + private OrderingFuture<NettySender> triggerChannelCreation(@Nullable String consistentId, ChannelType type, InetSocketAddress addr) { Review Comment: What's the point of having a separate method for this? ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java: ########## @@ -178,6 +184,8 @@ public void onMessage(NetworkMessage message) { } assert recoveryDescriptor != null : "Wrong client handshake flow"; + assert recoveryDescriptor.holderChannel() != null; Review Comment: Won't this be covered by the next assert? 
########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java: ########## @@ -98,6 +102,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager { private final BooleanSupplier stopping; + private final AtomicBoolean eventLoopSwitchPropagator = new AtomicBoolean(); Review Comment: Please add a javadoc ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java: ########## @@ -41,31 +48,131 @@ public class NettySender { private final short channelId; + private final RecoveryDescriptor recoveryDescriptor; + /** * Constructor. * * @param channel Netty channel. * @param launchId Launch id of the remote node. * @param consistentId Consistent id of the remote node. * @param channelId channel identifier. + * @param recoveryDescriptor Descriptor corresponding to the current logical connection. */ - public NettySender(Channel channel, String launchId, String consistentId, short channelId) { + public NettySender(Channel channel, String launchId, String consistentId, short channelId, RecoveryDescriptor recoveryDescriptor) { this.channel = channel; this.launchId = launchId; this.consistentId = consistentId; this.channelId = channelId; + this.recoveryDescriptor = recoveryDescriptor; } /** * Sends the message. * + * <p>NB: the future returned by this method might be completed by a channel different from the channel encapsulated + * in the current sender (this might happen if the 'current' channel gets closed and another one is opened + * in the same logical connection and resends a not-yet-acknowledged message sent via the old channel). + * * @param obj Network message wrapper. * @return Future of the send operation (that gets completed when the message gets acknowledged by the receiver). */ + @TestOnly public CompletableFuture<Void> send(OutNetworkObject obj) { + return send(obj, () -> {}); + } + + /** + * Sends the message. 
+ * + * <p>NB: the future returned by this method might be completed by a channel different from the channel encapsulated + * in the current sender (this might happen if the 'current' channel gets closed and another one is opened + * in the same logical connection and resends a not-yet-acknowledged message sent via the old channel). + * + * @param obj Network message wrapper. + * @param triggerChannelRecreation Used to trigger channel recreation (when it turns out that the underlying channel is closed + * and the connection recovery procedure has to be performed). + * @return Future of the send operation (that gets completed when the message gets acknowledged by the receiver). + */ + public CompletableFuture<Void> send(OutNetworkObject obj, Runnable triggerChannelRecreation) { + if (!obj.networkMessage().needAck()) { + // We don't care that the client might get an exception like ClosedChannelException or that the message + // will be lost if the channel is closed as it does not require to be acked. + return toCompletableFuture(channel.writeAndFlush(obj)); + } + + // Write in event loop to make sure that, if a ClosedSocketException happens, we recover from it without existing the event loop. + // We need this to avoid message reordering due to switching from old channel to a new one. 
+ if (channel.eventLoop().inEventLoop()) { + writeWithRecovery(obj, channel, triggerChannelRecreation); + } else { + channel.eventLoop().execute(() -> writeWithRecovery(obj, channel, triggerChannelRecreation)); + } + + return obj.acknowledgedFuture(); + } + + private void chainRecoverSendAfterChannelClosure( + CompletableFuture<Void> writeFuture, + OutNetworkObject obj, + Channel currentChannel, + Runnable triggerChannelRecreation + ) { + if (!completedSuccessfully(writeFuture)) { + writeFuture.whenComplete((res, ex) -> { + if (ex instanceof ClosedChannelException) { + try { + recoverSendAfterChannelClosure(obj, currentChannel, triggerChannelRecreation); + } catch (RuntimeException | AssertionError e) { + LOG.error("An error while sending a message {}", e, obj.networkMessage()); + } + } + }); + } + } + + private static boolean completedSuccessfully(CompletableFuture<Void> writeFuture) { Review Comment: I'm pretty sure I've seen this method already, I suggest extracting it ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.network.netty; + +import java.util.UUID; +import org.apache.ignite.internal.tostring.S; + +/** Channel key. */ +public class ChannelKey { + /** Remote node's consistent id. */ + private final String consistentId; + + /** Remote node's launch id. */ + private final UUID launchId; + + /** + * Connection id. Every connection between this node and a remote node has a unique connection id, but connections with different nodes + * may have the same ids. + */ + private final short connectionId; + + /** + * Constructor. + */ + public ChannelKey(String consistentId, UUID launchId, short connectionId) { + this.consistentId = consistentId; + this.launchId = launchId; + this.connectionId = connectionId; + } + + UUID launchId() { + return launchId; + } + + /** {@inheritDoc} */ Review Comment: We don't use `inheritDoc` anymore ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java: ########## @@ -63,4 +69,35 @@ static void sendRejectionMessageAndFailHandshake( handshakeFuture.completeExceptionally(exceptionFactory.apply(exceptionText)); }); } + + static void switchEventLoopIfNeeded(Channel channel, ChannelKey channelKey, Runnable afterSwitching) { Review Comment: Please add javadoc. Why do we even need to move channels between event loops? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org