This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a8605bd95ba IGNITE-28440 Use message serializer for test discovery
messages (#12974)
a8605bd95ba is described below
commit a8605bd95bab4e4e423ded2b267adc843ddc5e7c
Author: Nikita Amelchev <[email protected]>
AuthorDate: Tue Apr 7 10:31:08 2026 +0300
IGNITE-28440 Use message serializer for test discovery messages (#12974)
---
.../plugin/extensions/communication/Message.java | 11 +--
.../tcp/messages/HandshakeWaitMessage.java | 1 -
.../metric/OutboundIoMessageQueueSizeTest.java | 34 ++------
.../snapshot/AbstractSnapshotSelfTest.java | 3 +-
.../snapshot/IgniteClusterSnapshotHandlerTest.java | 10 ++-
.../NodeSecurityContextPropagationTest.java | 29 +------
.../security/TestDiscoveryAcknowledgeMessage.java} | 26 +++---
.../security/TestDiscoveryMessage.java} | 26 +++---
.../DistributedProcessClientAwaitTest.java | 6 +-
.../DistributedProcessCoordinatorLeftTest.java | 5 +-
.../util/distributed/MessagesPluginProvider.java | 64 ---------------
.../util/distributed/TestIntegerMessage.java | 1 -
.../apache/ignite/spi/MessagesPluginProvider.java | 92 ++++++++++++++++++++++
.../FilterDataForClientNodeDiscoveryTest.java | 40 ++--------
.../discovery/MessageForServer.java} | 27 ++++---
.../tcp/DummyCustomDiscoveryMessage.java} | 28 ++++---
.../TcpDiscoveryPendingMessageDeliveryTest.java | 56 ++++---------
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 38 +++++++++
18 files changed, 232 insertions(+), 265 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
index 614418b2e36..ee48f1abdc0 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
@@ -58,11 +58,6 @@ public interface Message {
throw new UnsupportedOperationException();
}
- /**
- * Gets message type.
- *
- * @return Message type.
- */
/**
* Gets message type.
*
@@ -72,9 +67,8 @@ public interface Message {
var clazz = getClass();
Short type = REGISTRATIONS.get(clazz);
- if (type == null) {
+ if (type == null)
throw new IgniteException("No registration for class " +
clazz.getSimpleName());
- }
return type;
}
@@ -91,8 +85,7 @@ public interface Message {
var clazz = getClass();
var type = REGISTRATIONS.putIfAbsent(clazz, directType);
- if ((type != null) && (type != directType)) {
+ if ((type != null) && (type != directType))
throw new IgniteException(clazz.getSimpleName() + " is already
registered for direct type " + type);
- }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
index 84cc53e4335..e8cb7db6540 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java
@@ -31,5 +31,4 @@ public class HandshakeWaitMessage implements Message {
@Override public String toString() {
return S.toString(HandshakeWaitMessage.class, this);
}
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
index c7d11d53ae7..c3cbdae64e9 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java
@@ -23,18 +23,17 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.DummyCustomDiscoveryMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
@@ -58,6 +57,8 @@ public class OutboundIoMessageQueueSizeTest extends
GridCommonAbstractTest {
cfg.setDiscoverySpi(new
BlockTcpDiscoverySpi().setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
cfg.setGridLogger(log);
+ cfg.setPluginProviders(new
MessagesPluginProvider(DummyCustomDiscoveryMessage.class));
+
return cfg;
}
@@ -124,7 +125,7 @@ public class OutboundIoMessageQueueSizeTest extends
GridCommonAbstractTest {
metric.reset(); // Reset value accumulated before discovery SPI
startup.
- srv0.context().discovery().sendCustomEvent(new
DummyCustomDiscoveryMessage(IgniteUuid.randomUuid()));
+ srv0.context().discovery().sendCustomEvent(new
DummyCustomDiscoveryMessage());
// Assume our message can be added to queue concurrently with other
messages
// (for example, with metrics update message).
@@ -142,7 +143,7 @@ public class OutboundIoMessageQueueSizeTest extends
GridCommonAbstractTest {
try {
for (int i = 0; i <= MSG_LIMIT; i++)
- srv0.context().discovery().sendCustomEvent(new
DummyCustomDiscoveryMessage(IgniteUuid.randomUuid()));
+ srv0.context().discovery().sendCustomEvent(new
DummyCustomDiscoveryMessage());
assertTrue(metric.value() >= MSG_LIMIT);
}
@@ -150,27 +151,4 @@ public class OutboundIoMessageQueueSizeTest extends
GridCommonAbstractTest {
latch.countDown();
}
}
-
- /** */
- private static class DummyCustomDiscoveryMessage implements
DiscoveryCustomMessage {
- /** */
- private final IgniteUuid id;
-
- /**
- * @param id Message id.
- */
- DummyCustomDiscoveryMessage(IgniteUuid id) {
- this.id = id;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid id() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return null;
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 8b8e91e3fba..19b0a4a3f75 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -99,6 +99,7 @@ import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -911,7 +912,7 @@ public abstract class AbstractSnapshotSelfTest extends
GridCommonAbstractTest {
}
/** */
- protected static class BlockingCustomMessageDiscoverySpi extends
TcpDiscoverySpi {
+ protected static class BlockingCustomMessageDiscoverySpi extends
TestTcpDiscoverySpi {
/** List of messages which have been blocked. */
private final List<DiscoveryCustomMessage> blocked = new
CopyOnWriteArrayList<>();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
index cd54f29839d..7b679de89ad 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
@@ -36,8 +36,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
-import org.apache.ignite.internal.util.distributed.MessagesPluginProvider;
-import
org.apache.ignite.internal.util.distributed.MessagesPluginProvider.MessagesInjectedTcpDiscoverySpi;
+import org.apache.ignite.internal.util.distributed.TestIntegerMessage;
import org.apache.ignite.internal.util.distributed.TestUuidMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -48,6 +47,7 @@ import org.apache.ignite.plugin.PluginConfiguration;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -76,8 +76,10 @@ public class IgniteClusterSnapshotHandlerTest extends
IgniteClusterSnapshotResto
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
- .setPluginProviders(pluginProvider, new MessagesPluginProvider())
- .setDiscoverySpi(new MessagesInjectedTcpDiscoverySpi());
+ .setPluginProviders(
+ pluginProvider,
+ new MessagesPluginProvider(TestIntegerMessage.class,
TestUuidMessage.class)
+ );
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
index 43cb4a6dcae..a900dc423b2 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
@@ -34,10 +34,9 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -87,6 +86,8 @@ public class NodeSecurityContextPropagationTest extends
GridCommonAbstractTest {
.setIpFinder(new TcpDiscoveryVmIpFinder()
.setAddresses(Collections.singleton("127.0.0.1:47500")));
+ cfg.setPluginProviders(new
MessagesPluginProvider(TestDiscoveryMessage.class,
TestDiscoveryAcknowledgeMessage.class));
+
return cfg;
}
@@ -218,30 +219,6 @@ public class NodeSecurityContextPropagationTest extends
GridCommonAbstractTest {
return U.field(impl, "msgWorker");
}
- /** */
- public static class TestDiscoveryMessage extends
AbstractTestDiscoveryMessage {
- /** {@inheritDoc} */
- @Override public @Nullable DiscoveryCustomMessage ackMessage() {
- return new TestDiscoveryAcknowledgeMessage();
- }
- }
-
- /** */
- public static class TestDiscoveryAcknowledgeMessage extends
AbstractTestDiscoveryMessage { }
-
- /** */
- public abstract static class AbstractTestDiscoveryMessage implements
DiscoveryCustomMessage {
- /** {@inheritDoc} */
- @Override public IgniteUuid id() {
- return IgniteUuid.randomUuid();
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable DiscoveryCustomMessage ackMessage() {
- return null;
- }
- }
-
/** */
public static class BlockingDequeWrapper<T> implements BlockingDeque<T> {
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryAcknowledgeMessage.java
similarity index 62%
copy from
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
copy to
modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryAcknowledgeMessage.java
index 6f148b3c98f..7c8a5f02a04 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryAcknowledgeMessage.java
@@ -15,31 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util.distributed;
+package org.apache.ignite.internal.processors.security;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.jetbrains.annotations.Nullable;
/** */
-public class TestIntegerMessage implements Message {
+public class TestDiscoveryAcknowledgeMessage implements
DiscoveryCustomMessage, Message {
/** */
@Order(0)
- int val;
+ IgniteUuid id = IgniteUuid.randomUuid();
- /** Default constructor for {@link MessageFactory}. */
- public TestIntegerMessage() {
+ /** Constructor for {@link MessageFactory}. */
+ public TestDiscoveryAcknowledgeMessage() {
// No-op.
}
- /** */
- public TestIntegerMessage(int val) {
- this.val = val;
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
}
- /** */
- public int value() {
- return val;
+ /** {@inheritDoc} */
+ @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+ return null;
}
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryMessage.java
similarity index 61%
copy from
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
copy to
modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryMessage.java
index 6f148b3c98f..b80a6a01e65 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/TestDiscoveryMessage.java
@@ -15,31 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util.distributed;
+package org.apache.ignite.internal.processors.security;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.jetbrains.annotations.Nullable;
/** */
-public class TestIntegerMessage implements Message {
+public class TestDiscoveryMessage implements DiscoveryCustomMessage, Message {
/** */
@Order(0)
- int val;
+ IgniteUuid id = IgniteUuid.randomUuid();
- /** Default constructor for {@link MessageFactory}. */
- public TestIntegerMessage() {
+ /** Constructor for {@link MessageFactory}. */
+ public TestDiscoveryMessage() {
// No-op.
}
- /** */
- public TestIntegerMessage(int val) {
- this.val = val;
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
}
- /** */
- public int value() {
- return val;
+ /** {@inheritDoc} */
+ @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+ return new TestDiscoveryAcknowledgeMessage();
}
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
index 5fc16e6534b..bfbc4128acc 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
@@ -28,10 +28,10 @@ import java.util.function.BiFunction;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import
org.apache.ignite.internal.util.distributed.MessagesPluginProvider.MessagesInjectedTcpDiscoverySpi;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -70,9 +70,7 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(instanceName);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-
- cfg.setPluginProviders(new MessagesPluginProvider());
- cfg.setDiscoverySpi(new MessagesInjectedTcpDiscoverySpi());
+ cfg.setPluginProviders(new
MessagesPluginProvider(TestIntegerMessage.class, TestUuidMessage.class));
return cfg;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
index e907d130bc0..19d3c1621cb 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
@@ -28,8 +28,8 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import
org.apache.ignite.internal.util.distributed.MessagesPluginProvider.MessagesInjectedTcpDiscoverySpi;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -86,8 +86,7 @@ public class DistributedProcessCoordinatorLeftTest extends
GridCommonAbstractTes
}
});
- cfg.setPluginProviders(new MessagesPluginProvider());
- cfg.setDiscoverySpi(new MessagesInjectedTcpDiscoverySpi());
+ cfg.setPluginProviders(new
MessagesPluginProvider(TestIntegerMessage.class, TestUuidMessage.class));
return cfg;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java
deleted file mode 100644
index 7e5b53492dc..00000000000
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.distributed;
-
-import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
-import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
-
-import static org.apache.ignite.marshaller.Marshallers.jdk;
-import static org.junit.Assert.assertTrue;
-
-/** */
-public class MessagesPluginProvider extends AbstractTestPluginProvider {
- /** */
- private static final MessageFactoryProvider FACTORY_PROVIDER = f -> {
- f.register(10_000, TestIntegerMessage::new, new
TestIntegerMessageSerializer());
- f.register(10_001, TestUuidMessage::new, new
TestUuidMessageSerializer());
- };
-
- /** {@inheritDoc} */
- @Override public String name() {
- return "Distributed process test messages plugin";
- }
-
- /** {@inheritDoc} */
- @Override public void initExtensions(PluginContext ctx, ExtensionRegistry
registry) {
- registry.registerExtension(MessageFactoryProvider.class,
FACTORY_PROVIDER);
-
- // Register messages into the DiscoverySpi.
- assertTrue(ctx.igniteConfiguration().getDiscoverySpi() instanceof
MessagesInjectedTcpDiscoverySpi);
- }
-
- /** */
- public static class MessagesInjectedTcpDiscoverySpi extends
TcpDiscoverySpi {
- /** {@inheritDoc} */
- @Override protected void initLocalNode(int srvPort, boolean
addExtAddrAttr) {
- GridTestUtils.setFieldValue(this, TcpDiscoverySpi.class,
"msgFactory", new IgniteMessageFactoryImpl(
- new MessageFactoryProvider[] { new
DiscoveryMessageFactory(jdk(), U.gridClassLoader()), FACTORY_PROVIDER}));
-
- super.initLocalNode(srvPort, addExtAddrAttr);
- }
- }
-}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
index 6f148b3c98f..1fd753d6bb7 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
@@ -41,5 +41,4 @@ public class TestIntegerMessage implements Message {
public int value() {
return val;
}
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
new file mode 100644
index 00000000000..5134383656e
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+
+/**
+ * Plugin provider for registering test messages in the communication and
discovery protocols.
+ */
+public class MessagesPluginProvider extends AbstractTestPluginProvider {
+ /** */
+ private final MessageFactoryProvider msgFactoryProvider;
+
+ /** */
+ @SafeVarargs
+ public MessagesPluginProvider(Class<? extends Message>... msgs) {
+ msgFactoryProvider = f -> {
+ short directType = 10_000;
+
+ for (Class<? extends Message> msg : msgs) {
+ Supplier<Message> msgSupp = () -> {
+ try {
+ return U.newInstance(msg);
+ }
+ catch (IgniteCheckedException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ f.register(directType, msgSupp, loadSerializer(msg));
+
+ directType++;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "Test messages plugin";
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry
registry) {
+ // Register messages into the communication protocol.
+ registry.registerExtension(MessageFactoryProvider.class,
msgFactoryProvider);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(PluginContext ctx) throws
IgniteCheckedException {
+ // Register messages into the discovery protocol.
+ TestTcpDiscoverySpi discoSpi =
(TestTcpDiscoverySpi)ctx.igniteConfiguration().getDiscoverySpi();
+
+ discoSpi.messageFactory(msgFactoryProvider);
+ }
+
+ /** */
+ private MessageSerializer<? extends Message> loadSerializer(Class<?
extends Message> msgCls) {
+ try {
+ Class<?> serCls = U.gridClassLoader()
+ .loadClass(msgCls.getPackage().getName() + "." +
msgCls.getSimpleName() + "Serializer");
+
+ return (MessageSerializer<? extends Message>)U.newInstance(serCls);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unable to find serializer for message:
" + msgCls, e);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
index a4025d614a1..c222840f15c 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
@@ -21,13 +21,11 @@ import java.util.Arrays;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
@@ -118,24 +116,21 @@ public class FilterDataForClientNodeDiscoveryTest extends
GridCommonAbstractTest
cfg.setDiscoverySpi(testSpi);
+ cfg.setPluginProviders(new
MessagesPluginProvider(MessageForServer.class));
+
return cfg;
}
- /**
- *
- */
- private class TestDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private class TestDiscoverySpi extends TestTcpDiscoverySpi {
/** Test exchange. */
private TestDiscoveryDataExchange testEx = new
TestDiscoveryDataExchange();
- /**
- *
- */
+ /** */
public TestDiscoverySpi() {
exchange = testEx;
}
-
/** {@inheritDoc} */
@Override public void setDataExchange(DiscoverySpiDataExchange
exchange) {
testEx.setExchange(exchange);
@@ -171,25 +166,4 @@ public class FilterDataForClientNodeDiscoveryTest extends
GridCommonAbstractTest
this.ex = ex;
}
}
-
- /**
- *
- */
- private static class MessageForServer implements
DiscoveryServerOnlyCustomMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteUuid id = IgniteUuid.randomUuid();
-
- /** {@inheritDoc} */
- @Override public IgniteUuid id() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return null;
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/MessageForServer.java
similarity index 60%
copy from
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
copy to
modules/core/src/test/java/org/apache/ignite/spi/discovery/MessageForServer.java
index 6f148b3c98f..2495290d165 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/MessageForServer.java
@@ -15,31 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util.distributed;
+package org.apache.ignite.spi.discovery;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.jetbrains.annotations.Nullable;
/** */
-public class TestIntegerMessage implements Message {
+public class MessageForServer implements DiscoveryServerOnlyCustomMessage,
Message {
/** */
@Order(0)
- int val;
+ IgniteUuid id = IgniteUuid.randomUuid();
- /** Default constructor for {@link MessageFactory}. */
- public TestIntegerMessage() {
+ /** Constructor for {@link MessageFactory}. */
+ public MessageForServer() {
// No-op.
}
- /** */
- public TestIntegerMessage(int val) {
- this.val = val;
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
}
- /** */
- public int value() {
- return val;
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
}
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DummyCustomDiscoveryMessage.java
similarity index 57%
copy from
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
copy to
modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DummyCustomDiscoveryMessage.java
index 6f148b3c98f..8eeae66fac4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/TestIntegerMessage.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DummyCustomDiscoveryMessage.java
@@ -15,31 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util.distributed;
+package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.jetbrains.annotations.Nullable;
/** */
-public class TestIntegerMessage implements Message {
+public class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage,
Message {
/** */
@Order(0)
- int val;
+ IgniteUuid id = IgniteUuid.randomUuid();
- /** Default constructor for {@link MessageFactory}. */
- public TestIntegerMessage() {
+ /** Constructor for {@link DiscoveryMessageFactory}. */
+ public DummyCustomDiscoveryMessage() {
// No-op.
}
- /** */
- public TestIntegerMessage(int val) {
- this.val = val;
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
}
- /** */
- public int value() {
- return val;
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
}
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index 30f7ad478f4..ff37efc0b66 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -24,13 +24,11 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.MessagesPluginProvider;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
@@ -67,11 +65,13 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
else if (igniteInstanceName.startsWith("receiver"))
disco = new DyingThreadDiscoverySpi();
else
- disco = new TcpDiscoverySpi();
+ disco = new TestTcpDiscoverySpi();
disco.setIpFinder(sharedStaticIpFinder);
cfg.setDiscoverySpi(disco);
+ cfg.setPluginProviders(new
MessagesPluginProvider(DummyCustomDiscoveryMessage.class));
+
return cfg;
}
@@ -103,7 +103,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
receivedEnsuredMsgs.clear();
// Initial custom message will travel across the ring and will be
discarded.
- sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+ sendDummyCustomMessage(coordDisco);
assertTrue("Sent: " + sentEnsuredMsgs + "; received: " +
receivedEnsuredMsgs,
GridTestUtils.waitForCondition(() -> {
@@ -120,7 +120,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
int msgsNum = 2000;
for (int i = 0; i < msgsNum; i++)
- sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+ sendDummyCustomMessage(coordDisco);
mediator.close();
victim.close();
@@ -149,7 +149,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
});
// Custom message on a singleton cluster shouldn't break consistency
of PendingMessages.
- sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+ sendDummyCustomMessage(coordDisco);
// Victim doesn't send acknowledges, so we need an intermediate node
to accept messages,
// so the coordinator could mark them as pending.
@@ -172,7 +172,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
int msgsNum = 100;
for (int i = 0; i < msgsNum; i++)
- sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+ sendDummyCustomMessage(coordDisco);
mediator.close();
victim.close();
@@ -229,18 +229,15 @@ public class TcpDiscoveryPendingMessageDeliveryTest
extends GridCommonAbstractTe
assertTrue("Sent: " + sentEnsuredMsgs + "; received: " +
receivedEnsuredMsgs, delivered);
}
- /**
- * @param disco Discovery SPI.
- * @param id Message id.
- */
- private void sendDummyCustomMessage(TcpDiscoverySpi disco, IgniteUuid id) {
- disco.sendCustomEvent(new DummyCustomDiscoveryMessage(id));
+ /** @param disco Discovery SPI. */
+ private void sendDummyCustomMessage(TcpDiscoverySpi disco) {
+ disco.sendCustomEvent(new DummyCustomDiscoveryMessage());
}
/**
* Discovery SPI, that makes a thread to die when {@code blockMsgs} is set
to {@code true}.
*/
- private class DyingThreadDiscoverySpi extends TcpDiscoverySpi {
+ private class DyingThreadDiscoverySpi extends TestTcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (blockMsgs)
@@ -251,7 +248,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
/**
* Discovery SPI, that makes a node stop sending messages when {@code
blockMsgs} is set to {@code true}.
*/
- private class DyingDiscoverySpi extends TcpDiscoverySpi {
+ private class DyingDiscoverySpi extends TestTcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void writeToSocket(
Socket sock,
@@ -285,36 +282,11 @@ public class TcpDiscoveryPendingMessageDeliveryTest
extends GridCommonAbstractTe
/**
*
*/
- private class ListeningDiscoverySpi extends TcpDiscoverySpi {
+ private class ListeningDiscoverySpi extends TestTcpDiscoverySpi {
/** {@inheritDoc} */
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (ensured(msg))
receivedEnsuredMsgs.add(msg);
}
}
-
- /**
- *
- */
- private static class DummyCustomDiscoveryMessage implements
DiscoveryCustomMessage {
- /** */
- private final IgniteUuid id;
-
- /**
- * @param id Message id.
- */
- DummyCustomDiscoveryMessage(IgniteUuid id) {
- this.id = id;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid id() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return null;
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index e3038bf5dc6..859c0979beb 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -20,16 +20,23 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.marshaller.Marshallers.jdk;
import static
org.apache.ignite.testframework.GridTestUtils.DiscoverySpiListenerWrapper.wrap;
/**
@@ -45,6 +52,9 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
/** */
private IgniteDiscoverySpiInternalListener internalLsnr;
+ /** */
+ private MessageFactory msgFactory;
+
/** {@inheritDoc} */
@Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
@@ -100,4 +110,32 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
this.discoHook = discoHook;
}
+
+ /**
+ * Sets test discovery messages factory provider. Note that {@link
MessageFactoryProvider} must be set before SPI start.
+ * Otherwise, this method call will take no effect.
+ *
+ * @param msgFactoryProvider Discovery messages factory provider.
+ */
+ public void messageFactory(MessageFactoryProvider msgFactoryProvider) {
+ assert !started();
+
+ this.msgFactory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[] {
+ new DiscoveryMessageFactory(jdk(),
U.resolveClassLoader(ignite().configuration())),
+ msgFactoryProvider
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void initLocalNode(int srvPort, boolean
addExtAddrAttr) {
+ if (msgFactory != null)
+ GridTestUtils.setFieldValue(this, TcpDiscoverySpi.class,
"msgFactory", msgFactory);
+
+ super.initLocalNode(srvPort, addExtAddrAttr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageFactory messageFactory() {
+ return msgFactory != null ? msgFactory : super.messageFactory();
+ }
}