Repository: ignite Updated Branches: refs/heads/ignite-5075 0a98fc657 -> b53950ac9
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b53950ac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b53950ac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b53950ac Branch: refs/heads/ignite-5075 Commit: b53950ac9ab815072c5bc2005a626ea7af2b198d Parents: 0a98fc6 Author: sboikov <[email protected]> Authored: Fri May 12 13:20:27 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 12 13:20:27 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 8 +- .../cache/CacheGroupInfrastructure.java | 4 +- .../cache/GridCacheGroupIdMessage.java | 5 + .../processors/cache/GridCacheIdMessage.java | 5 + .../processors/cache/GridCacheIoManager.java | 142 ++++++++++++++----- .../processors/cache/GridCacheMessage.java | 5 + .../GridCachePartitionExchangeManager.java | 16 +-- .../cache/GridCacheSharedContext.java | 4 +- .../GridChangeGlobalStateMessageResponse.java | 5 + .../GridDistributedTxFinishResponse.java | 5 + .../distributed/dht/GridDhtCacheAdapter.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 14 +- .../dht/GridDhtTxOnePhaseCommitAckRequest.java | 5 + .../dht/atomic/GridDhtAtomicCache.java | 23 ++- .../dht/colocated/GridDhtColocatedCache.java | 6 +- .../GridDhtPartitionsAbstractMessage.java | 5 + .../dht/preloader/GridDhtPreloader.java | 4 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../near/GridNearTransactionalCache.java | 4 +- .../query/GridCacheDistributedQueryManager.java | 10 +- .../continuous/CacheContinuousQueryManager.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 22 +-- .../cache/transactions/TxLocksRequest.java | 5 + .../cache/transactions/TxLocksResponse.java | 5 + .../cluster/GridClusterStateProcessor.java | 4 +- .../GridCacheConditionalDeploymentSelfTest.java | 5 + .../processors/cache/IgniteCacheGroupsTest.java | 4 +- .../communication/GridCacheMessageSelfTest.java | 15 ++ 28 files changed, 242 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 11253fc..d9c6071 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -310,7 +310,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Integer grpId = grp.groupId(); if (!grpHolders.containsKey(grp.groupId())) { - cctx.io().addHandler(grpId, GridDhtAffinityAssignmentResponse.class, + cctx.io().addHandler(true, grpId, GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { processAffinityAssignmentResponse(grpId, nodeId, res); @@ -469,7 +469,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap stoppedGrps.add(cacheGrp.groupId()); - cctx.io().removeHandler(cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); + cctx.io().removeHandler(true, cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); } } } @@ -1139,7 +1139,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); if (grp == null) { - cctx.io().addHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, + cctx.io().addHandler(true, desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { processAffinityAssignmentResponse(grpId, nodeId, res); @@ -1231,7 +1231,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final CacheGroupInfrastructure grp = cctx.cache().cacheGroup(desc.groupId()); if (grp == null) { - cctx.io().addHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, + cctx.io().addHandler(true, desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { processAffinityAssignmentResponse(desc.groupId(), nodeId, res); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 1b5187e..94d4357 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -378,6 +378,8 @@ public class CacheGroupInfrastructure { */ void stopGroup() { offheapMgr.stop(); + + ctx.io().removeCacheGroupHandlers(grpId); } /** @@ -406,7 +408,7 @@ public class CacheGroupInfrastructure { top = new GridDhtPartitionTopologyImpl(ctx, this, entryFactory); if (!ctx.kernalContext().clientNode()) { - ctx.io().addHandler(groupId(), GridDhtAffinityAssignmentRequest.class, + ctx.io().addHandler(true, groupId(), GridDhtAffinityAssignmentRequest.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) { processAffinityAssignmentRequest(nodeId, msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java index 29a978e..67ca115 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java @@ -39,6 +39,11 @@ public abstract class GridCacheGroupIdMessage extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return true; + } + + /** {@inheritDoc} */ @Override public final int handlerId() { return grpId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java index 25a553b..f27f3c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java @@ -38,6 +38,11 @@ public abstract class GridCacheIdMessage extends GridCacheMessage { return cacheId; } + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * @param cacheId Cache ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index fb2ac3d..d505825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -110,16 +110,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Number of retries using to send messages. */ private int retryCnt; - /** Indexed class handlers. */ - private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + /** */ + private final MessageHandlers cacheHandlers = new MessageHandlers(); - /** Handler registry. */ - private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> - clsHandlers = new ConcurrentHashMap8<>(); - - /** Ordered handler registry. */ - private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = - new ConcurrentHashMap8<>(); + /** */ + private final MessageHandlers grpHandlers = new MessageHandlers(); /** Stopping flag. */ private boolean stopping; @@ -259,12 +254,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings("unchecked") private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) { + handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers); + } + + /** + * @param nodeId Sender node ID. + * @param cacheMsg Message. + * @param msgHandlers Message handlers. + */ + @SuppressWarnings("unchecked") + private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) { int msgIdx = cacheMsg.lookupIndex(); IgniteBiInClosure<UUID, GridCacheMessage> c = null; if (msgIdx >= 0) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId()); @@ -273,7 +278,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } if (c == null) - c = clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); + c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); if (c == null) { IgniteLogger log = cacheMsg.messageLogger(cctx); @@ -289,7 +294,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { msg0.append(U.nl()).append("Registered listeners:"); - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet()) msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue())); @@ -322,7 +327,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_CACHE); - for (Object ordTopic : orderedHandlers.keySet()) + for (Object ordTopic : cacheHandlers.orderedHandlers.keySet()) + cctx.gridIO().removeMessageListener(ordTopic); + + for (Object ordTopic : grpHandlers.orderedHandlers.keySet()) cctx.gridIO().removeMessageListener(ordTopic); boolean interrupted = false; @@ -1180,19 +1188,35 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * Adds message handler. * + * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param hndId Message handler ID. * @param type Type of message. * @param c Handler. */ - @SuppressWarnings({"unchecked"}) public void addHandler( + boolean cacheGrp, int hndId, Class<? extends GridCacheMessage> type, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + addHandler(hndId, type, c, cacheGrp ? grpHandlers : cacheHandlers); + } + + /** + * @param hndId Message handler ID. + * @param type Type of message. + * @param c Handler. + * @param msgHandlers Message handlers. + */ + @SuppressWarnings({"unchecked"}) + private void addHandler( + int hndId, + Class<? extends GridCacheMessage> type, + IgniteBiInClosure<UUID, ? extends GridCacheMessage> c, + MessageHandlers msgHandlers) { int msgIdx = messageIndex(type); if (msgIdx != -1) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId); @@ -1208,17 +1232,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cacheClsHandlers[msgIdx] = c; - idxClsHandlers = idxClsHandlers0; + msgHandlers.idxClsHandlers = idxClsHandlers0; return; } else { ListenerKey key = new ListenerKey(hndId, type); - if (clsHandlers.putIfAbsent(key, + if (msgHandlers.clsHandlers.putIfAbsent(key, (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null) assert false : "Handler for class already registered [hndId=" + hndId + ", cls=" + type + - ", old=" + clsHandlers.get(key) + ", new=" + c + ']'; + ", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c + ']'; } IgniteLogger log0 = log; @@ -1232,25 +1256,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * @param cacheId Cache ID to remove handlers for. */ - public void removeHandlers(int cacheId) { - assert cacheId != 0; + void removeCacheHandlers(int cacheId) { + removeHandlers(cacheHandlers, cacheId); + } + + /** + * @param grpId Cache group ID to remove handlers for. + */ + void removeCacheGroupHandlers(int grpId) { + removeHandlers(grpHandlers, grpId); + } - idxClsHandlers.remove(cacheId); + /** + * @param msgHandlers Handlers. + * @param hndId ID to remove handlers for. + */ + private void removeHandlers(MessageHandlers msgHandlers, int hndId) { + assert hndId != 0; - for (Iterator<ListenerKey> iter = clsHandlers.keySet().iterator(); iter.hasNext(); ) { + msgHandlers.idxClsHandlers.remove(hndId); + + for (Iterator<ListenerKey> iter = msgHandlers.clsHandlers.keySet().iterator(); iter.hasNext(); ) { ListenerKey key = iter.next(); - if (key.cacheId == cacheId) + if (key.hndId == hndId) iter.remove(); } } /** - * @param cacheId Cache ID to remove handlers for. + * @param cacheGrp {@code True} if cache group handler, {@code false} if cache handler. + * @param hndId Handler ID. * @param type Message type. */ - public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) { - clsHandlers.remove(new ListenerKey(cacheId, type)); + public void removeHandler(boolean cacheGrp, int hndId, Class<? extends GridCacheMessage> type) { + MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; + + msgHandlers.clsHandlers.remove(new ListenerKey(hndId, type)); } /** @@ -1274,14 +1316,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * Adds ordered message handler. * + * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param topic Topic. * @param c Handler. */ @SuppressWarnings({"unchecked"}) - public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + public void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; + IgniteLogger log0 = log; - if (orderedHandlers.putIfAbsent(topic, c) == null) { + if (msgHandlers.orderedHandlers.putIfAbsent(topic, c) == null) { cctx.gridIO().addMessageListener(topic, new OrderedMessageListener( (IgniteBiInClosure<UUID, GridCacheMessage>)c)); @@ -1296,10 +1341,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * Removed ordered message handler. * + * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param topic Topic. */ - public void removeOrderedHandler(Object topic) { - if (orderedHandlers.remove(topic) != null) { + public void removeOrderedHandler(boolean cacheGrp, Object topic) { + MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; + + if (msgHandlers.orderedHandlers.remove(topic) != null) { cctx.gridIO().removeMessageListener(topic); if (log != null && log.isDebugEnabled()) @@ -1354,8 +1402,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Cache IO manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> clsHandlersSize: " + clsHandlers.size()); - X.println(">>> orderedHandlersSize: " + orderedHandlers.size()); + X.println(">>> cacheClsHandlersSize: " + cacheHandlers.clsHandlers.size()); + X.println(">>> cacheOrderedHandlersSize: " + cacheHandlers.orderedHandlers.size()); + X.println(">>> cacheGrpClsHandlersSize: " + grpHandlers.clsHandlers.size()); + X.println(">>> cacheGrpOrderedHandlersSize: " + grpHandlers.orderedHandlers.size()); + } + + /** + * + */ + static class MessageHandlers { + /** Indexed class handlers. */ + volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + + /** Handler registry. */ + ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> + clsHandlers = new ConcurrentHashMap8<>(); + + /** Ordered handler registry. */ + ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = + new ConcurrentHashMap8<>(); } /** @@ -1389,17 +1455,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ private static class ListenerKey { /** Cache ID. */ - private int cacheId; + private int hndId; /** Message class. */ private Class<? extends GridCacheMessage> msgCls; /** - * @param cacheId Cache ID. + * @param hndId Handler ID. * @param msgCls Message class. */ - private ListenerKey(int cacheId, Class<? extends GridCacheMessage> msgCls) { - this.cacheId = cacheId; + private ListenerKey(int hndId, Class<? extends GridCacheMessage> msgCls) { + this.hndId = hndId; this.msgCls = msgCls; } @@ -1413,12 +1479,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ListenerKey that = (ListenerKey)o; - return cacheId == that.cacheId && msgCls.equals(that.msgCls); + return hndId == that.hndId && msgCls.equals(that.msgCls); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = cacheId; + int res = hndId; res = 31 * res + msgCls.hashCode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index ec5efad..6578bc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -90,6 +90,11 @@ public abstract class GridCacheMessage implements Message { public abstract int handlerId(); /** + * @return {@code True} if cache group message. + */ + public abstract boolean cacheGroupMessage(); + + /** * @return Error, if any. */ @Nullable public Throwable error() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 98ad758..a666297 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -308,21 +308,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); - cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, + cctx.io().addHandler(false, 0, GridDhtPartitionsSingleMessage.class, new MessageHandler<GridDhtPartitionsSingleMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { processSinglePartitionUpdate(node, msg); } }); - cctx.io().addHandler(0, GridDhtPartitionsFullMessage.class, + cctx.io().addHandler(false, 0, GridDhtPartitionsFullMessage.class, new MessageHandler<GridDhtPartitionsFullMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { processFullPartitionUpdate(node, msg); } }); - cctx.io().addHandler(0, GridDhtPartitionsSingleRequest.class, + cctx.io().addHandler(false, 0, GridDhtPartitionsSingleRequest.class, new MessageHandler<GridDhtPartitionsSingleRequest>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) { processSinglePartitionRequest(node, msg); @@ -381,7 +381,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { final int idx = cnt; - cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { + cctx.io().addOrderedHandler(true, rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { @Override public void apply(final UUID id, final GridCacheMessage m) { if (!enterBusy()) return; @@ -498,9 +498,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void onKernalStop0(boolean cancel) { cctx.gridEvents().removeDiscoveryEventListener(discoLsnr); - cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); - cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); - cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); + cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class); + cctx.io().removeHandler(false, 0, GridDhtPartitionsFullMessage.class); + cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleRequest.class); stopErr = cctx.kernalContext().clientDisconnected() ? new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(), @@ -520,7 +520,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!cctx.kernalContext().clientNode()) { for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) - cctx.io().removeOrderedHandler(rebalanceTopic(cnt)); + cctx.io().removeOrderedHandler(true, rebalanceTopic(cnt)); } U.cancel(exchWorker); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 55f3c42..6be440d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -404,7 +404,7 @@ public class GridCacheSharedContext<K, V> { /** * @param cacheCtx Cache context to remove. */ - public void removeCacheContext(GridCacheContext cacheCtx) { + void removeCacheContext(GridCacheContext cacheCtx) { int cacheId = cacheCtx.cacheId(); ctxMap.remove(cacheId, cacheCtx); @@ -415,7 +415,7 @@ public class GridCacheSharedContext<K, V> { locStoreCnt.decrementAndGet(); // Safely clean up the message listeners. - ioMgr.removeHandlers(cacheId); + ioMgr.removeCacheHandlers(cacheId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java index bfe6eee..a64c8bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java @@ -60,6 +60,11 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public int handlerId() { return 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java index 79db810..c36e633 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java @@ -76,6 +76,11 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public final int partition() { return part; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index d90ee6c..f58e0df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -171,7 +171,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) { processTtlUpdateRequest(req); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 9b61f14..8f46f89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -118,43 +118,43 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @Override public void apply(UUID nodeId, GridNearGetRequest req) { processNearGetRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { processNearSingleGetRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { @Override public void apply(UUID nodeId, GridNearLockRequest req) { processNearLockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { @Override public void apply(UUID nodeId, GridDhtLockRequest req) { processDhtLockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { @Override public void apply(UUID nodeId, GridDhtLockResponse req) { processDhtLockResponse(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { @Override public void apply(UUID nodeId, GridNearUnlockRequest req) { processNearUnlockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { + ctx.io().addHandler(false, ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) { processDhtUnlockRequest(nodeId, req); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java index 3b68a5a..67eacd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java @@ -52,6 +52,11 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage { return 0; } + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * * @param vers Near Tx xid Versions. http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2d855fe..c080470 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -239,6 +239,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { metrics = m; ctx.io().addHandler( + false, ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @@ -253,6 +254,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); ctx.io().addHandler( + false, ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @@ -267,6 +269,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); ctx.io().addHandler( + false, ctx.cacheId(), GridNearAtomicAbstractUpdateRequest.class, new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { @@ -285,7 +288,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addHandler( + false, + ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @Override public void apply( @@ -304,6 +309,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); ctx.io().addHandler( + false, ctx.cacheId(), GridDhtAtomicAbstractUpdateRequest.class, new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { @@ -323,6 +329,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); ctx.io().addHandler( + false, ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @@ -341,7 +348,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addHandler( + false, + ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class, new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { @Override public void apply( @@ -359,7 +368,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addHandler( + false, + ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { @@ -372,7 +383,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addHandler( + false, + ctx.cacheId(), GridNearAtomicCheckUpdateRequest.class, new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) { @@ -387,6 +400,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (near == null) { ctx.io().addHandler( + false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @@ -401,6 +415,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); ctx.io().addHandler( + false, ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 9ee701a..6ad1d9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -132,19 +132,19 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processNearGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { processNearSingleGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index d5a60ef..441952d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -65,6 +65,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public int partition() { return GridIoMessage.STRIPE_DISABLED_PART; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 3d62c2f..8dfb4d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -161,14 +161,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("Starting DHT rebalancer..."); - ctx.io().addHandler(grp.groupId(), GridDhtForceKeysRequest.class, + ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysRequest.class, new MessageHandler<GridDhtForceKeysRequest>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { processForceKeysRequest(node, msg); } }); - ctx.io().addHandler(grp.groupId(), GridDhtForceKeysResponse.class, + ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysResponse.class, new MessageHandler<GridDhtForceKeysResponse>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { processForceKeyResponse(node, msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 2d5c8a5..0c33edc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -103,7 +103,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index cc90be0..0b9a1c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -87,13 +87,13 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + ctx.io().addHandler(false, ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 06a3416..bb525bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -104,7 +104,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage assert cctx.config().getCacheMode() != LOCAL; - cctx.io().addHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() { + cctx.io().addHandler(false, cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() { @Override public void apply(UUID nodeId, GridCacheQueryRequest req) { processQueryRequest(nodeId, req); } @@ -560,11 +560,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(topic, resHnd); + cctx.io().addOrderedHandler(false, topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - cctx.io().removeOrderedHandler(topic); + cctx.io().removeOrderedHandler(false, topic); } }); @@ -744,11 +744,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(topic, resHnd); + cctx.io().addOrderedHandler(false, topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - cctx.io().removeOrderedHandler(topic); + cctx.io().removeOrderedHandler(false, topic); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index a1e0bad..03e1e1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -127,7 +127,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); if (cctx.affinityNode()) { - cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, + cctx.io().addHandler(false, cctx.cacheId(), CacheContinuousQueryBatchAck.class, new CI2<UUID, CacheContinuousQueryBatchAck>() { @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index c6dc114..e686252 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -136,68 +136,68 @@ public class IgniteTxHandler { txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); - ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); } }); - ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg); } }); - ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg); } }); - ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg); } }); - ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg); } }); - ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg); } }); - ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg); } }); - ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg); } }); - ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addHandler(false, 0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg); } }); - ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class, + ctx.io().addHandler(false, 0, GridCacheTxRecoveryRequest.class, new CI2<UUID, GridCacheTxRecoveryRequest>() { @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) { processCheckPreparedTxRequest(nodeId, req); } }); - ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class, + ctx.io().addHandler(false, 0, GridCacheTxRecoveryResponse.class, new CI2<UUID, GridCacheTxRecoveryResponse>() { @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) { processCheckPreparedTxResponse(nodeId, res); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java index a1fb9a3..94fe005 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java @@ -75,6 +75,11 @@ public class TxLocksRequest extends GridCacheMessage { return 0; } + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * @return Future ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java index ab2e579..a5c8f09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java @@ -78,6 +78,11 @@ public class TxLocksResponse extends GridCacheMessage { return 0; } + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * @return Future ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index b25b229..dbdc47c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -134,7 +134,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { cacheProc = ctx.cache(); sharedCtx = cacheProc.context(); - sharedCtx.io().addHandler(0, + sharedCtx.io().addHandler(false, 0, GridChangeGlobalStateMessageResponse.class, new CI2<UUID, GridChangeGlobalStateMessageResponse>() { @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { @@ -194,7 +194,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { @Override public void stop(boolean cancel) throws IgniteCheckedException { super.stop(cancel); - sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class); + sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class); ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); IgniteCheckedException stopErr = new IgniteInterruptedCheckedException( http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java index f5cf177..3e68260 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java @@ -188,6 +188,11 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public short directType() { return DIRECT_TYPE; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index bf99013..4ba4cfe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -177,7 +177,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCreateCache1() throws Exception { - Ignite srv0 = ignite(0); + Ignite srv0 = startGrid(0); { IgniteCache<Object, Object> cache1 = srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 2)); @@ -215,7 +215,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCreateCache2() throws Exception { - Ignite srv0 = ignite(0); + Ignite srv0 = startGrid(0); { IgniteCache<Object, Object> cache1 = srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 0)); http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index ab5bfd5..4a6b765 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -221,6 +221,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return false; } @@ -321,6 +326,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return false; } @@ -449,6 +459,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return false; }
