This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b3e9bf163 IGNITE-21493 Apply ack silencer when the channel is open (#3196)
4b3e9bf163 is described below

commit 4b3e9bf16316cfeca40cf83a4e748202c26a6382
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Fri Feb 9 19:02:26 2024 +0400

    IGNITE-21493 Apply ack silencer when the channel is open (#3196)
---
 .../network/netty/ItConnectionManagerTest.java       |  2 ++
 .../scalecube/ItScaleCubeNetworkMessagingTest.java   | 20 +++++++++++++++-----
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 0b5254c7fb..c4cd4f751c 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -427,6 +427,7 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest {
                 ConnectionManagerWrapper manager2 = startManager(4001)
         ) {
             NettySender sender = manager1.openChannelTo(manager2).toCompletableFuture().get(10, TimeUnit.SECONDS);
+            waitTillChannelAppearsInMapOnAcceptor(sender, manager1, manager2);
 
             OutgoingAcknowledgementSilencer ackSilencer = dropAcksFrom(manager2);
 
@@ -453,6 +454,7 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest {
                 ConnectionManagerWrapper manager2 = startManager(4001)
         ) {
             NettySender sender = manager1.openChannelTo(manager2).toCompletableFuture().get(10, TimeUnit.SECONDS);
+            waitTillChannelAppearsInMapOnAcceptor(sender, manager1, manager2);
 
             dropAcksFrom(manager2);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index cd6ff9ca76..df80f7e3ce 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -959,7 +959,7 @@ class ItScaleCubeNetworkMessagingTest {
 
         // Open a channel to allow a silencer to be installed on it.
         openDefaultChannelBetween(sender, receiver);
-        OutgoingAcknowledgementSilencer ackSilencer = dropAcksFrom(receiver);
+        OutgoingAcknowledgementSilencer ackSilencer = dropAcksWhenDefaultChannelOpens(receiver);
 
         CompletableFuture<Void> sendFuture = operation.send(
                 sender.messagingService(),
@@ -996,10 +996,20 @@ class ItScaleCubeNetworkMessagingTest {
         return send(messageFactory.testMessage().build(), sender, receiver);
     }
 
-    private static OutgoingAcknowledgementSilencer dropAcksFrom(ClusterService clusterService) throws InterruptedException {
+    private static OutgoingAcknowledgementSilencer dropAcksWhenDefaultChannelOpens(ClusterService clusterService)
+            throws InterruptedException {
         DefaultMessagingService messagingService = (DefaultMessagingService) clusterService.messagingService();
 
-        return OutgoingAcknowledgementSilencer.installOn(messagingService.connectionManager().channels().values());
+        ConnectionManager connectionManager = messagingService.connectionManager();
+        assertTrue(
+                waitForCondition(
+                        () -> connectionManager.channels().keySet().stream().anyMatch(key -> key.type() == ChannelType.DEFAULT),
+                        SECONDS.toMillis(10)
+                ),
+                "Did not see a default channel to be opened on the connection manager"
+        );
+
+        return OutgoingAcknowledgementSilencer.installOn(connectionManager.channels().values());
     }
 
     private void provokeAckFor(ClusterService sideToGetAck, ClusterService sideToSendAck) {
@@ -1019,7 +1029,7 @@ class ItScaleCubeNetworkMessagingTest {
         echoMessagesBackAt(outcast);
 
         openDefaultChannelBetween(notOutcast, outcast);
-        dropAcksFrom(outcast);
+        dropAcksWhenDefaultChannelOpens(outcast);
 
         CompletableFuture<Void> sendFuture = operation.send(
                 notOutcast.messagingService(),
@@ -1045,7 +1055,7 @@ class ItScaleCubeNetworkMessagingTest {
         echoMessagesBackAt(receiver);
 
         openDefaultChannelBetween(sender, receiver);
-        dropAcksFrom(receiver);
+        dropAcksWhenDefaultChannelOpens(receiver);
 
         CompletableFuture<Void> sendFuture = operation.send(
                 sender.messagingService(),

Reply via email to