This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 1d1313634c IGNITE-21471 Change approach in sendMessagesTwoChannels in DefaultMessagingServiceTest (#3165) 1d1313634c is described below commit 1d1313634c91551770629b5c1158bd74af4465da Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Wed Feb 7 12:01:52 2024 +0400 IGNITE-21471 Change approach in sendMessagesTwoChannels in DefaultMessagingServiceTest (#3165) --- .../network/DefaultMessagingServiceTest.java | 118 ++------------------- 1 file changed, 8 insertions(+), 110 deletions(-) diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java index 2c8f624944..bcd638a0ee 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java @@ -22,8 +22,8 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -42,7 +42,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -54,7 +53,6 @@ import org.apache.ignite.internal.network.messages.InstantContainer; import org.apache.ignite.internal.network.messages.MessageWithInstant; import org.apache.ignite.internal.network.messages.TestMessage; import org.apache.ignite.internal.network.messages.TestMessageImpl; -import org.apache.ignite.internal.network.messages.TestMessageSerializationFactory; import org.apache.ignite.internal.network.messages.TestMessageTypes; import org.apache.ignite.internal.network.messages.TestMessagesFactory; import org.apache.ignite.internal.network.netty.ConnectionManager; @@ -65,10 +63,7 @@ import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider; import org.apache.ignite.internal.network.recovery.StaleIdDetector; import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory; import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry; -import org.apache.ignite.internal.network.serialization.MessageDeserializer; -import org.apache.ignite.internal.network.serialization.MessageSerializationFactory; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; -import org.apache.ignite.internal.network.serialization.MessageSerializer; import org.apache.ignite.internal.network.serialization.SerializationService; import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext; import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller; @@ -168,75 +163,17 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest { } } - @Test - public void sendMessagesOneChannel() throws Exception { - AtomicBoolean release = new AtomicBoolean(false); - MessageSerializer<TestMessage> serializer = new TestMessageSerializationFactory( - new TestMessagesFactory()).createSerializer(); - Serializer longWaitSerializer = new Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE, - (message, writer) -> release.get() - && serializer.writeMessage((TestMessage) message, writer)); - - try (Services senderServices = createMessagingService( - senderNode, - senderNetworkConfig, - () -> {}, - mockSerializationRegistry(longWaitSerializer)); - Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig) - ) { - try { - CountDownLatch latch = new CountDownLatch(2); - receiverServices.messagingService.addMessageHandler( - TestMessageTypes.class, - (message, sender, correlationId) -> latch.countDown() - ); - - senderServices.messagingService.send(receiverNode, TestMessageImpl.builder().build()); - senderServices.messagingService.send(receiverNode, AllTypesMessageImpl.builder().build()); - - assertThat(latch.getCount(), is(2L)); - release.set(true); - assertTrue(latch.await(1, TimeUnit.SECONDS)); - } finally { - release.set(true); - } - } - } - @Test public void sendMessagesTwoChannels() throws Exception { - AtomicBoolean release = new AtomicBoolean(false); - MessageSerializer<TestMessage> serializer = new TestMessageSerializationFactory( - new TestMessagesFactory()).createSerializer(); - Serializer longWaitSerializer = new Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE, - (message, writer) -> release.get() - && serializer.writeMessage((TestMessage) message, writer)); - - try (Services senderServices = createMessagingService( - senderNode, - senderNetworkConfig, - () -> {}, - mockSerializationRegistry(longWaitSerializer)); + try (Services senderServices = createMessagingService(senderNode, senderNetworkConfig); Services receiverServices = createMessagingService(receiverNode, receiverNetworkConfig) ) { - try { - CountDownLatch latch = new CountDownLatch(2); - receiverServices.messagingService.addMessageHandler( - TestMessageTypes.class, - (message, sender, correlationId) -> latch.countDown() - ); - - senderServices.messagingService.send(receiverNode, TestMessageImpl.builder().build()); - senderServices.messagingService.send(receiverNode, TEST_CHANNEL, AllTypesMessageImpl.builder().build()); - - await().timeout(10, TimeUnit.SECONDS) - .until(() -> latch.getCount() == 1); - - release.set(true); - assertTrue(latch.await(1, TimeUnit.SECONDS)); - } finally { - release.set(true); - } + assertThat(receiverServices.connectionManager.channels(), is(anEmptyMap())); + + senderServices.messagingService.send(receiverNode, TestMessageImpl.builder().build()); + senderServices.messagingService.send(receiverNode, TEST_CHANNEL, AllTypesMessageImpl.builder().build()); + + assertTrue(waitForCondition(() -> receiverServices.connectionManager.channels().size() == 2, 10_000)); } } @@ -385,45 +322,6 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest { } } - private static MessageSerializationRegistry mockSerializationRegistry(Serializer... serializers) { - MessageSerializationRegistry defaultRegistry = defaultSerializationRegistry(); - - return new MessageSerializationRegistry() { - @Override - public MessageSerializationRegistry registerFactory(short groupType, short messageType, - MessageSerializationFactory<?> factory) { - return this; - } - - @Override - public <T extends NetworkMessage> MessageSerializer<T> createSerializer(short groupType, short messageType) { - for (Serializer serializer : serializers) { - if (serializer.groupType == groupType && serializer.messageType == messageType) { - return (MessageSerializer<T>) serializer.serializer; - } - } - return defaultRegistry.createSerializer(groupType, messageType); - } - - @Override - public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short groupType, short messageType) { - return defaultRegistry.createDeserializer(groupType, messageType); - } - }; - } - - private static class Serializer { - private final short groupType; - private final short messageType; - private final MessageSerializer<? extends NetworkMessage> serializer; - - private Serializer(short groupType, short messageType, MessageSerializer<? extends NetworkMessage> serializer) { - this.groupType = groupType; - this.messageType = messageType; - this.serializer = serializer; - } - } - private static void awaitQuietly(CountDownLatch latch) { try { latch.await();