This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c0f92249c2 IGNITE-21470 Wait for sender to appear before applying an ack silencer (#3163) c0f92249c2 is described below commit c0f92249c2d5ac235710520b13bfbb2412c27928 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Tue Feb 6 19:44:29 2024 +0400 IGNITE-21470 Wait for sender to appear before applying an ack silencer (#3163) --- .../internal/network/netty/ItConnectionManagerTest.java | 17 +++++++++++++++++ .../network/netty/OutgoingAcknowledgementSilencer.java | 5 +++++ .../ignite/internal/network/netty/NettySender.java | 8 ++++++++ 3 files changed, 30 insertions(+) diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java index 65122b0481..a921dadfd0 100644 --- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java +++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java @@ -388,6 +388,7 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest { ConnectionManagerWrapper manager2 = startManager(4001) ) { NettySender sender = manager1.openChannelTo(manager2).toCompletableFuture().get(10, TimeUnit.SECONDS); + waitTillChannelAppearsInMapOnAcceptor(sender, manager1, manager2); OutgoingAcknowledgementSilencer ackSilencer = dropAcksFrom(manager2); @@ -402,6 +403,22 @@ public class ItConnectionManagerTest extends BaseIgniteAbstractTest { } } + private static void waitTillChannelAppearsInMapOnAcceptor( + NettySender senderFromOpener, + ConnectionManagerWrapper opener, + ConnectionManagerWrapper acceptor + ) throws InterruptedException { + assertTrue( + waitForCondition( + () -> acceptor.channels().values().stream().anyMatch(acceptorSender + -> acceptorSender.consistentId().equals(opener.connectionManager.consistentId()) + && acceptorSender.channelId() == senderFromOpener.channelId()), + TimeUnit.SECONDS.toMillis(10) + ), + "Did not observe the sender appearing in the acceptor's sender map in time" + ); + } + @Test public void sendFuturesCompleteInSendOrder() throws Exception { try ( diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java index 160a16263e..e18f8458ee 100644 --- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java +++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.network.netty; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertTrue; import io.netty.channel.ChannelHandler.Sharable; @@ -50,6 +53,8 @@ public class OutgoingAcknowledgementSilencer extends ChannelOutboundHandlerAdapt */ public static OutgoingAcknowledgementSilencer installOn(Collection<NettySender> senders) throws InterruptedException { + assertThat(senders, not(empty())); + OutgoingAcknowledgementSilencer ackSilencer = new OutgoingAcknowledgementSilencer(senders.size()); for (NettySender sender : senders) { diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java index 82c570d8d3..7bff673b6e 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java @@ -30,6 +30,8 @@ import org.apache.ignite.internal.network.NettyBootstrapFactory; import org.apache.ignite.internal.network.OutNetworkObject; import org.apache.ignite.internal.network.direct.DirectMessageWriter; import org.apache.ignite.internal.network.recovery.RecoveryDescriptor; +import org.apache.ignite.internal.tostring.IgniteToStringExclude; +import org.apache.ignite.internal.tostring.S; import org.jetbrains.annotations.TestOnly; /** @@ -49,6 +51,7 @@ public class NettySender { private final short channelId; + @IgniteToStringExclude private final RecoveryDescriptor recoveryDescriptor; /** @@ -234,4 +237,9 @@ public class NettySender { public RecoveryDescriptor recoveryDescriptor() { return recoveryDescriptor; } + + @Override + public String toString() { + return S.toString(this); + } }