This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9859f645865 IGNITE-28050 Use MessageSerializer for continuous routine
start messages (#12952)
9859f645865 is described below
commit 9859f6458655d98c7d541d424a66ca5c22845784
Author: Ilya Shishkov <[email protected]>
AuthorDate: Tue Apr 7 10:23:25 2026 +0300
IGNITE-28050 Use MessageSerializer for continuous routine start messages
(#12952)
---
.../ignite/internal/GridEventConsumeHandler.java | 7 +-
.../ignite/internal/GridMessageListenHandler.java | 7 +-
.../discovery/DiscoveryMessageFactory.java | 12 ++
.../IncompleteDeserializationException.java | 53 -----
.../CachePartitionPartialCountersMap.java | 8 +-
.../continuous/CacheContinuousQueryHandler.java | 34 ++-
.../continuous/AbstractContinuousMessage.java | 3 +-
.../continuous/GridContinuousHandler.java | 7 +-
.../continuous/GridContinuousProcessor.java | 197 ++++++-----------
.../processors/continuous/StartRequestData.java | 239 +++++++++------------
.../processors/continuous/StartRequestDataV2.java | 164 --------------
.../StartRoutineAckDiscoveryMessage.java | 35 +--
.../continuous/StartRoutineDiscoveryMessage.java | 86 ++------
.../continuous/StartRoutineDiscoveryMessageV2.java | 23 +-
.../continuous/StopRoutineAckDiscoveryMessage.java | 4 +-
.../continuous/StopRoutineDiscoveryMessage.java | 4 +-
.../messages/TcpDiscoveryCustomEventMessage.java | 17 +-
.../main/resources/META-INF/classnames.properties | 2 -
.../IncompleteDeserializationExceptionTest.java | 115 ----------
.../ignite/testsuites/IgniteBasicTestSuite2.java | 3 -
20 files changed, 257 insertions(+), 763 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 4b1ad612add..1ac2ce2bc92 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -49,7 +49,6 @@ import
org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -142,13 +141,13 @@ class GridEventConsumeHandler implements
GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void updateCounters(AffinityTopologyVersion topVer,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
- Map<Integer, T2<Long, Long>> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer,
Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
// No-op.
}
/** {@inheritDoc} */
- @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters() {
return Collections.emptyMap();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index bcd35ed249e..e59873f4b44 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -36,7 +36,6 @@ import
org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -117,13 +116,13 @@ public class GridMessageListenHandler implements
GridContinuousHandler {
}
/** {@inheritDoc} */
- @Override public void updateCounters(AffinityTopologyVersion topVer,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
- Map<Integer, T2<Long, Long>> cntrs) {
+ @Override public void updateCounters(AffinityTopologyVersion topVer,
Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
// No-op.
}
/** {@inheritDoc} */
- @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters() {
return Collections.emptyMap();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 4960fb374c8..7c2a991f18e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -95,6 +95,14 @@ import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMess
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer;
+import org.apache.ignite.internal.processors.continuous.StartRequestData;
+import
org.apache.ignite.internal.processors.continuous.StartRequestDataSerializer;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessageSerializer;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageSerializer;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2Serializer;
import
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
import
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer;
import
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
@@ -245,6 +253,7 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register(-200, TcpDiscoveryCollectionMessage::new,
new TcpDiscoveryCollectionMessageMarshallableSerializer(marsh,
clsLdr));
+ factory.register(-118, StartRequestData::new, new
StartRequestDataSerializer());
factory.register(-117, TcpDiscoveryNode::new, new
TcpDiscoveryNodeMarshallableSerializer(marsh, clsLdr));
factory.register(-116, IgniteProductVersion::new, new
IgniteProductVersionSerializer());
factory.register(-115, SchemaAlterTableAddColumnOperation::new, new
SchemaAlterTableAddColumnOperationSerializer());
@@ -367,5 +376,8 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr));
factory.register(538, ServiceUndeploymentRequest::new, new
ServiceUndeploymentRequestSerializer());
factory.register(539, ExchangeFailureMessage::new, new
ExchangeFailureMessageSerializer());
+ factory.register(540, StartRoutineDiscoveryMessage::new, new
StartRoutineDiscoveryMessageSerializer());
+ factory.register(541, StartRoutineAckDiscoveryMessage::new, new
StartRoutineAckDiscoveryMessageSerializer());
+ factory.register(542, StartRoutineDiscoveryMessageV2::new, new
StartRoutineDiscoveryMessageV2Serializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java
deleted file mode 100644
index 5a440cefa1c..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.discovery;
-
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Exception which can be used to access a message which failed to be
deserialized completely using Java serialization.
- * Throwed from deserialization methods it can be caught by a caller.
- * <p>
- * Should be {@link RuntimeException} because of limitations of Java
serialization mechanisms.
- * <p>
- * Catching {@link ClassNotFoundException} inside deserialization methods
cannot do the same trick because
- * Java deserialization remembers such exception internally and will rethrow
it anyway upon returing to a user.
- */
-public class IncompleteDeserializationException extends RuntimeException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final DiscoveryCustomMessage m;
-
- /**
- * @param m Message.
- */
- public IncompleteDeserializationException(@NotNull DiscoveryCustomMessage
m) {
- super(null, null, false, false);
-
- this.m = m;
- }
-
- /**
- * @return Message.
- */
- @NotNull public DiscoveryCustomMessage message() {
- return m;
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index de0f3ac0d85..7a723fbb780 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -203,15 +202,14 @@ public class CachePartitionPartialCountersMap implements
Serializable, Message {
* @param cntrsMap Partial local counters map.
* @return Partition ID to partition counters map.
*/
- public static Map<Integer, T2<Long, Long>>
toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
+ public static Map<Integer, Long>
toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
if (cntrsMap.size() == 0)
return Collections.emptyMap();
- Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size());
+ Map<Integer, Long> res = U.newHashMap(cntrsMap.size());
for (int idx = 0; idx < cntrsMap.size(); idx++)
- res.put(cntrsMap.partitionAt(idx),
- new T2<>(cntrsMap.initialUpdateCounterAt(idx),
cntrsMap.updateCounterAt(idx)));
+ res.put(cntrsMap.partitionAt(idx), cntrsMap.updateCounterAt(idx));
return res;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ee131f4db3e..2c5ce3ba634 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -75,7 +75,6 @@ import
org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -200,10 +199,10 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
private transient int cacheId;
/** */
- private transient volatile Map<Integer, T2<Long, Long>> initUpdCntrs;
+ private transient volatile Map<Integer, Long> initUpdCntrs;
/** */
- private transient volatile Map<UUID, Map<Integer, T2<Long, Long>>>
initUpdCntrsPerNode;
+ private transient volatile Map<UUID, Map<Integer, Long>>
initUpdCntrsPerNode;
/** */
private transient volatile AffinityTopologyVersion initTopVer;
@@ -224,7 +223,7 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
private transient UUID routineId;
/** Local update counters values on listener start. Used for skipping
events fired before the listener start. */
- private transient volatile Map<Integer, T2<Long, Long>> locInitUpdCntrs;
+ private transient volatile Map<Integer, Long> locInitUpdCntrs;
/** */
private transient GridKernalContext ctx;
@@ -361,15 +360,15 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
}
/** {@inheritDoc} */
- @Override public void updateCounters(AffinityTopologyVersion topVer,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
- Map<Integer, T2<Long, Long>> cntrs) {
- this.initUpdCntrsPerNode = cntrsPerNode;
- this.initUpdCntrs = cntrs;
- this.initTopVer = topVer;
+ @Override public void updateCounters(AffinityTopologyVersion topVer,
Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
+ initUpdCntrsPerNode = cntrsPerNode;
+ initUpdCntrs = cntrs;
+ initTopVer = topVer;
}
/** {@inheritDoc} */
- @Override public Map<Integer, T2<Long, Long>> updateCounters() {
+ @Override public Map<Integer, Long> updateCounters() {
return locInitUpdCntrs;
}
@@ -1163,9 +1162,9 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
CacheContinuousQueryPartitionRecovery rec = rcvs.get(partId);
if (rec == null) {
- T2<Long, Long> partCntrs = null;
+ Long partCntr = null;
- Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode =
this.initUpdCntrsPerNode;
+ Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode =
this.initUpdCntrsPerNode;
if (initUpdCntrsPerNode != null) {
GridCacheContext<K, V> cctx = cacheContext(ctx);
@@ -1173,22 +1172,21 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
GridCacheAffinityManager aff = cctx.affinity();
for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
- Map<Integer, T2<Long, Long>> map =
initUpdCntrsPerNode.get(node.id());
+ Map<Integer, Long> map =
initUpdCntrsPerNode.get(node.id());
if (map != null) {
- partCntrs = map.get(partId);
+ partCntr = map.get(partId);
break;
}
}
}
else if (initUpdCntrs != null)
- partCntrs = initUpdCntrs.get(partId);
+ partCntr = initUpdCntrs.get(partId);
- T2<Long, Long> partCntrs0 = partCntrs;
+ Long partCntr0 = partCntr;
CacheContinuousQueryPartitionRecovery oldRec =
rcvs.computeIfAbsent(partId, k ->
- new
CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
topVer,
- partCntrs0 != null ? partCntrs0.get2() : null));
+ new
CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY),
topVer, partCntr0));
if (oldRec != null)
rec = oldRec;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index 64d3f98fb1c..f3e4dc5228d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -21,11 +21,12 @@ import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
*/
-public abstract class AbstractContinuousMessage implements
DiscoveryCustomMessage {
+public abstract class AbstractContinuousMessage implements Message,
DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 2781778657738703012L;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index f7482c78ebf..b1a3812f612 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -24,7 +24,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
@@ -163,11 +162,11 @@ public interface GridContinuousHandler extends
Externalizable, Cloneable {
* @param cntrs Init state for partition counters.
* @param topVer Topology version.
*/
- public void updateCounters(AffinityTopologyVersion topVer, Map<UUID,
Map<Integer, T2<Long, Long>>> cntrsPerNode,
- Map<Integer, T2<Long, Long>> cntrs);
+ public void updateCounters(AffinityTopologyVersion topVer, Map<UUID,
Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs);
/**
* @return Init state for partition counters.
*/
- public Map<Integer, T2<Long, Long>> updateCounters();
+ public Map<Integer, Long> updateCounters();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 521b881c827..1786bdf4003 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -59,7 +59,6 @@ import
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -83,7 +82,7 @@ import
org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -976,30 +975,26 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
dep = new GridDeploymentInfoBean(dep0);
}
-
- // Handle peer deployment for other handler-specific objects.
- hnd.p2pMarshal(ctx);
}
- if (discoProtoVer == 1) {
- StartRequestData reqData = new StartRequestData(
- nodeFilter,
- hnd,
- bufSize,
- interval,
- autoUnsubscribe);
+ StartRequestData reqData = new StartRequestData(nodeFilter,
+ hnd,
+ bufSize,
+ interval,
+ autoUnsubscribe,
+ hnd.keepBinary());
- if (clsName != null) {
- reqData.className(clsName);
- reqData.deploymentInfo(dep);
+ if (clsName != null) {
+ reqData.className(clsName);
+ reqData.deploymentInfo(dep);
+ }
- reqData.p2pMarshal(marsh);
- }
+ reqData.prepareMarshal(ctx);
+ if (discoProtoVer == 1) {
StartRoutineDiscoveryMessage msg = new
StartRoutineDiscoveryMessage(
routineId,
- reqData,
- reqData.handler().keepBinary());
+ reqData);
if (hnd.updateCounters() != null)
msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
@@ -1009,24 +1004,9 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
else {
assert discoProtoVer == 2 : discoProtoVer;
- byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh,
nodeFilter) : null;
- byte[] hndBytes = U.marshal(marsh, hnd);
-
- StartRequestDataV2 reqData = new
StartRequestDataV2(nodeFilterBytes,
- hndBytes,
- bufSize,
- interval,
- autoUnsubscribe);
-
- if (clsName != null) {
- reqData.className(clsName);
- reqData.deploymentInfo(dep);
- }
-
return new StartRoutineDiscoveryMessageV2(
routineId,
- reqData,
- hnd.keepBinary());
+ reqData);
}
}
@@ -1362,7 +1342,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (fut != null) {
fut.onAllRemoteRegistered(
topVer,
- msg.errs(),
+ msg.errors(),
msg.updateCountersPerNode(),
msg.updateCounters());
}
@@ -1378,51 +1358,31 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
UUID routineId = req.routineId();
- if (req.deserializationException() != null && checkNodeFilter(req)) {
- IgniteCheckedException err = new
IgniteCheckedException(req.deserializationException());
-
- req.addError(node.id(), err);
-
- U.error(log, "Failed to register handler [nodeId=" + node.id() +
", routineId=" + routineId + ']', err);
-
- return;
- }
-
StartRequestData data = req.startRequestData();
- GridContinuousHandler hnd = data.handler();
-
- if (req.keepBinary()) {
- assert hnd instanceof CacheContinuousQueryHandler;
-
- ((CacheContinuousQueryHandler)hnd).keepBinary(true);
- }
-
IgniteCheckedException err = null;
try {
- if (ctx.config().isPeerClassLoadingEnabled()) {
- String clsName = data.className();
-
- if (clsName != null) {
- GridDeploymentInfo depInfo = data.deploymentInfo();
-
- GridDeployment dep =
ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
- depInfo.userVersion(), node.id(),
depInfo.classLoaderId(), depInfo.participants(), null);
+ data.finishUnmarshal(ctx, node.id());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal start request data [nodeId=" +
node.id() +
+ ", routineId=" + routineId + ']', e);
- if (dep == null)
- throw new IgniteDeploymentCheckedException("Failed to
obtain deployment for class: " + clsName);
+ // Tolerate missing-class exceptions (e.g. for a remote filter class).
+ // This is needed because the CQ registration process assumes that
an "ack message" will be sent.
+ if (X.hasCause(e, ClassNotFoundException.class)) {
+ if (checkNodeFilter(req))
+ req.addError(node.id(), e);
- data.p2pUnmarshal(marsh,
U.resolveClassLoader(dep.classLoader(), ctx.config()));
- }
+ return;
}
- }
- catch (IgniteCheckedException e) {
- err = e;
- U.error(log, "Failed to register handler [nodeId=" + node.id() +
", routineId=" + routineId + ']', e);
+ err = e;
}
+ GridContinuousHandler hnd = data.handler();
+
if (node.isClient()) {
Map<UUID, LocalRoutineInfo> clientRoutineMap =
clientInfos.get(node.id());
@@ -1435,7 +1395,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
}
clientRoutineMap.put(routineId, new LocalRoutineInfo(node.id(),
- data.projectionPredicate(),
+ data.nodeFilter(),
hnd,
data.bufferSize(),
data.interval(),
@@ -1444,7 +1404,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (err == null) {
try {
- IgnitePredicate<ClusterNode> prjPred =
data.projectionPredicate();
+ IgnitePredicate<ClusterNode> prjPred = data.nodeFilter();
if (prjPred != null)
ctx.resource().injectGeneric(prjPred);
@@ -1489,10 +1449,10 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
/** */
private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) {
StartRequestData reqData = req.startRequestData();
- IgnitePredicate<ClusterNode> prjPred;
+ IgnitePredicate<ClusterNode> nodeFilter;
- return reqData == null || (prjPred = reqData.projectionPredicate()) ==
null
- || prjPred.apply(ctx.discovery().localNode());
+ return reqData == null || (nodeFilter = reqData.nodeFilter()) == null
+ || nodeFilter.apply(ctx.discovery().localNode());
}
/**
@@ -1514,12 +1474,12 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
private void processStartRequestV2(final AffinityTopologyVersion topVer,
final ClusterNode snd,
final StartRoutineDiscoveryMessageV2 msg) {
- StartRequestDataV2 reqData = msg.startRequestData();
+ StartRequestData reqData = msg.startRequestData();
ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
msg.routineId(),
- reqData.handlerBytes(),
- reqData.nodeFilterBytes(),
+ reqData.hndBytes,
+ reqData.nodeFilterBytes,
reqData.bufferSize(),
reqData.interval(),
reqData.autoUnsubscribe());
@@ -1540,73 +1500,42 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
return;
}
- StartRequestDataV2 reqData = msg.startRequestData();
+ StartRequestData reqData = msg.startRequestData();
Exception err = null;
- IgnitePredicate<ClusterNode> nodeFilter = null;
-
- CachePartitionPartialCountersMap cntrsMap = null;
+ try {
+ reqData.finishUnmarshal(ctx, snd.id());
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
- if (reqData.nodeFilterBytes() != null) {
- try {
- if (ctx.config().isPeerClassLoadingEnabled() &&
reqData.className() != null) {
- String clsName = reqData.className();
- GridDeploymentInfo depInfo =
reqData.deploymentInfo();
-
- GridDeployment dep =
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
- clsName,
- clsName,
- depInfo.userVersion(),
- snd.id(),
- depInfo.classLoaderId(),
- depInfo.participants(),
- null);
-
- if (dep == null) {
- throw new
IgniteDeploymentCheckedException("Failed to obtain deployment " +
- "for class: " + clsName);
- }
+ U.error(log, "Failed to unmarshal continuous request data
[" +
+ "routineId=" + msg.routineId +
+ ", srcNodeId=" + snd.id() + ']', e);
+ }
- nodeFilter = U.unmarshal(marsh,
- reqData.nodeFilterBytes(),
- U.resolveClassLoader(dep.classLoader(),
ctx.config()));
- }
- else {
- nodeFilter = U.unmarshal(marsh,
- reqData.nodeFilterBytes(),
- U.resolveClassLoader(ctx.config()));
- }
+ IgnitePredicate<ClusterNode> nodeFilter = reqData.nodeFilter();
- if (nodeFilter != null)
- ctx.resource().injectGeneric(nodeFilter);
+ if (nodeFilter != null) {
+ try {
+ ctx.resource().injectGeneric(nodeFilter);
}
- catch (Exception e) {
- err = e;
-
- U.error(log, "Failed to unmarshal continuous routine
filter [" +
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to inject generic into continuous
routine filter [" +
"routineId=" + msg.routineId +
", srcNodeId=" + snd.id() + ']', e);
}
}
+ CachePartitionPartialCountersMap cntrsMap = null;
+
boolean register = err == null &&
(nodeFilter == null ||
nodeFilter.apply(ctx.discovery().localNode()));
if (register) {
try {
- GridContinuousHandler hnd = U.unmarshal(marsh,
- reqData.handlerBytes(),
- U.resolveClassLoader(ctx.config()));
-
- if (ctx.config().isPeerClassLoadingEnabled())
- hnd.p2pUnmarshal(snd.id(), ctx);
-
- if (msg.keepBinary()) {
- assert hnd instanceof CacheContinuousQueryHandler
: hnd;
-
-
((CacheContinuousQueryHandler)hnd).keepBinary(true);
- }
+ GridContinuousHandler hnd = reqData.handler();
registerHandler(snd.id(),
msg.routineId,
@@ -2544,7 +2473,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
resCollect = new
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage,
RoutineRegisterResults>(ctx) {
@Override protected RoutineRegisterResults
createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) {
Map<UUID, Throwable> errs = null;
- Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode =
null;
+ Map<UUID, Map<Integer, Long>> cntrsPerNode = null;
for (Map.Entry<UUID,
NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) {
ContinuousRoutineStartResultMessage msg =
entry.getValue().message();
@@ -2571,7 +2500,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (cntrsPerNode == null)
cntrsPerNode = new HashMap<>();
- cntrsPerNode.put(entry.getKey(),
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+ cntrsPerNode.put(entry.getKey(),
toCountersMap(cntrsMap));
}
}
}
@@ -2598,8 +2527,8 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
private void onAllRemoteRegistered(
AffinityTopologyVersion topVer,
@Nullable Map<UUID, ? extends Throwable> errs,
- Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
- Map<Integer, T2<Long, Long>> cntrs) {
+ Map<UUID, Map<Integer, Long>> cntrsPerNode,
+ Map<Integer, Long> cntrs) {
try {
if (errs == null || errs.isEmpty()) {
LocalRoutineInfo routine = locInfos.get(routineId);
@@ -2692,7 +2621,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
private final Map<UUID, ? extends Throwable> errs;
/** */
- private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode;
+ private final Map<UUID, Map<Integer, Long>> cntrsPerNode;
/**
* @param topVer Topology version.
@@ -2701,7 +2630,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
*/
RoutineRegisterResults(AffinityTopologyVersion topVer,
Map<UUID, ? extends Throwable> errs,
- Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
+ Map<UUID, Map<Integer, Long>> cntrsPerNode) {
this.topVer = topVer;
this.errs = errs;
this.cntrsPerNode = cntrsPerNode;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
index a3049366dfc..99c39201500 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,134 +17,96 @@
package org.apache.ignite.internal.processors.continuous;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.marshaller.Marshaller;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Start request data.
*/
-class StartRequestData implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Projection predicate. */
- private IgnitePredicate<ClusterNode> prjPred;
+public class StartRequestData implements Message {
+ /** Node filter. */
+ private IgnitePredicate<ClusterNode> nodeFilter;
- /** Serialized projection predicate. */
- private byte[] prjPredBytes;
+ /** Serialized node filter. */
+ @Order(0)
+ byte[] nodeFilterBytes;
/** Deployment class name. */
- private String clsName;
+ @Order(1)
+ String clsName;
/** Deployment info. */
- private GridDeploymentInfo depInfo;
+ @Order(2)
+ GridDeploymentInfoBean depInfo;
/** Handler. */
private GridContinuousHandler hnd;
+ /** Serialized handler. */
+ @Order(3)
+ byte[] hndBytes;
+
/** Buffer size. */
- private int bufSize;
+ @Order(4)
+ int bufSize;
/** Time interval. */
- private long interval;
+ @Order(5)
+ long interval;
/** Automatic unsubscribe flag. */
- private boolean autoUnsubscribe;
+ @Order(6)
+ boolean autoUnsubscribe;
- /**
- * Required by {@link java.io.Externalizable}.
- */
- public StartRequestData() {
- // No-op.
- }
+ /** Keep binary flag. */
+ @Order(7)
+ boolean keepBinary;
+
+ /** */
+ public StartRequestData() {}
/**
- * @param prjPred Serialized projection predicate.
+ * @param nodeFilter Node filter.
* @param hnd Handler.
* @param bufSize Buffer size.
* @param interval Time interval.
* @param autoUnsubscribe Automatic unsubscribe flag.
*/
- StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred,
GridContinuousHandler hnd,
- int bufSize, long interval, boolean autoUnsubscribe) {
+ public StartRequestData(
+ IgnitePredicate<ClusterNode> nodeFilter,
+ GridContinuousHandler hnd,
+ int bufSize,
+ long interval,
+ boolean autoUnsubscribe,
+ boolean keepBinary) {
assert hnd != null;
assert bufSize > 0;
assert interval >= 0;
- this.prjPred = prjPred;
+ this.nodeFilter = nodeFilter;
this.hnd = hnd;
this.bufSize = bufSize;
this.interval = interval;
this.autoUnsubscribe = autoUnsubscribe;
+ this.keepBinary = keepBinary;
}
/**
- * @param marsh Marshaller.
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @return Node filter.
*/
- void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
- assert marsh != null;
-
- prjPredBytes = U.marshal(marsh, prjPred);
- }
-
- /**
- * @param marsh Marshaller.
- * @param ldr Class loader.
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
- */
- void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws
IgniteCheckedException {
- assert marsh != null;
-
- assert prjPred == null;
- assert prjPredBytes != null;
-
- prjPred = U.unmarshal(marsh, prjPredBytes, ldr);
- }
-
- /**
- * @return Projection predicate.
- */
- public IgnitePredicate<ClusterNode> projectionPredicate() {
- return prjPred;
- }
-
- /**
- * @param prjPred New projection predicate.
- */
- public void projectionPredicate(IgnitePredicate<ClusterNode> prjPred) {
- this.prjPred = prjPred;
- }
-
- /**
- * @return Serialized projection predicate.
- */
- public byte[] projectionPredicateBytes() {
- return prjPredBytes;
- }
-
- /**
- * @param prjPredBytes New serialized projection predicate.
- */
- public void projectionPredicateBytes(byte[] prjPredBytes) {
- this.prjPredBytes = prjPredBytes;
- }
-
- /**
- * @return Deployment class name.
- */
- public String className() {
- return clsName;
+ public IgnitePredicate<ClusterNode> nodeFilter() {
+ return nodeFilter;
}
/**
@@ -154,17 +116,10 @@ class StartRequestData implements Externalizable {
this.clsName = clsName;
}
- /**
- * @return Deployment info.
- */
- public GridDeploymentInfo deploymentInfo() {
- return depInfo;
- }
-
/**
* @param depInfo New deployment info.
*/
- public void deploymentInfo(GridDeploymentInfo depInfo) {
+ public void deploymentInfo(GridDeploymentInfoBean depInfo) {
this.depInfo = depInfo;
}
@@ -175,13 +130,6 @@ class StartRequestData implements Externalizable {
return hnd;
}
- /**
- * @param hnd New handler.
- */
- public void handler(GridContinuousHandler hnd) {
- this.hnd = hnd;
- }
-
/**
* @return Buffer size.
*/
@@ -189,13 +137,6 @@ class StartRequestData implements Externalizable {
return bufSize;
}
- /**
- * @param bufSize New buffer size.
- */
- public void bufferSize(int bufSize) {
- this.bufSize = bufSize;
- }
-
/**
* @return Time interval.
*/
@@ -225,45 +166,61 @@ class StartRequestData implements Externalizable {
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- boolean b = prjPredBytes != null;
-
- out.writeBoolean(b);
+ @Override public String toString() {
+ return S.toString(StartRequestData.class, this);
+ }
- if (b) {
- U.writeByteArray(out, prjPredBytes);
- U.writeString(out, clsName);
- out.writeObject(depInfo);
+ /** */
+ public void prepareMarshal(GridKernalContext ctx) throws
IgniteCheckedException {
+ if (hnd != null) {
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ // Handle peer deployment for other handler-specific objects.
+ hnd.p2pMarshal(ctx);
+ }
+
+ hndBytes = U.marshal(ctx.marshaller(), hnd);
}
- else
- out.writeObject(prjPred);
- out.writeObject(hnd);
- out.writeInt(bufSize);
- out.writeLong(interval);
- out.writeBoolean(autoUnsubscribe);
+ if (nodeFilter != null)
+ nodeFilterBytes = U.marshal(ctx.marshaller(), nodeFilter);
}
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- boolean b = in.readBoolean();
-
- if (b) {
- prjPredBytes = U.readByteArray(in);
- clsName = U.readString(in);
- depInfo = (GridDeploymentInfo)in.readObject();
+ /** */
+ public void finishUnmarshal(GridKernalContext ctx, UUID sndId) throws
IgniteCheckedException {
+ if (ctx.config().isPeerClassLoadingEnabled() && clsName != null) {
+ GridDeployment dep =
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
+ clsName,
+ clsName,
+ depInfo.userVersion(),
+ sndId,
+ depInfo.classLoaderId(),
+ depInfo.participants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException("Failed to obtain
deployment for class: " + clsName);
+
+ nodeFilter = U.unmarshal(ctx.marshaller(),
+ nodeFilterBytes,
+ U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ }
+ else {
+ nodeFilter = U.unmarshal(ctx.marshaller(),
+ nodeFilterBytes,
+ U.resolveClassLoader(ctx.config()));
}
- else
- prjPred = (IgnitePredicate<ClusterNode>)in.readObject();
- hnd = (GridContinuousHandler)in.readObject();
- bufSize = in.readInt();
- interval = in.readLong();
- autoUnsubscribe = in.readBoolean();
- }
+ if (hndBytes != null) {
+ hnd = U.unmarshal(ctx.marshaller(), hndBytes,
U.resolveClassLoader(ctx.config()));
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(StartRequestData.class, this);
+ if (ctx.config().isPeerClassLoadingEnabled())
+ hnd.p2pUnmarshal(sndId, ctx);
+
+ if (keepBinary) {
+ assert hnd instanceof CacheContinuousQueryHandler : hnd;
+
+ ((CacheContinuousQueryHandler<?, ?>)hnd).keepBinary(true);
+ }
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
deleted file mode 100644
index 50e197143b7..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.continuous;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Start request data.
- */
-class StartRequestDataV2 implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Serialized node filter. */
- private byte[] nodeFilterBytes;
-
- /** Deployment class name. */
- private String clsName;
-
- /** Deployment info. */
- private GridDeploymentInfo depInfo;
-
- /** Serialized handler. */
- private byte[] hndBytes;
-
- /** Buffer size. */
- private int bufSize;
-
- /** Time interval. */
- private long interval;
-
- /** Automatic unsubscribe flag. */
- private boolean autoUnsubscribe;
-
- /**
- * @param nodeFilterBytes Serialized node filter.
- * @param hndBytes Serialized handler.
- * @param bufSize Buffer size.
- * @param interval Time interval.
- * @param autoUnsubscribe Automatic unsubscribe flag.
- */
- StartRequestDataV2(
- byte[] nodeFilterBytes,
- byte[] hndBytes,
- int bufSize,
- long interval,
- boolean autoUnsubscribe) {
- assert hndBytes != null;
- assert bufSize > 0;
- assert interval >= 0;
-
- this.nodeFilterBytes = nodeFilterBytes;
- this.hndBytes = hndBytes;
- this.bufSize = bufSize;
- this.interval = interval;
- this.autoUnsubscribe = autoUnsubscribe;
- }
-
- /**
- * @return Serialized node filter.
- */
- public byte[] nodeFilterBytes() {
- return nodeFilterBytes;
- }
-
- /**
- * @return Deployment class name.
- */
- public String className() {
- return clsName;
- }
-
- /**
- * @param clsName New deployment class name.
- */
- public void className(String clsName) {
- this.clsName = clsName;
- }
-
- /**
- * @return Deployment info.
- */
- public GridDeploymentInfo deploymentInfo() {
- return depInfo;
- }
-
- /**
- * @param depInfo New deployment info.
- */
- public void deploymentInfo(GridDeploymentInfo depInfo) {
- this.depInfo = depInfo;
- }
-
- /**
- * @return Handler.
- */
- public byte[] handlerBytes() {
- return hndBytes;
- }
-
- /**
- * @return Buffer size.
- */
- public int bufferSize() {
- return bufSize;
- }
-
- /**
- * @param bufSize New buffer size.
- */
- public void bufferSize(int bufSize) {
- this.bufSize = bufSize;
- }
-
- /**
- * @return Time interval.
- */
- public long interval() {
- return interval;
- }
-
- /**
- * @param interval New time interval.
- */
- public void interval(long interval) {
- this.interval = interval;
- }
-
- /**
- * @return Automatic unsubscribe flag.
- */
- public boolean autoUnsubscribe() {
- return autoUnsubscribe;
- }
-
- /**
- * @param autoUnsubscribe New automatic unsubscribe flag.
- */
- public void autoUnsubscribe(boolean autoUnsubscribe) {
- this.autoUnsubscribe = autoUnsubscribe;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(StartRequestDataV2.class, this);
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 4063e05c618..ab19f9bc615 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.processors.continuous;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -35,15 +36,18 @@ public class StartRoutineAckDiscoveryMessage extends
AbstractContinuousMessage {
private static final long serialVersionUID = 0L;
/** */
- private final Map<UUID, IgniteCheckedException> errs;
+ @Order(0)
+ Map<UUID, ErrorMessage> errs;
/** */
@GridToStringExclude
- private final Map<Integer, T2<Long, Long>> updateCntrs;
+ @Order(1)
+ Map<Integer, Long> updateCntrs;
/** */
@GridToStringExclude
- private final Map<UUID, Map<Integer, T2<Long, Long>>> updateCntrsPerNode;
+ @Order(2)
+ Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
/**
* @param routineId Routine id.
@@ -52,16 +56,19 @@ public class StartRoutineAckDiscoveryMessage extends
AbstractContinuousMessage {
* @param cntrsPerNode Partition counters per node.
*/
public StartRoutineAckDiscoveryMessage(UUID routineId,
- Map<UUID, IgniteCheckedException> errs,
- Map<Integer, T2<Long, Long>> cntrs,
- Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
+ Map<UUID, ErrorMessage> errs,
+ Map<Integer, Long> cntrs,
+ Map<UUID, Map<Integer, Long>> cntrsPerNode) {
super(routineId);
this.errs = new HashMap<>(errs);
- this.updateCntrs = cntrs;
- this.updateCntrsPerNode = cntrsPerNode;
+ updateCntrs = cntrs;
+ updateCntrsPerNode = cntrsPerNode;
}
+ /** */
+ public StartRoutineAckDiscoveryMessage() {}
+
/** {@inheritDoc} */
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
@@ -70,22 +77,22 @@ public class StartRoutineAckDiscoveryMessage extends
AbstractContinuousMessage {
/**
* @return Update counters for partitions.
*/
- public Map<Integer, T2<Long, Long>> updateCounters() {
+ public Map<Integer, Long> updateCounters() {
return updateCntrs;
}
/**
* @return Update counters for partitions per each node.
*/
- public Map<UUID, Map<Integer, T2<Long, Long>>> updateCountersPerNode() {
+ public Map<UUID, Map<Integer, Long>> updateCountersPerNode() {
return updateCntrsPerNode;
}
/**
* @return Errs.
*/
- public Map<UUID, IgniteCheckedException> errs() {
- return errs;
+ public Map<UUID, Throwable> errors() {
+ return F.viewReadOnly(errs, m -> ErrorMessage.error(m));
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 4bca717bd4d..cf2f639aa8d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -17,18 +17,14 @@
package org.apache.ignite.internal.processors.continuous;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import
org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
/**
* Discovery message used for Continuous Query registration.
@@ -38,35 +34,34 @@ public class StartRoutineDiscoveryMessage extends
AbstractContinuousMessage {
private static final long serialVersionUID = 0L;
/** */
- private final StartRequestData startReqData;
+ @Order(0)
+ StartRequestData startReqData;
/** */
- // Initilized here as well to preserve compatibility with previous versions
- private Map<UUID, IgniteCheckedException> errs = new HashMap<>();
+ @Order(1)
+ Map<UUID, ErrorMessage> errs = new HashMap<>();
/** */
- private Map<Integer, T2<Long, Long>> updateCntrs;
+ @Order(2)
+ Map<Integer, Long> updateCntrs;
/** */
- private Map<UUID, Map<Integer, T2<Long, Long>>> updateCntrsPerNode;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** */
- private transient ClassNotFoundException deserEx;
+ @Order(3)
+ Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
/**
* @param routineId Routine id.
* @param startReqData Start request data.
*/
- public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData
startReqData, boolean keepBinary) {
+ public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData
startReqData) {
super(routineId);
this.startReqData = startReqData;
- this.keepBinary = keepBinary;
}
+ /** */
+ public StartRoutineDiscoveryMessage() {}
+
/**
* @return Start request data.
*/
@@ -82,21 +77,21 @@ public class StartRoutineDiscoveryMessage extends
AbstractContinuousMessage {
if (errs == null)
errs = new HashMap<>();
- errs.put(nodeId, e);
+ errs.put(nodeId, new ErrorMessage(e));
}
/**
* @param cntrs Update counters.
*/
- private void addUpdateCounters(Map<Integer, T2<Long, Long>> cntrs) {
+ private void addUpdateCounters(Map<Integer, Long> cntrs) {
if (updateCntrs == null)
updateCntrs = new HashMap<>();
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrs.entrySet()) {
- T2<Long, Long> cntr0 = updateCntrs.get(e.getKey());
- T2<Long, Long> cntr1 = e.getValue();
+ for (Map.Entry<Integer, Long> e : cntrs.entrySet()) {
+ Long cntr0 = updateCntrs.get(e.getKey());
+ Long cntr1 = e.getValue();
- if (cntr0 == null || cntr1.get2() > cntr0.get2())
+ if (cntr0 == null || cntr1 > cntr0)
updateCntrs.put(e.getKey(), cntr1);
}
}
@@ -105,31 +100,17 @@ public class StartRoutineDiscoveryMessage extends
AbstractContinuousMessage {
* @param nodeId Local node ID.
* @param cntrs Update counters.
*/
- public void addUpdateCounters(UUID nodeId, Map<Integer, T2<Long, Long>>
cntrs) {
+ public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
addUpdateCounters(cntrs);
if (updateCntrsPerNode == null)
updateCntrsPerNode = new HashMap<>();
- Map<Integer, T2<Long, Long>> old = updateCntrsPerNode.put(nodeId,
cntrs);
+ Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
assert old == null : old;
}
- /**
- * @return Errs.
- */
- public Map<UUID, IgniteCheckedException> errs() {
- return errs != null ? errs : Collections.emptyMap();
- }
-
- /**
- * @return {@code True} if keep binary flag was set on continuous handler.
- */
- public boolean keepBinary() {
- return keepBinary;
- }
-
/** {@inheritDoc} */
@Override public boolean isMutable() {
return true;
@@ -137,28 +118,7 @@ public class StartRoutineDiscoveryMessage extends
AbstractContinuousMessage {
/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
- return new StartRoutineAckDiscoveryMessage(routineId, errs(),
updateCntrs, updateCntrsPerNode);
- }
-
- /** */
- private void readObject(ObjectInputStream in) throws IOException {
- // Override default serialization in order to tolerate missing classes
exceptions (e.g. remote filter class).
- // We need this means because CQ registration process assumes that an
"ack message" will be sent.
- try {
- in.defaultReadObject();
- }
- catch (ClassNotFoundException e) {
- deserEx = e;
-
- throw new IncompleteDeserializationException(this);
- }
- }
-
- /**
- * @return Exception occurred during deserialization.
- */
- @Nullable public ClassNotFoundException deserializationException() {
- return deserEx;
+ return new StartRoutineAckDiscoveryMessage(routineId, errs,
updateCntrs, updateCntrsPerNode);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
index 275765da730..856f1b533eb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.continuous;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -29,41 +30,29 @@ public class StartRoutineDiscoveryMessageV2 extends
AbstractContinuousMessage {
private static final long serialVersionUID = 0L;
/** */
- private static final int KEEP_BINARY_FLAG = 0x01;
+ @Order(0)
+ StartRequestData startReqData;
/** */
- private final StartRequestDataV2 startReqData;
-
- /** Flags. */
- private int flags;
+ public StartRoutineDiscoveryMessageV2() {}
/**
* @param routineId Routine id.
* @param startReqData Start request data.
- * @param keepBinary Keep binary flag.
*/
- StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2
startReqData, boolean keepBinary) {
+ StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestData
startReqData) {
super(routineId);
this.startReqData = startReqData;
-
- if (keepBinary)
- flags |= KEEP_BINARY_FLAG;
}
/**
* @return Start request data.
*/
- public StartRequestDataV2 startRequestData() {
+ public StartRequestData startRequestData() {
return startReqData;
}
- /**
- * @return {@code True} if keep binary flag was set on continuous handler.
- */
- public boolean keepBinary() {
- return (flags & KEEP_BINARY_FLAG) != 0;
- }
/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index 4710f493609..26263c4d629 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -20,13 +20,12 @@ package org.apache.ignite.internal.processors.continuous;
import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage
implements Message {
+public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -51,5 +50,4 @@ public class StopRoutineAckDiscoveryMessage extends
AbstractContinuousMessage im
@Override public String toString() {
return S.toString(StopRoutineAckDiscoveryMessage.class, this,
"routineId", routineId());
}
-
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
index 39878b81514..45589a86a42 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -20,13 +20,12 @@ package org.apache.ignite.internal.processors.continuous;
import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage
implements Message {
+public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -51,5 +50,4 @@ public class StopRoutineDiscoveryMessage extends
AbstractContinuousMessage imple
@Override public String toString() {
return S.toString(StopRoutineDiscoveryMessage.class, this,
"routineId", routineId());
}
-
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 381a80995c3..4d37c8c1453 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -22,7 +22,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
-import
org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
@@ -128,20 +127,8 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
if (serMsg != null)
msg = (DiscoverySpiCustomMessage)serMsg;
else {
- try {
- if (msgBytes != null)
- msg = U.unmarshal(marsh, msgBytes, ldr);
- }
- catch (IgniteCheckedException e) {
- // Try to resurrect a message in a case of deserialization
failure
- if (e.getCause() instanceof
IncompleteDeserializationException) {
- msg =
((IncompleteDeserializationException)e.getCause()).message();
-
- return;
- }
-
- throw e;
- }
+ if (msgBytes != null)
+ msg = U.unmarshal(marsh, msgBytes, ldr);
}
}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 50848832fe6..53f96dcb337 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -728,7 +728,6 @@
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$1
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$3$1
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$6
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$7
-org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException
org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper
org.apache.ignite.internal.managers.encryption.CacheGroupEncryptionKeys$TrackedWalSegment
org.apache.ignite.internal.managers.encryption.CacheGroupPageScanner$1
@@ -1522,7 +1521,6 @@
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$Discove
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryDataItem
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRoutineInfo
org.apache.ignite.internal.processors.continuous.StartRequestData
-org.apache.ignite.internal.processors.continuous.StartRequestDataV2
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java
deleted file mode 100644
index e79af4f287a..00000000000
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IncompleteDeserializationExceptionTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.discovery;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.UUID;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
-
-/** */
-public class IncompleteDeserializationExceptionTest extends
GridCommonAbstractTest {
- /** */
- private Path tmpDir;
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- tmpDir = Files.createTempDirectory(UUID.randomUUID().toString());
-
- Files.createDirectories(tmpDir);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- Files.delete(tmpDir);
-
- super.afterTest();
- }
-
- /** */
- @Test
- public void testMissingClassDeserialization() throws Exception {
- try (ObjectInputStream in = new
ObjectInputStream(getClass().getResourceAsStream("Wrapper.ser"))) {
- in.readObject();
-
- fail("Exception is expected");
- }
- catch (IncompleteDeserializationException e) {
- Wrapper wrp = (Wrapper)e.message();
-
- assertNotNull(wrp);
- assertEquals(42, wrp.i);
- assertNull(wrp.o);
- }
- }
-
- /** */
- public static class Wrapper implements DiscoveryCustomMessage {
- /** */
- private static final long serialVersionUID = 0;
-
- /** */
- private final int i;
-
- /** */
- private final Object o;
-
- /** */
- private Wrapper(int i, Object o) {
- this.i = i;
- this.o = o;
- }
-
- /** */
- private void readObject(ObjectInputStream in) throws IOException {
- try {
- in.defaultReadObject();
- }
- catch (ClassNotFoundException e) {
- throw new IncompleteDeserializationException(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid id() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable DiscoveryCustomMessage ackMessage() {
- return null;
- }
- }
-
- // Commented lines were used to prepare serialized object
-// public static void main(String[] args) throws IOException {
-// try (ObjectOutputStream out = new ObjectOutputStream(new
FileOutputStream("Wrapper.ser"))) {
-// out.writeObject(new Wrapper(42, new ForeignClass()));
-// }
-// }
-//
-// public static class ForeignClass implements Serializable {
-// }
-}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
index eac3c20f8e5..cdff17bb2f9 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
@@ -37,7 +37,6 @@ import
org.apache.ignite.internal.managers.IgniteDiagnosticMessagesMultipleConne
import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
import
org.apache.ignite.internal.managers.IgniteDiagnosticPartitionReleaseFutureLimitTest;
import
org.apache.ignite.internal.managers.communication.GridIoManagerFileTransmissionSelfTest;
-import
org.apache.ignite.internal.managers.discovery.IncompleteDeserializationExceptionTest;
import org.apache.ignite.internal.metric.MetricConfigurationTest;
import org.apache.ignite.internal.metric.MetricsClusterActivationTest;
import org.apache.ignite.internal.metric.PeriodicHistogramMetricImplTest;
@@ -205,8 +204,6 @@ import org.junit.runners.Suite;
ClassPathContentLoggingTest.class,
- IncompleteDeserializationExceptionTest.class,
-
GridIoManagerFileTransmissionSelfTest.class,
IgniteStandardMXBeanTest.class,