sashapolo commented on code in PR #3103:
URL: https://github.com/apache/ignite-3/pull/3103#discussion_r1467796323


##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -214,7 +220,16 @@ private void 
onHandshakeStartResponseMessage(HandshakeStartResponseMessage messa
         this.receivedCount = message.receivedCount();
         this.remoteChannelId = message.connectionId();
 
-        tryAcquireDescriptorAndFinishHandshake(message);
+        eventLoopSwitchPropagator.set(false);
+
+        ChannelKey channelKey = new ChannelKey(remoteConsistentId, 
remoteLaunchId, remoteChannelId);
+        switchEventLoopIfNeeded(channel, channelKey, () -> {
+            // Doing this to make sure we have a heppens-before between all 
writes in old event loop and all reads in the new one.

Review Comment:
   ```suggestion
               // Doing this to make sure we have a happens-before between all 
writes in old event loop and all reads in the new one.
   ```



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -310,7 +311,14 @@ private CompletableFuture<Void> sendViaNetwork(
         }
 
         return connectionManager.channel(consistentId, type, addr)
-                .thenComposeToCompletable(sender -> sender.send(new 
OutNetworkObject(message, descriptors)));
+                .thenComposeToCompletable(sender -> sender.send(
+                        new OutNetworkObject(message, descriptors),
+                        () -> triggerChannelCreation(consistentId, type, addr)
+                ));
+    }
+
+    private OrderingFuture<NettySender> triggerChannelCreation(@Nullable 
String consistentId, ChannelType type, InetSocketAddress addr) {

Review Comment:
   What's the point of having a separate method for this?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -178,6 +184,8 @@ public void onMessage(NetworkMessage message) {
         }
 
         assert recoveryDescriptor != null : "Wrong client handshake flow";
+        assert recoveryDescriptor.holderChannel() != null;

Review Comment:
   Won't this be covered by the next assert?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -98,6 +102,8 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
 
     private final BooleanSupplier stopping;
 
+    private final AtomicBoolean eventLoopSwitchPropagator = new 
AtomicBoolean();

Review Comment:
   Please add a javadoc



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java:
##########
@@ -41,31 +48,131 @@ public class NettySender {
 
     private final short channelId;
 
+    private final RecoveryDescriptor recoveryDescriptor;
+
     /**
      * Constructor.
      *
      * @param channel Netty channel.
      * @param launchId Launch id of the remote node.
      * @param consistentId Consistent id of the remote node.
      * @param channelId channel identifier.
+     * @param recoveryDescriptor Descriptor corresponding to the current 
logical connection.
      */
-    public NettySender(Channel channel, String launchId, String consistentId, 
short channelId) {
+    public NettySender(Channel channel, String launchId, String consistentId, 
short channelId, RecoveryDescriptor recoveryDescriptor) {
         this.channel = channel;
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.channelId = channelId;
+        this.recoveryDescriptor = recoveryDescriptor;
     }
 
     /**
      * Sends the message.
      *
+     * <p>NB: the future returned by this method might be completed by a 
channel different from the channel encapsulated
+     * in the current sender (this might happen if the 'current' channel gets 
closed and another one is opened
+     * in the same logical connection and resends a not-yet-acknowledged 
message sent via the old channel).
+     *
      * @param obj Network message wrapper.
      * @return Future of the send operation (that gets completed when the 
message gets acknowledged by the receiver).
      */
+    @TestOnly
     public CompletableFuture<Void> send(OutNetworkObject obj) {
+        return send(obj, () -> {});
+    }
+
+    /**
+     * Sends the message.
+     *
+     * <p>NB: the future returned by this method might be completed by a 
channel different from the channel encapsulated
+     * in the current sender (this might happen if the 'current' channel gets 
closed and another one is opened
+     * in the same logical connection and resends a not-yet-acknowledged 
message sent via the old channel).
+     *
+     * @param obj Network message wrapper.
+     * @param triggerChannelRecreation Used to trigger channel recreation 
(when it turns out that the underlying channel is closed
+     *     and the connection recovery procedure has to be performed).
+     * @return Future of the send operation (that gets completed when the 
message gets acknowledged by the receiver).
+     */
+    public CompletableFuture<Void> send(OutNetworkObject obj, Runnable 
triggerChannelRecreation) {
+        if (!obj.networkMessage().needAck()) {
+            // We don't care that the client might get an exception like 
ClosedChannelException or that the message
+            // will be lost if the channel is closed as it does not require to 
be acked.
+            return toCompletableFuture(channel.writeAndFlush(obj));
+        }
+
+        // Write in event loop to make sure that, if a ClosedSocketException 
happens, we recover from it without existing the event loop.
+        // We need this to avoid message reordering due to switching from old 
channel to a new one.
+        if (channel.eventLoop().inEventLoop()) {
+            writeWithRecovery(obj, channel, triggerChannelRecreation);
+        } else {
+            channel.eventLoop().execute(() -> writeWithRecovery(obj, channel, 
triggerChannelRecreation));
+        }
+
+        return obj.acknowledgedFuture();
+    }
+
+    private void chainRecoverSendAfterChannelClosure(
+            CompletableFuture<Void> writeFuture,
+            OutNetworkObject obj,
+            Channel currentChannel,
+            Runnable triggerChannelRecreation
+    ) {
+        if (!completedSuccessfully(writeFuture)) {
+            writeFuture.whenComplete((res, ex) -> {
+                if (ex instanceof ClosedChannelException) {
+                    try {
+                        recoverSendAfterChannelClosure(obj, currentChannel, 
triggerChannelRecreation);
+                    } catch (RuntimeException | AssertionError e) {
+                        LOG.error("An error while sending a message {}", e, 
obj.networkMessage());
+                    }
+                }
+            });
+        }
+    }
+
+    private static boolean completedSuccessfully(CompletableFuture<Void> 
writeFuture) {

Review Comment:
   I'm pretty sure I've seen this method already; I suggest extracting it.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelKey.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
+
+/** Channel key. */
+public class ChannelKey {
+    /** Remote node's consistent id. */
+    private final String consistentId;
+
+    /** Remote node's launch id. */
+    private final UUID launchId;
+
+    /**
+     * Connection id. Every connection between this node and a remote node has 
a unique connection id, but connections with different nodes
+     * may have the same ids.
+     */
+    private final short connectionId;
+
+    /**
+     * Constructor.
+     */
+    public ChannelKey(String consistentId, UUID launchId, short connectionId) {
+        this.consistentId = consistentId;
+        this.launchId = launchId;
+        this.connectionId = connectionId;
+    }
+
+    UUID launchId() {
+        return launchId;
+    }
+
+    /** {@inheritDoc} */

Review Comment:
   We don't use `inheritDoc` anymore



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java:
##########
@@ -63,4 +69,35 @@ static void sendRejectionMessageAndFailHandshake(
             
handshakeFuture.completeExceptionally(exceptionFactory.apply(exceptionText));
         });
     }
+
+    static void switchEventLoopIfNeeded(Channel channel, ChannelKey 
channelKey, Runnable afterSwitching) {

Review Comment:
   Please add a javadoc. Why do we even need to move channels between event loops?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to