This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 222a85b111e IGNITE-27776 Simplify custom messages hierarchy (#12708)
222a85b111e is described below
commit 222a85b111ef7a1acd2ab89386dd1d2baf647f45
Author: Ilya Shishkov <[email protected]>
AuthorDate: Tue Feb 24 22:11:02 2026 +0300
IGNITE-27776 Simplify custom messages hierarchy (#12708)
---
.../managers/discovery/CustomMessageWrapper.java | 68 ----------------------
.../managers/discovery/DiscoveryCustomMessage.java | 3 -
.../managers/discovery/GridDiscoveryManager.java | 23 ++++----
.../SecurityAwareCustomMessageWrapper.java | 31 ++++++++--
.../wal/reader/StandaloneNoopDiscoverySpi.java | 4 +-
.../apache/ignite/internal/util/IgniteUtils.java | 12 ++++
.../spi/discovery/DiscoveryNotification.java | 17 +++---
.../apache/ignite/spi/discovery/DiscoverySpi.java | 3 +-
.../spi/discovery/DiscoverySpiCustomMessage.java | 39 +++++--------
.../DiscoverySpiMutableCustomMessageSupport.java | 5 +-
.../discovery/isolated/IsolatedDiscoverySpi.java | 6 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 19 +++---
.../ignite/spi/discovery/tcp/ServerImpl.java | 17 +++---
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 4 +-
.../messages/TcpDiscoveryCustomEventMessage.java | 15 +++--
.../TcpDiscoveryServerOnlyCustomEventMessage.java | 4 +-
.../main/resources/META-INF/classnames.properties | 1 -
.../ignite/internal/DiscoverySpiTestListener.java | 12 ++--
.../internal/binary/BinaryMarshallerSelfTest.java | 6 +-
.../internal/client/thin/ServiceAwarenessTest.java | 11 ++--
.../IgniteDiscoverySpiInternalListener.java | 3 +-
.../cache/ClientSlowDiscoveryAbstractTest.java | 6 +-
...IgniteMarshallerCacheClassNameConflictTest.java | 4 +-
...teMarshallerCacheClientRequestsMappingTest.java | 56 ++++++++----------
.../cache/IgniteMarshallerCacheFSRestoreTest.java | 4 +-
.../cache/binary/BinaryMetadataRemoveTest.java | 9 +--
.../GridBinaryCacheEntryMemorySizeSelfTest.java | 4 +-
.../db/IgniteSequentialNodeCrashRecoveryTest.java | 6 +-
.../snapshot/AbstractSnapshotSelfTest.java | 24 ++++----
.../IncrementalSnapshotJoiningClientTest.java | 6 +-
.../IgniteNoCustomEventsOnNodeStart.java | 8 +--
.../query/schema/IndexWithSameNameTestBase.java | 8 +--
.../NodeSecurityContextPropagationTest.java | 6 +-
.../service/ServiceConcurrentUndeployTest.java | 13 ++---
.../ServiceDeploymentProcessAbstractTest.java | 8 +--
.../discovery/DiscoverySpiDataExchangeTest.java | 3 +-
.../spi/discovery/tcp/BlockTcpDiscoverySpi.java | 11 +---
.../TcpDiscoveryPendingMessageDeliveryTest.java | 3 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 6 +-
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 4 +-
.../apache/ignite/testframework/GridTestUtils.java | 36 +-----------
.../testframework/junits/GridAbstractTest.java | 4 +-
.../junits/GridTestBinaryMarshaller.java | 4 +-
.../cache/BinaryTypeRegistrationTest.java | 11 ++--
.../cache/CacheRegisterMetadataLocallyTest.java | 13 ++---
.../internal/processors/query/KillQueryTest.java | 37 ++++++------
.../processors/query/RunningQueriesTest.java | 47 +++++++--------
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 4 +-
.../ZkCommunicationErrorResolveFinishMessage.java | 5 +-
.../ZkCommunicationErrorResolveStartMessage.java | 5 +-
.../zk/internal/ZkDiscoveryCustomEventData.java | 3 +-
.../zk/internal/ZkForceNodeFailMessage.java | 5 +-
.../discovery/zk/internal/ZkNoServersMessage.java | 5 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 20 +++----
.../discovery/zk/TestZookeeperDiscoverySpi.java | 4 +-
56 files changed, 290 insertions(+), 409 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
deleted file mode 100644
index 29aa940ecf2..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.discovery;
-
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class CustomMessageWrapper implements DiscoverySpiCustomMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final DiscoveryCustomMessage delegate;
-
- /**
- * @param delegate Delegate.
- */
- public CustomMessageWrapper(DiscoveryCustomMessage delegate) {
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
- DiscoveryCustomMessage res = delegate.ackMessage();
-
- return res == null ? null : new CustomMessageWrapper(res);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isMutable() {
- return delegate.isMutable();
- }
-
- /** {@inheritDoc} */
- @Override public boolean stopProcess() {
- return delegate.stopProcess();
- }
-
- /**
- * @return Delegate.
- */
- public DiscoveryCustomMessage delegate() {
- return delegate;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return delegate.toString();
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index c3cd0aba66c..3df16a103b7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.managers.discovery;
import java.io.Serializable;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
@@ -93,8 +92,6 @@ public interface DiscoveryCustomMessage extends Serializable {
public boolean isMutable();
/**
- * See {@link DiscoverySpiCustomMessage#stopProcess()}.
- *
* @return {@code True} if message should not be sent to other nodes
after it was processed on coordinator.
*/
public default boolean stopProcess() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index de9a08f18f3..387e22103eb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -132,7 +132,6 @@ import
org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -593,8 +592,8 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
ClusterNode node = notification.getNode();
long topVer = notification.getTopVer();
- DiscoveryCustomMessage customMsg =
notification.getCustomMsgData() == null ? null
- :
((CustomMessageWrapper)notification.getCustomMsgData()).delegate();
+ DiscoveryCustomMessage customMsg =
U.unwrapCustomMessage(notification.customMessage() == null ?
+ null : notification.customMessage());
if (skipMessage(notification.type(), customMsg))
return;
@@ -933,7 +932,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
/** */
@Override public void run() {
- DiscoverySpiCustomMessage customMsg =
notification.getCustomMsgData();
+ DiscoveryCustomMessage customMsg =
notification.customMessage();
if (customMsg instanceof
SecurityAwareCustomMessageWrapper) {
UUID secSubjId =
((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId();
@@ -2336,7 +2335,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
getSpi().sendCustomEvent(security.enabled()
? new SecurityAwareCustomMessageWrapper(msg,
security.securityContext().subject().id())
- : new CustomMessageWrapper(msg));
+ : msg);
}
catch (IgniteClientDisconnectedException e) {
IgniteFuture<?> reconnectFut =
ctx.cluster().clientReconnectFuture();
@@ -2941,7 +2940,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
Collection<ClusterNode> topSnapshot;
/** Data. */
- @Nullable DiscoveryCustomMessage data;
+ @Nullable DiscoveryCustomMessage customMsg;
/** Span container. */
SpanContainer spanContainer;
@@ -2955,7 +2954,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
* @param node Node.
* @param discoCache Disco cache.
* @param topSnapshot Topology snapshot.
- * @param data Data.
+ * @param customMsg Custom message.
* @param spanContainer Span container.
*/
public NotificationEvent(
@@ -2964,7 +2963,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
ClusterNode node,
DiscoCache discoCache,
Collection<ClusterNode> topSnapshot,
- @Nullable DiscoveryCustomMessage data,
+ @Nullable DiscoveryCustomMessage customMsg,
SpanContainer spanContainer,
SecurityContext secCtx
) {
@@ -2973,7 +2972,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
this.node = node;
this.discoCache = discoCache;
this.topSnapshot = topSnapshot;
- this.data = data;
+ this.customMsg = customMsg;
this.spanContainer = spanContainer;
this.secCtx = secCtx;
}
@@ -3072,7 +3071,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
* @param notificationEvt Notification event.
*/
void addEvent(NotificationEvent notificationEvt) {
- assert notificationEvt.node != null : notificationEvt.data;
+ assert notificationEvt.node != null : notificationEvt.customMsg;
if (notificationEvt.type == EVT_CLIENT_NODE_DISCONNECTED)
discoWrk.disconnectEvtFut = new GridFutureAdapter();
@@ -3228,11 +3227,11 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
customEvt.type(type);
customEvt.topologySnapshot(topVer.topologyVersion(), evt.topSnapshot);
customEvt.affinityTopologyVersion(topVer);
- customEvt.customMessage(evt.data);
+ customEvt.customMessage(evt.customMsg);
customEvt.span(evt.spanContainer != null ?
evt.spanContainer.span() : null);
if (evt.discoCache == null) {
- assert discoCache != null : evt.data;
+ assert discoCache != null : evt.customMsg;
evt.discoCache = discoCache;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
index 49ecd2e518e..1b1b8ce62f1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
@@ -21,18 +21,20 @@ import java.util.UUID;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
-/** Extends {@link CustomMessageWrapper} with ID of security subject that
initiated the current message. */
-public class SecurityAwareCustomMessageWrapper extends CustomMessageWrapper {
+/** Custom message wrapper with ID of security subject that initiated the
current message. */
+public class SecurityAwareCustomMessageWrapper extends
DiscoverySpiCustomMessage {
/** */
private static final long serialVersionUID = 0L;
/** Security subject ID. */
private final UUID secSubjId;
+ /** Original message. */
+ private final DiscoveryCustomMessage delegate;
+
/** */
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate,
UUID secSubjId) {
- super(delegate);
-
+ this.delegate = delegate;
this.secSubjId = secSubjId;
}
@@ -42,8 +44,25 @@ public class SecurityAwareCustomMessageWrapper extends
CustomMessageWrapper {
}
/** {@inheritDoc} */
- @Override public @Nullable DiscoverySpiCustomMessage ackMessage() {
- DiscoveryCustomMessage ack = delegate().ackMessage();
+ @Override public boolean isMutable() {
+ return delegate.isMutable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return delegate.stopProcess();
+ }
+
+ /**
+ * @return Delegate.
+ */
+ public DiscoveryCustomMessage delegate() {
+ return delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+ DiscoveryCustomMessage ack = delegate.ackMessage();
return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack,
secSubjId);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java
index 3946c4fd2a8..3dd6d0b5f45 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java
@@ -23,13 +23,13 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiNoop;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
@@ -101,7 +101,7 @@ public class StandaloneNoopDiscoverySpi extends
IgniteSpiAdapter implements Disc
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws
IgniteException {
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3c46f639796..762f46f1e96 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -187,7 +187,9 @@ import org.apache.ignite.internal.logger.IgniteLoggerEx;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import
org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
import
org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -8305,4 +8307,14 @@ public abstract class IgniteUtils extends CommonUtils {
return (IgniteDataTransferObjectSerializer<T>)EMPTY_DTO_SERIALIZER;
}
}
+
+ /**
+ * Unwraps message if it is wrapped by {@link
SecurityAwareCustomMessageWrapper}.
+ *
+ * @param msg Message.
+ */
+ public static DiscoveryCustomMessage
unwrapCustomMessage(DiscoveryCustomMessage msg) {
+ return msg instanceof SecurityAwareCustomMessageWrapper ?
+ ((SecurityAwareCustomMessageWrapper)msg).delegate() : msg;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
index 6dc38c1c60d..194b5b3e781 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery;
import java.util.Collection;
import java.util.NavigableMap;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.jetbrains.annotations.Nullable;
@@ -41,8 +42,8 @@ public class DiscoveryNotification {
/** Topology history. */
private @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist;
- /** Custom message data. */
- private @Nullable DiscoverySpiCustomMessage customMsgData;
+ /** Custom message. */
+ private @Nullable DiscoveryCustomMessage customMsg;
/** Span container. */
private SpanContainer spanContainer;
@@ -66,7 +67,7 @@ public class DiscoveryNotification {
* @param node Node.
* @param topSnapshot Topology snapshot.
* @param topHist Topology history.
- * @param customMsgData Custom message data.
+ * @param customMsg Custom message.
* @param spanContainer Span container.
*/
public DiscoveryNotification(
@@ -75,7 +76,7 @@ public class DiscoveryNotification {
ClusterNode node,
Collection<ClusterNode> topSnapshot,
@Nullable NavigableMap<Long, Collection<ClusterNode>> topHist,
- @Nullable DiscoverySpiCustomMessage customMsgData,
+ @Nullable DiscoveryCustomMessage customMsg,
SpanContainer spanContainer
) {
this.eventType = eventType;
@@ -83,7 +84,7 @@ public class DiscoveryNotification {
this.node = node;
this.topSnapshot = topSnapshot;
this.topHist = topHist;
- this.customMsgData = customMsgData;
+ this.customMsg = customMsg;
this.spanContainer = spanContainer;
}
@@ -123,10 +124,10 @@ public class DiscoveryNotification {
}
/**
- * @return Custom message data.
+ * @return Custom message.
*/
- public DiscoverySpiCustomMessage getCustomMsgData() {
- return customMsgData;
+ public DiscoveryCustomMessage customMessage() {
+ return customMsg;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 545e1a043e7..3d51a2fc3bf 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiException;
@@ -153,7 +154,7 @@ public interface DiscoverySpi extends IgniteSpi {
* @param msg Custom message.
* @throws IgniteException if failed to send the event message.
*/
- public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws
IgniteException;
+ public void sendCustomEvent(DiscoveryCustomMessage msg) throws
IgniteException;
/**
* Initiates failure of provided node.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 6e11673d8e2..661234b5a7e 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -17,37 +17,28 @@
package org.apache.ignite.spi.discovery;
-import java.io.Serializable;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.lang.IgniteUuid;
/**
* Message to send across ring.
*
* @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
+ * TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27778
*/
-public interface DiscoverySpiCustomMessage extends Serializable {
- /**
- * Called when custom message has been handled by all nodes.
- *
- * @return Ack message or {@code null} if ack is not required.
- */
- @Nullable public DiscoverySpiCustomMessage ackMessage();
+@Deprecated(forRemoval = true)
+public abstract class DiscoverySpiCustomMessage implements
DiscoveryCustomMessage {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return null;
+ }
- /**
- * @return {@code True} if message can be modified during listener
notification. Changes will be send to next nodes.
- */
- public boolean isMutable();
-
- /**
- * Called on discovery coordinator node after listener is notified. If
returns {@code true}
- * then message is not passed to others nodes, if after this method {@link
#ackMessage()} returns non-null ack
- * message, it is sent to all nodes.
- *
- * Note: this method is used then and only then the zookeeper discovery is
configured.
- *
- * @return {@code True} if message should not be sent to all nodes.
- */
- public boolean stopProcess();
+ /** {@inheritDoc} */
+ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer,
+ DiscoCache discoCache) {
+ return null;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
index e7952682cb7..ccd2fe1978b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
@@ -23,10 +23,11 @@ import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
/**
* This annotation is for all implementations of {@link DiscoverySpi} that
support
- * topology mutable {@link DiscoverySpiCustomMessage}s.
+ * topology mutable {@link DiscoveryCustomMessage}s.
*/
@Documented
@Inherited
@@ -34,7 +35,7 @@ import java.lang.annotation.Target;
@Target({ElementType.TYPE})
public @interface DiscoverySpiMutableCustomMessageSupport {
/**
- * @return Whether or not target SPI supports mutable {@link
DiscoverySpiCustomMessage}s.
+ * @return Whether or not target SPI supports mutable {@link
DiscoveryCustomMessage}s.
*/
public boolean value();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
index 1e0d02019fb..9e00dd311a9 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
@@ -41,7 +42,6 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -159,7 +159,7 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter
implements IgniteDisc
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws
IgniteException {
exec.execute(() -> {
IgniteFuture<?> fut = lsnr.onDiscovery(new DiscoveryNotification(
EVT_DISCOVERY_CUSTOM_EVT,
@@ -172,7 +172,7 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter
implements IgniteDisc
// Acknowledge message must be send after initial message
processed.
fut.listen((f) -> {
- DiscoverySpiCustomMessage ack = msg.ackMessage();
+ DiscoveryCustomMessage ack = msg.ackMessage();
if (ack != null) {
exec.execute(() -> lsnr.onDiscovery(new
DiscoveryNotification(
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index dfa7d8cfeef..9d9e588747a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -65,7 +65,7 @@ import
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
@@ -89,7 +89,6 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -492,7 +491,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage evt) {
State state = this.state;
if (state == DISCONNECTED)
@@ -504,7 +503,9 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
TcpDiscoveryCustomEventMessage msg;
- if (((CustomMessageWrapper)evt).delegate() instanceof
DiscoveryServerOnlyCustomMessage)
+ DiscoveryCustomMessage customMsg = U.unwrapCustomMessage(evt);
+
+ if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
U.marshal(spi.marshaller(), evt));
else
@@ -515,7 +516,7 @@ class ClientImpl extends TcpDiscoveryImpl {
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.CONSISTENT_ID),
() -> locNode.consistentId().toString())
- .addTag(SpanTags.MESSAGE_CLASS, () ->
((CustomMessageWrapper)evt).delegate().getClass().getSimpleName())
+ .addTag(SpanTags.MESSAGE_CLASS, () ->
customMsg.getClass().getSimpleName())
.addLog(() -> "Created");
// This root span will be parent both from local and remote nodes.
@@ -2599,7 +2600,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (node != null && node.visible()) {
try {
- DiscoverySpiCustomMessage msgObj =
msg.message(spi.marshaller(),
+ DiscoveryCustomMessage msgObj =
msg.message(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
notifyDiscovery(
@@ -2688,14 +2689,14 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param topVer Topology version.
* @param node Node.
* @param top Topology snapshot.
- * @param data Optional custom message data.
+ * @param customMsg Optional custom message.
*/
private void notifyDiscovery(
int type,
long topVer,
ClusterNode node,
Collection<ClusterNode> top,
- @Nullable DiscoverySpiCustomMessage data,
+ @Nullable DiscoveryCustomMessage customMsg,
SpanContainer spanContainer
) {
DiscoverySpiListener lsnr = spi.lsnr;
@@ -2708,7 +2709,7 @@ class ClientImpl extends TcpDiscoveryImpl {
", topVer=" + topVer + ']');
lsnr.onDiscovery(
- new DiscoveryNotification(type, topVer, node, top, new
TreeMap<>(topHist), data, spanContainer)
+ new DiscoveryNotification(type, topVer, node, top, new
TreeMap<>(topHist), customMsg, spanContainer)
).get();
}
else if (debugLog.isDebugEnabled())
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index fa69db0cde1..d015188d6d2 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -78,7 +78,7 @@ import
org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
@@ -121,7 +121,6 @@ import
org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -1018,11 +1017,13 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage evt) {
try {
TcpDiscoveryCustomEventMessage msg;
- if (((CustomMessageWrapper)evt).delegate() instanceof
DiscoveryServerOnlyCustomMessage)
+ DiscoveryCustomMessage customMsg = U.unwrapCustomMessage(evt);
+
+ if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
U.marshal(spi.marshaller(), evt));
else
@@ -1033,7 +1034,7 @@ class ServerImpl extends TcpDiscoveryImpl {
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.CONSISTENT_ID),
() -> locNode.consistentId().toString())
- .addTag(SpanTags.MESSAGE_CLASS, () ->
((CustomMessageWrapper)evt).delegate().getClass().getSimpleName())
+ .addTag(SpanTags.MESSAGE_CLASS, () ->
customMsg.getClass().getSimpleName())
.addLog(() -> "Created");
// This root span will be parent both from local and remote nodes.
@@ -6079,7 +6080,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
addMessage(new
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
- DiscoverySpiCustomMessage msgObj = null;
+ DiscoveryCustomMessage msgObj = null;
try {
msgObj = msg.message(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
@@ -6089,7 +6090,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msgObj != null) {
- DiscoverySpiCustomMessage nextMsg =
msgObj.ackMessage();
+ DiscoveryCustomMessage nextMsg = msgObj.ackMessage();
if (nextMsg != null) {
try {
@@ -6267,7 +6268,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node == null)
return;
- DiscoverySpiCustomMessage msgObj;
+ DiscoveryCustomMessage msgObj;
try {
msgObj = msg.message(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e115a3cca03..aa7dc4a6322 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot;
import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
@@ -49,7 +50,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiThread;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientNodesMetricsMessage;
@@ -281,7 +281,7 @@ abstract class TcpDiscoveryImpl {
* @param msg Message.
* @throws IgniteException If failed.
*/
- public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws
IgniteException;
+ public abstract void sendCustomEvent(DiscoveryCustomMessage msg) throws
IgniteException;
/**
* @param nodeId Node id.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c748fee1832..0d99d795a19 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -57,6 +57,7 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
@@ -91,7 +92,6 @@ import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -537,7 +537,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws
IgniteException {
impl.sendCustomEvent(msg);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index cd1b90b348c..73ac084a3f1 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -20,12 +20,11 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -39,7 +38,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
private static final long serialVersionUID = 0L;
/** */
- private transient volatile DiscoverySpiCustomMessage msg;
+ private transient volatile DiscoveryCustomMessage msg;
/** */
private byte[] msgBytes;
@@ -49,7 +48,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
* @param msg Message.
* @param msgBytes Serialized message.
*/
- public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable
DiscoverySpiCustomMessage msg,
+ public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable
DiscoveryCustomMessage msg,
@NotNull byte[] msgBytes) {
super(creatorNodeId);
@@ -64,7 +63,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
public TcpDiscoveryCustomEventMessage(TcpDiscoveryCustomEventMessage msg) {
super(msg);
- this.msgBytes = msg.msgBytes;
+ msgBytes = msg.msgBytes;
this.msg = msg.msg;
}
@@ -86,7 +85,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
* @param msg Message.
* @param msgBytes Serialized message.
*/
- public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull
byte[] msgBytes) {
+ public void message(@Nullable DiscoveryCustomMessage msg, @NotNull byte[]
msgBytes) {
this.msg = msg;
this.msgBytes = msgBytes;
}
@@ -97,7 +96,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
* @return Deserialized message,
* @throws java.lang.Throwable if unmarshal failed.
*/
- @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller
marsh, ClassLoader ldr) throws Throwable {
+ @Nullable public DiscoveryCustomMessage message(@NotNull Marshaller marsh,
ClassLoader ldr) throws Throwable {
if (msg == null) {
try {
msg = U.unmarshal(marsh, msgBytes, ldr);
@@ -105,7 +104,7 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
catch (IgniteCheckedException e) {
// Try to resurrect a message in a case of deserialization
failure
if (e.getCause() instanceof IncompleteDeserializationException)
- return new
CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message());
+ return
((IncompleteDeserializationException)e.getCause()).message();
throw e;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
index 97f701ed6e8..15122f1e50b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
@@ -18,7 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.jetbrains.annotations.NotNull;
/**
@@ -34,7 +34,7 @@ public class TcpDiscoveryServerOnlyCustomEventMessage extends
TcpDiscoveryCustom
* @param msg Message.
* @param msgBytes Serialized message.
*/
- public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId,
@NotNull DiscoverySpiCustomMessage msg,
+ public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId,
@NotNull DiscoveryCustomMessage msg,
@NotNull byte[] msgBytes) {
super(creatorNodeId, msg, msgBytes);
}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index a6b3f0122c0..dd613bffcd7 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -719,7 +719,6 @@
org.apache.ignite.internal.managers.deployment.GridDeploymentPerVersionStore$2
org.apache.ignite.internal.managers.deployment.GridDeploymentRequest
org.apache.ignite.internal.managers.deployment.GridDeploymentResponse
org.apache.ignite.internal.managers.deployment.P2PClassNotFoundException
-org.apache.ignite.internal.managers.discovery.CustomMessageWrapper
org.apache.ignite.internal.managers.discovery.DiscoCache$1
org.apache.ignite.internal.managers.discovery.DiscoCache$2
org.apache.ignite.internal.managers.discovery.DiscoCache$3
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
index 01a3955c09a..465f7f78d47 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
@@ -30,8 +30,6 @@ import
org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.testframework.GridTestUtils;
/**
* Test callback for discovery SPI.
@@ -52,7 +50,7 @@ public class DiscoverySpiTestListener implements
IgniteDiscoverySpiInternalListe
private final Object mux = new Object();
/** */
- private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
+ private List<DiscoveryCustomMessage> blockedMsgs = new ArrayList<>();
/** */
private volatile DiscoverySpi spi;
@@ -121,13 +119,13 @@ public class DiscoverySpiTestListener implements
IgniteDiscoverySpiInternalListe
}
/** {@inheritDoc} */
- @Override public boolean beforeSendCustomEvent(DiscoverySpi spi,
IgniteLogger log, DiscoverySpiCustomMessage msg) {
+ @Override public boolean beforeSendCustomEvent(DiscoverySpi spi,
IgniteLogger log, DiscoveryCustomMessage msg) {
this.spi = spi;
this.log = log;
synchronized (mux) {
if (blockCustomEvtCls != null) {
- DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg,
"delegate");
+ DiscoveryCustomMessage msg0 = U.unwrapCustomMessage(msg);
if (blockCustomEvtCls.contains(msg0.getClass())) {
log.info("Block custom message: " + msg0);
@@ -176,7 +174,7 @@ public class DiscoverySpiTestListener implements
IgniteDiscoverySpiInternalListe
if (spi == null)
return;
- List<DiscoverySpiCustomMessage> msgs;
+ List<DiscoveryCustomMessage> msgs;
synchronized (this) {
msgs = new ArrayList<>(blockedMsgs);
@@ -186,7 +184,7 @@ public class DiscoverySpiTestListener implements
IgniteDiscoverySpiInternalListe
blockedMsgs.clear();
}
- for (DiscoverySpiCustomMessage msg : msgs) {
+ for (DiscoveryCustomMessage msg : msgs) {
log.info("Resend blocked message: " + msg);
spi.sendCustomEvent(msg);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index dcc49c013e4..308b390cef1 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -85,6 +85,7 @@ import
org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryStreams;
import org.apache.ignite.internal.binary.streams.BinaryStreamsTestUtils;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import org.apache.ignite.internal.managers.systemview.JmxSystemViewExporterSpi;
@@ -100,7 +101,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -4159,7 +4159,7 @@ public class BinaryMarshallerSelfTest extends
AbstractBinaryArraysTest {
iCfg.setBinaryConfiguration(bCfg);
iCfg.setClientMode(false);
iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
//No-op.
}
});
@@ -4217,7 +4217,7 @@ public class BinaryMarshallerSelfTest extends
AbstractBinaryArraysTest {
iCfg.setBinaryConfiguration(bCfg);
iCfg.setClientMode(false);
iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
//No-op.
}
});
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
index 5c87c1819db..77ae572bd0e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java
@@ -49,7 +49,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.service.GridServiceProxy;
@@ -60,7 +59,6 @@ import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.ServiceConfiguration;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
@@ -646,13 +644,12 @@ public class ServiceAwarenessTest extends
AbstractThinClientTest {
private final Set<Class<? extends DiscoveryCustomMessage>> toBlock =
new HashSet<>();
/** */
- private final List<CustomMessageWrapper> blocked = new
CopyOnWriteArrayList<>();
+ private final List<DiscoveryCustomMessage> blocked = new
CopyOnWriteArrayList<>();
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
- if (msg instanceof CustomMessageWrapper
- && toBlock.stream().anyMatch(mt ->
mt.isAssignableFrom(((CustomMessageWrapper)msg).delegate().getClass()))) {
- blocked.add((CustomMessageWrapper)msg);
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ if (toBlock.stream().anyMatch(mt ->
mt.isAssignableFrom(U.unwrapCustomMessage(msg).getClass()))) {
+ blocked.add(msg);
return;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
index 6b25e9ff5d9..3d0a267a2ff 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.managers.discovery;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
/**
* For TESTING only.
@@ -46,5 +45,5 @@ public interface IgniteDiscoverySpiInternalListener {
* @param msg Custom message.
* @return {@code False} to cancel event send.
*/
- public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log,
DiscoverySpiCustomMessage msg);
+ public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log,
DiscoveryCustomMessage msg);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
index a31feaffb66..cf1871b1523 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
@@ -23,13 +23,11 @@ import
org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -99,12 +97,12 @@ public class ClientSlowDiscoveryAbstractTest extends
GridCommonAbstractTest {
DiscoveryCustomMessage delegate;
try {
- DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
+ DiscoveryCustomMessage custMsg = cm.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
assertNotNull(custMsg);
- delegate = ((CustomMessageWrapper)custMsg).delegate();
+ delegate = U.unwrapCustomMessage(custMsg);
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
index 67ff9fafa19..bb103c2625e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
@@ -198,8 +198,8 @@ public class IgniteMarshallerCacheClassNameConflictTest
extends GridCommonAbstra
@Override public IgniteFuture<?> onDiscovery(
DiscoveryNotification notification
) {
- DiscoveryCustomMessage customMsg =
notification.getCustomMsgData() == null ? null
- :
(DiscoveryCustomMessage)U.field(notification.getCustomMsgData(), "delegate");
+ DiscoveryCustomMessage customMsg =
notification.customMessage() == null ? null
+ : U.unwrapCustomMessage(notification.customMessage());
if (customMsg != null) {
//don't want to make this class public, using equality of
class name instead of instanceof operator
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
index c485ac94417..388d883de1b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SharedFileTree;
@@ -43,7 +42,6 @@ import
org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMe
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -163,25 +161,23 @@ public class
IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAb
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
- DiscoverySpiCustomMessage custom =
+ DiscoveryCustomMessage customMsg =
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
U.gridClassLoader());
- if (custom instanceof CustomMessageWrapper) {
- DiscoveryCustomMessage delegate =
((CustomMessageWrapper)custom).delegate();
-
- if (delegate instanceof
MappingAcceptedMessage) {
- MarshallerMappingItem item =
GridTestUtils.getFieldValue(delegate, "item");
-
- if
(item.className().equals(PERSON_CLASS_NAME) ||
-
item.className().equals(ORGANIZATION_CLASS_NAME) ||
-
item.className().equals(ADDRESS_CLASS_NAME)
- ) {
- try {
- U.await(delayMappingLatch,
AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- }
- catch (Exception e) {
- fail("Mapping proposed message
must be released.");
- }
+ DiscoveryCustomMessage delegate =
U.unwrapCustomMessage(customMsg);
+
+ if (delegate instanceof MappingAcceptedMessage) {
+ MarshallerMappingItem item =
GridTestUtils.getFieldValue(delegate, "item");
+
+ if (item.className().equals(PERSON_CLASS_NAME)
||
+
item.className().equals(ORGANIZATION_CLASS_NAME) ||
+ item.className().equals(ADDRESS_CLASS_NAME)
+ ) {
+ try {
+ U.await(delayMappingLatch,
AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ fail("Mapping proposed message must be
released.");
}
}
}
@@ -240,22 +236,20 @@ public class
IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAb
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
- DiscoverySpiCustomMessage custom =
+ DiscoveryCustomMessage customMsg =
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
U.gridClassLoader());
- if (custom instanceof CustomMessageWrapper) {
- DiscoveryCustomMessage delegate =
((CustomMessageWrapper)custom).delegate();
+ DiscoveryCustomMessage delegate =
U.unwrapCustomMessage(customMsg);
- if (delegate instanceof
MappingProposedMessage) {
- MarshallerMappingItem item =
GridTestUtils.getFieldValue(delegate, "mappingItem");
+ if (delegate instanceof MappingProposedMessage) {
+ MarshallerMappingItem item =
GridTestUtils.getFieldValue(delegate, "mappingItem");
- if
(item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) {
- try {
- U.await(latch,
AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- }
- catch (Exception e) {
- fail("Exception must never be
thrown: " + e.getMessage());
- }
+ if
(item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) {
+ try {
+ U.await(latch,
AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ fail("Exception must never be thrown:
" + e.getMessage());
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index fb5496ef596..c9a41b9cc58 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -250,8 +250,8 @@ public class IgniteMarshallerCacheFSRestoreTest extends
GridCommonAbstractTest {
@Override public IgniteFuture<?> onDiscovery(
DiscoveryNotification notification
) {
- DiscoveryCustomMessage customMsg =
notification.getCustomMsgData() == null ? null
- :
(DiscoveryCustomMessage)U.field(notification.getCustomMsgData(), "delegate");
+ DiscoveryCustomMessage customMsg =
notification.customMessage() == null ? null
+ : U.unwrapCustomMessage(notification.customMessage());
if (customMsg != null) {
//don't want to make this class public, using equality of
class name instead of instanceof operator
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java
index 7a8193f3b12..420cf01d033 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java
@@ -31,10 +31,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -184,14 +182,11 @@ public class BinaryMetadataRemoveTest extends
GridCommonAbstractTest {
AtomicBoolean hookMsgs = new AtomicBoolean(true);
discoveryHook = new GridTestUtils.DiscoveryHook() {
- @Override public void beforeDiscovery(DiscoverySpiCustomMessage
msg) {
+ @Override public void beforeDiscovery(DiscoveryCustomMessage msg) {
if (!hookMsgs.get())
return;
- DiscoveryCustomMessage customMsg = msg == null ? null
- : (DiscoveryCustomMessage)IgniteUtils.field(msg,
"delegate");
-
- if (customMsg instanceof MetadataRemoveProposedMessage) {
+ if (msg instanceof MetadataRemoveProposedMessage) {
try {
barrier0.await();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
index 15910a33d19..f359db9fc7a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import org.apache.ignite.internal.managers.systemview.JmxSystemViewExporterSpi;
@@ -28,7 +29,6 @@ import
org.apache.ignite.internal.processors.cache.GridCacheEntryMemorySizeSelfT
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -43,7 +43,7 @@ public class GridBinaryCacheEntryMemorySizeSelfTest extends
GridCacheEntryMemory
IgniteConfiguration iCfg = new IgniteConfiguration();
iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
// No-op.
}
});
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
index dc4f15bd07b..724e2e10678 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
@@ -67,7 +66,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -357,7 +355,7 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
/** */
private DiscoveryCustomMessage
extractCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- DiscoverySpiCustomMessage msgObj = null;
+ DiscoveryCustomMessage msgObj = null;
try {
msgObj = msg.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
@@ -366,7 +364,7 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
U.error(log, "Failed to unmarshal discovery custom message.",
e);
}
- return ((CustomMessageWrapper)msgObj).delegate();
+ return U.unwrapCustomMessage(msgObj);
}
/** Unblock discovery custom messages. */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 3a490545429..715b771d611 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -68,7 +68,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import
org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
@@ -97,7 +96,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi;
import org.apache.ignite.testframework.GridTestUtils;
@@ -910,24 +908,22 @@ public abstract class AbstractSnapshotSelfTest extends
GridCommonAbstractTest {
/** */
protected static class BlockingCustomMessageDiscoverySpi extends
TcpDiscoverySpi {
/** List of messages which have been blocked. */
- private final List<DiscoverySpiCustomMessage> blocked = new
CopyOnWriteArrayList<>();
+ private final List<DiscoveryCustomMessage> blocked = new
CopyOnWriteArrayList<>();
/** Discovery custom message filter. */
private volatile IgnitePredicate<DiscoveryCustomMessage> blockPred;
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
- if (msg instanceof CustomMessageWrapper) {
- DiscoveryCustomMessage msg0 =
((CustomMessageWrapper)msg).delegate();
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ DiscoveryCustomMessage msg0 = U.unwrapCustomMessage(msg);
- if (blockPred != null && blockPred.apply(msg0)) {
- blocked.add(msg);
+ if (blockPred != null && blockPred.apply(msg0)) {
+ blocked.add(msg);
- if (log.isInfoEnabled())
- log.info("Discovery message has been blocked: " +
msg0);
+ if (log.isInfoEnabled())
+ log.info("Discovery message has been blocked: " + msg0);
- return;
- }
+ return;
}
super.sendCustomEvent(msg);
@@ -954,11 +950,11 @@ public abstract class AbstractSnapshotSelfTest extends
GridCommonAbstractTest {
/** Releases the blocked messages. */
private void releaseBlocked() {
- List<DiscoverySpiCustomMessage> blocked = new
CopyOnWriteArrayList<>(this.blocked);
+ List<DiscoveryCustomMessage> blocked = new
CopyOnWriteArrayList<>(this.blocked);
this.blocked.clear();
- for (DiscoverySpiCustomMessage msg : blocked)
+ for (DiscoveryCustomMessage msg : blocked)
sendCustomEvent(msg);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
index aaae9d6b9e2..3de54f1f3ba 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import
org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -247,10 +247,10 @@ public class IncrementalSnapshotJoiningClientTest extends
AbstractIncrementalSna
TcpDiscoveryCustomEventMessage m =
(TcpDiscoveryCustomEventMessage)msg;
try {
- CustomMessageWrapper m0 = (CustomMessageWrapper)m.message(
+ DiscoveryCustomMessage m0 = m.message(
marshaller(),
U.resolveClassLoader(ignite().configuration()));
- if (m0.delegate() instanceof InitMessage)
+ if (U.unwrapCustomMessage(m0) instanceof InitMessage)
rcvStartSnpReq.countDown();
}
catch (Throwable e) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
index 96e2b462fc2..c94eab7a02f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.continuous;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -60,8 +60,8 @@ public class IgniteNoCustomEventsOnNodeStart extends
GridCommonAbstractTest {
*/
static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
- if (GridTestUtils.getFieldValue(msg, "delegate") instanceof
CacheAffinityChangeMessage)
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) {
+ if (U.unwrapCustomMessage(msg) instanceof
CacheAffinityChangeMessage)
return;
failed = true;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
index fce48f3bbbc..df51e031417 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
@@ -40,7 +40,6 @@ import
org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import
org.apache.ignite.internal.processors.query.schema.management.IndexDescriptor;
@@ -50,7 +49,6 @@ import
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexC
import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -313,10 +311,10 @@ public abstract class IndexWithSameNameTestBase extends
GridCommonAbstractTest {
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
- DiscoverySpiCustomMessage spiCustomMsg =
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
- U.resolveClassLoader(ignite().configuration()));
+ TcpDiscoveryCustomEventMessage evtMsg =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage discoCustomMsg =
((CustomMessageWrapper)spiCustomMsg).delegate();
+ DiscoveryCustomMessage discoCustomMsg =
U.unwrapCustomMessage(evtMsg.message(marshaller(),
+ U.gridClassLoader()));
if (discoCustomMsg instanceof
SchemaFinishDiscoveryMessage) {
SchemaFinishDiscoveryMessage finishMsg =
(SchemaFinishDiscoveryMessage)discoCustomMsg;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
index 3480bbe2cbb..81ce3e4dc58 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -44,7 +43,6 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -188,7 +186,7 @@ public class NodeSecurityContextPropagationTest extends
GridCommonAbstractTest {
Object unwrappedMsg = msg;
if (msg instanceof TcpDiscoveryCustomEventMessage) {
- DiscoverySpiCustomMessage customMsg = U.field(msg, "msg");
+ DiscoveryCustomMessage customMsg = U.field(msg, "msg");
if (customMsg == null) {
try {
@@ -205,7 +203,7 @@ public class NodeSecurityContextPropagationTest extends
GridCommonAbstractTest {
assert customMsg instanceof SecurityAwareCustomMessageWrapper;
- unwrappedMsg = ((CustomMessageWrapper)customMsg).delegate();
+ unwrappedMsg = U.unwrapCustomMessage(customMsg);
}
if (predicate.test(unwrappedMsg))
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
index a040aca7cb1..1f0ca99ab5d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java
@@ -26,10 +26,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.DiscoverySpiTestListener;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.service.inner.LongInitializedTestService;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -50,13 +50,10 @@ public class ServiceConcurrentUndeployTest extends
GridCommonAbstractTest {
TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
disco.setInternalListener(new DiscoverySpiTestListener() {
- @Override public boolean beforeSendCustomEvent(DiscoverySpi spi,
IgniteLogger log, DiscoverySpiCustomMessage msg) {
+ @Override public boolean beforeSendCustomEvent(DiscoverySpi spi,
IgniteLogger log, DiscoveryCustomMessage msg) {
if (spi.isClientMode()) {
- boolean isUndeployMsg = msg instanceof CustomMessageWrapper
- && ((CustomMessageWrapper)msg).delegate() instanceof
ServiceChangeBatchRequest;
-
- if (isUndeployMsg) {
- ServiceChangeBatchRequest batch =
(ServiceChangeBatchRequest)((CustomMessageWrapper)msg).delegate();
+ if (U.unwrapCustomMessage(msg) instanceof
ServiceChangeBatchRequest) {
+ ServiceChangeBatchRequest batch =
(ServiceChangeBatchRequest)U.unwrapCustomMessage(msg);
long undeployReqCnt = batch.requests().stream()
.filter(r -> r instanceof
ServiceUndeploymentRequest)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
index 5b4dc2ae74e..dcfe6ec5e97 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
@@ -22,13 +22,13 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -102,8 +102,8 @@ public abstract class ServiceDeploymentProcessAbstractTest
extends GridCommonAbs
private volatile boolean block;
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
- if (block && GridTestUtils.getFieldValue(msg, "delegate")
instanceof ServiceClusterDeploymentResultBatch)
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ if (block && U.unwrapCustomMessage(msg) instanceof
ServiceClusterDeploymentResultBatch)
return;
super.sendCustomEvent(msg);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
index 30a7ec80298..9cffa43b922 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteProductVersion;
@@ -153,7 +154,7 @@ public class DiscoverySpiDataExchangeTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
delegate.sendCustomEvent(msg);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
index f72eac99994..59fac925a35 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
@@ -21,11 +21,9 @@ import java.io.IOException;
import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
@@ -55,22 +53,19 @@ public class BlockTcpDiscoverySpi extends
TestTcpDiscoverySpi {
TcpDiscoveryCustomEventMessage cm =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage delegate;
+ DiscoveryCustomMessage custMsg;
try {
- DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
+ custMsg = cm.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
assertNotNull(custMsg);
-
- delegate = ((CustomMessageWrapper)custMsg).delegate();
-
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
if (clo != null)
- clo.apply(addr, delegate);
+ clo.apply(addr, U.unwrapCustomMessage(custMsg));
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index 15cb1f90382..0d156464075 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -24,7 +24,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -238,7 +237,7 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends
GridCommonAbstractTe
* @param id Message id.
*/
private void sendDummyCustomMessage(TcpDiscoverySpi disco, IgniteUuid id) {
- disco.sendCustomEvent(new CustomMessageWrapper(new
DummyCustomDiscoveryMessage(id)));
+ disco.sendCustomEvent(new DummyCustomDiscoveryMessage(id));
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 999c7c747c3..08012acc3a0 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -2607,8 +2607,10 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
if (stopBeforeSndAck) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
- DiscoveryCustomMessage custMsg =
GridTestUtils.getFieldValue(
-
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
U.gridClassLoader()), "delegate");
+ TcpDiscoveryCustomEventMessage evtMsg =
(TcpDiscoveryCustomEventMessage)msg;
+
+ DiscoveryCustomMessage custMsg =
U.unwrapCustomMessage(evtMsg.message(marshaller(),
+ U.gridClassLoader()));
if (custMsg instanceof
StartRoutineAckDiscoveryMessage) {
log.info("Skip message send and stop node: " +
msg);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index e3038bf5dc6..336c8537060 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -20,8 +20,8 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -73,7 +73,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws
IgniteException {
IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
if (internalLsnr != null) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 62e450ee10c..373ac47f9f4 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -94,7 +94,6 @@ import
org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -126,7 +125,6 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.ssl.SslContextFactory;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -177,16 +175,6 @@ public final class GridTestUtils {
* and thus allows to make assertions or other actions like skipping
certain discovery messages.
*/
public static class DiscoveryHook {
- /**
- * Handles discovery message before {@link
DiscoverySpiListener#onDiscovery} invocation.
- *
- * @param msg Intercepted discovery message.
- */
- public void beforeDiscovery(DiscoverySpiCustomMessage msg) {
- if (msg instanceof CustomMessageWrapper)
- beforeDiscovery(unwrap((CustomMessageWrapper)msg));
- }
-
/**
* Handles {@link DiscoveryCustomMessage} before {@link
DiscoverySpiListener#onDiscovery} invocation.
*
@@ -196,16 +184,6 @@ public final class GridTestUtils {
// No-op.
}
- /**
- * Handles discovery message after {@link
DiscoverySpiListener#onDiscovery} completion.
- *
- * @param msg Intercepted discovery message.
- */
- public void afterDiscovery(DiscoverySpiCustomMessage msg) {
- if (msg instanceof CustomMessageWrapper)
- afterDiscovery(unwrap((CustomMessageWrapper)msg));
- }
-
/**
* Handles {@link DiscoveryCustomMessage} after {@link
DiscoverySpiListener#onDiscovery} completion.
*
@@ -221,16 +199,6 @@ public final class GridTestUtils {
public void ignite(IgniteEx ignite) {
// No-op.
}
-
- /**
- * Obtains {@link DiscoveryCustomMessage} from {@link
CustomMessageWrapper}.
- *
- * @param wrapper Wrapper of {@link DiscoveryCustomMessage}.
- * @return Unwrapped {@link DiscoveryCustomMessage}.
- */
- private DiscoveryCustomMessage unwrap(CustomMessageWrapper wrapper) {
- return U.field(wrapper, "delegate");
- }
}
/**
@@ -254,11 +222,11 @@ public final class GridTestUtils {
/** {@inheritDoc} */
@Override public IgniteFuture<?> onDiscovery(DiscoveryNotification
notification) {
- hook.beforeDiscovery(notification.getCustomMsgData());
+
hook.beforeDiscovery(U.unwrapCustomMessage(notification.customMessage()));
IgniteFuture<?> fut = delegate.onDiscovery(notification);
- fut.listen(f ->
hook.afterDiscovery(notification.getCustomMsgData()));
+ fut.listen(f ->
hook.afterDiscovery(U.unwrapCustomMessage(notification.customMessage())));
return fut;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 039309f8663..34fed2e5537 100755
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.systemview.JmxSystemViewExporterSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -113,7 +114,6 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -647,7 +647,7 @@ public abstract class GridAbstractTest extends
JUnitAssertAware {
cfg.setClientMode(false);
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
// No-op.
}
});
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java
index ff9af51975f..524c2d6f257 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java
@@ -26,10 +26,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SharedFileTree;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
/** */
@@ -68,7 +68,7 @@ public class GridTestBinaryMarshaller {
)
.setClientMode(false)
.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void
sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage
msg) throws IgniteException {
//No-op.
}
});
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
index 92b47624466..b8ff43e48a9 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
@@ -27,9 +27,9 @@ import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -48,10 +48,9 @@ public class BinaryTypeRegistrationTest extends
GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
- if (msg instanceof CustomMessageWrapper
- && ((CustomMessageWrapper)msg).delegate() instanceof
MetadataUpdateProposedMessage)
-
metadataUpdateProposedMessages.add(((CustomMessageWrapper)msg).delegate());
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ if (U.unwrapCustomMessage(msg) instanceof
MetadataUpdateProposedMessage)
+
metadataUpdateProposedMessages.add(U.unwrapCustomMessage(msg));
super.sendCustomEvent(msg);
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java
index f7c079b48a5..f63d6b55513 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java
@@ -29,17 +29,16 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -70,13 +69,11 @@ public class CacheRegisterMetadataLocallyTest extends
GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
- if (msg instanceof CustomMessageWrapper) {
- DiscoveryCustomMessage realMsg =
((CustomMessageWrapper)msg).delegate();
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ DiscoveryCustomMessage realMsg = U.unwrapCustomMessage(msg);
- if (realMsg instanceof MetadataUpdateProposedMessage ||
realMsg instanceof MetadataUpdateAcceptedMessage)
- customMessages.add(realMsg);
- }
+ if (realMsg instanceof MetadataUpdateProposedMessage ||
realMsg instanceof MetadataUpdateAcceptedMessage)
+ customMessages.add(realMsg);
super.sendCustomEvent(msg);
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index 70847fabff3..997692f77bf 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -65,7 +65,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -81,7 +80,7 @@ import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -195,26 +194,24 @@ public class KillQueryTest extends GridCommonAbstractTest
{
clientBlocker = commSpi;
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
- if (msg instanceof CustomMessageWrapper) {
- DiscoveryCustomMessage delegate =
((CustomMessageWrapper)msg).delegate();
-
- if (delegate instanceof DynamicCacheChangeBatch) {
- try {
- awaitTimeout();
- }
- catch (Exception e) {
- log.error(e.getMessage(), e);
- }
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ DiscoveryCustomMessage delegate = U.unwrapCustomMessage(msg);
+ if (delegate instanceof DynamicCacheChangeBatch) {
+ try {
+ awaitTimeout();
}
- else if (delegate instanceof
SchemaProposeDiscoveryMessage) {
- try {
- awaitTimeout();
- }
- catch (Exception e) {
- log.error(e.getMessage(), e);
- }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+
+ }
+ else if (delegate instanceof SchemaProposeDiscoveryMessage) {
+ try {
+ awaitTimeout();
+ }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index 5bc65cbebc9..cd32445ca76 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
@@ -57,10 +56,10 @@ import
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
@@ -168,30 +167,28 @@ public class RunningQueriesTest extends
AbstractIndexingCommonTest {
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage
msg) throws IgniteException {
- if
(CustomMessageWrapper.class.isAssignableFrom(msg.getClass())) {
- DiscoveryCustomMessage delegate =
((CustomMessageWrapper)msg).delegate();
-
- if
(DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) {
- ((DynamicCacheChangeBatch)delegate).requests().stream()
- .filter((c) ->
!c.cacheName().equalsIgnoreCase("default"))
- .findAny()
- .ifPresent((c) -> {
- try {
- awaitTimeout();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- });
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg)
throws IgniteException {
+ DiscoveryCustomMessage delegate = U.unwrapCustomMessage(msg);
+
+ if
(DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) {
+ ((DynamicCacheChangeBatch)delegate).requests().stream()
+ .filter((c) ->
!c.cacheName().equalsIgnoreCase("default"))
+ .findAny()
+ .ifPresent((c) -> {
+ try {
+ awaitTimeout();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ else if
(SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) {
+ try {
+ awaitTimeout();
}
- else if
(SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) {
- try {
- awaitTimeout();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
+ catch (Exception e) {
+ e.printStackTrace();
}
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 54347b18970..e5105615e72 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -49,7 +50,6 @@ import
org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -404,7 +404,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter
implements IgniteDis
}
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) {
impl.sendCustomMessage(msg);
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 9b7476c5355..29459ff7591 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
@@ -25,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-class ZkCommunicationErrorResolveFinishMessage implements
DiscoverySpiCustomMessage, ZkInternalMessage {
+class ZkCommunicationErrorResolveFinishMessage extends
DiscoverySpiCustomMessage implements ZkInternalMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -48,7 +49,7 @@ class ZkCommunicationErrorResolveFinishMessage implements
DiscoverySpiCustomMess
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index 0c79c36aee0..323e41bcf9b 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
@@ -25,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Zk Communication Error Resolve Start Message.
*/
-public class ZkCommunicationErrorResolveStartMessage implements
DiscoverySpiCustomMessage, ZkInternalMessage {
+public class ZkCommunicationErrorResolveStartMessage extends
DiscoverySpiCustomMessage implements ZkInternalMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -40,7 +41,7 @@ public class ZkCommunicationErrorResolveStartMessage
implements DiscoverySpiCust
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 60a36c5adb9..a4db36079e8 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -41,7 +42,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData
{
DiscoverySpiCustomMessage msg;
/** Unmarshalled message. */
- transient DiscoverySpiCustomMessage resolvedMsg;
+ transient DiscoveryCustomMessage resolvedMsg;
/**
* @param evtId Event ID.
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index de7291c0d45..025d7afa46f 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
@@ -24,7 +25,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Zk Force Node Fail Message.
*/
-public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage {
+public class ZkForceNodeFailMessage extends DiscoverySpiCustomMessage
implements ZkInternalMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -44,7 +45,7 @@ public class ZkForceNodeFailMessage implements
DiscoverySpiCustomMessage, ZkInte
}
/** {@inheritDoc} */
- @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
index 626fe742d1c..0079ab11b71 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
@@ -24,12 +25,12 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-class ZkNoServersMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage {
+class ZkNoServersMessage extends DiscoverySpiCustomMessage implements
ZkInternalMessage {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 37039dd028b..a6c9a952c90 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -82,7 +83,6 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
@@ -649,7 +649,7 @@ public class ZookeeperDiscoveryImpl {
/**
* @param msg Message.
*/
- public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
+ public void sendCustomMessage(DiscoveryCustomMessage msg) {
assert msg != null;
List<ClusterNode> nodes = rtState.top.topologySnapshot();
@@ -2469,7 +2469,7 @@ public class ZookeeperDiscoveryImpl {
if (sndNode != null) {
byte[] evtBytes = readCustomEventData(zkClient, evtPath,
sndNodeId);
- DiscoverySpiCustomMessage msg;
+ DiscoveryCustomMessage msg;
try {
msg = unmarshalZip(evtBytes);
@@ -2500,7 +2500,7 @@ public class ZookeeperDiscoveryImpl {
*/
private void generateAndProcessCustomEventOnCoordinator(String evtPath,
ZookeeperClusterNode sndNode,
- DiscoverySpiCustomMessage msg
+ DiscoveryCustomMessage msg
) throws Exception {
ZookeeperClient zkClient = rtState.zkClient;
ZkDiscoveryEventsData evtsData = rtState.evtsData;
@@ -2585,7 +2585,7 @@ public class ZookeeperDiscoveryImpl {
evtsData.evtIdGen--;
- DiscoverySpiCustomMessage ack = msg.ackMessage();
+ DiscoveryCustomMessage ack = msg.ackMessage();
if (ack != null) {
evtData = createAckEvent(ack, evtData);
@@ -2746,7 +2746,7 @@ public class ZookeeperDiscoveryImpl {
if (evtData0.ackEvent() && evtData0.topologyVersion()
< locNode.order())
break;
- DiscoverySpiCustomMessage msg;
+ DiscoveryCustomMessage msg;
if (rtState.crd) {
assert evtData0.resolvedMsg != null : evtData0;
@@ -3508,7 +3508,7 @@ public class ZookeeperDiscoveryImpl {
* @param evtData Event data.
* @param msg Custom message.
*/
- private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData,
final DiscoverySpiCustomMessage msg) {
+ private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData,
final DiscoveryCustomMessage msg) {
assert !(msg instanceof ZkInternalMessage) : msg;
if (log.isDebugEnabled())
@@ -3714,7 +3714,7 @@ public class ZookeeperDiscoveryImpl {
}
case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
- DiscoverySpiCustomMessage ack =
handleProcessedCustomEvent(ctx,
+ DiscoveryCustomMessage ack =
handleProcessedCustomEvent(ctx,
(ZkDiscoveryCustomEventData)evtData);
if (ack != null) {
@@ -3762,7 +3762,7 @@ public class ZookeeperDiscoveryImpl {
* @throws Exception If failed.
*/
private ZkDiscoveryCustomEventData createAckEvent(
- DiscoverySpiCustomMessage ack,
+ DiscoveryCustomMessage ack,
ZkDiscoveryCustomEventData origEvt) throws Exception {
assert ack != null;
@@ -3885,7 +3885,7 @@ public class ZookeeperDiscoveryImpl {
* @return Ack message.
* @throws Exception If failed.
*/
- @Nullable private DiscoverySpiCustomMessage
handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
+ @Nullable private DiscoveryCustomMessage handleProcessedCustomEvent(String
ctx, ZkDiscoveryCustomEventData evtData)
throws Exception {
if (log.isDebugEnabled())
log.debug("All nodes processed custom event [ctx=" + ctx + ",
evtData=" + evtData + ']');
diff --git
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/TestZookeeperDiscoverySpi.java
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/TestZookeeperDiscoverySpi.java
index fd9e4ba4239..23352efc152 100644
---
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/TestZookeeperDiscoverySpi.java
+++
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/TestZookeeperDiscoverySpi.java
@@ -18,8 +18,8 @@
package org.apache.ignite.spi.discovery.zk;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.testframework.junits.GridAbstractTest;
@@ -32,7 +32,7 @@ public class TestZookeeperDiscoverySpi extends
ZookeeperDiscoverySpi implements
private volatile IgniteDiscoverySpiInternalListener internalLsnr;
/** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+ @Override public void sendCustomEvent(DiscoveryCustomMessage msg) {
IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
if (internalLsnr != null && !internalLsnr.beforeSendCustomEvent(this,
log, msg))