http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5c466f4..5d67c3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -307,21 +307,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); - cctx.io().addHandler(false, 0, GridDhtPartitionsSingleMessage.class, + cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class, new MessageHandler<GridDhtPartitionsSingleMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { processSinglePartitionUpdate(node, msg); } }); - cctx.io().addHandler(false, 0, GridDhtPartitionsFullMessage.class, + cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class, new MessageHandler<GridDhtPartitionsFullMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { processFullPartitionUpdate(node, msg); } }); - cctx.io().addHandler(false, 0, GridDhtPartitionsSingleRequest.class, + cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class, new MessageHandler<GridDhtPartitionsSingleRequest>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) { processSinglePartitionRequest(node, msg); @@ -380,29 +380,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { final int idx = cnt; - cctx.io().addOrderedHandler(true, rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { - @Override public void apply(final UUID id, final GridCacheMessage m) { + cctx.io().addOrderedCacheGroupHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() { + @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) { if (!enterBusy()) return; try { - if (m instanceof GridCacheGroupIdMessage) { - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(((GridCacheGroupIdMessage)m).groupId()); + CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId()); - if (grp != null) { - if (m instanceof GridDhtPartitionSupplyMessage) { - grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m); + if (grp != null) { + if (m instanceof GridDhtPartitionSupplyMessage) { + grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m); - return; - } - else if (m instanceof GridDhtPartitionDemandMessage) { - grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m); + return; + } + else if (m instanceof GridDhtPartitionDemandMessage) { + grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m); - return; - } + return; } - else - U.warn(log, "Failed to find cache group [msg=" + m + ']'); } U.error(log, "Unsupported message type: " + m.getClass().getName()); @@ -423,13 +419,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { fut.get(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) grp.preloader().onInitialExchangeComplete(null); reconnectExchangeFut.onDone(); } catch (IgniteCheckedException e) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) grp.preloader().onInitialExchangeComplete(e); reconnectExchangeFut.onDone(e); @@ -475,7 +471,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (nodeStartVer.equals(grp.localStartVersion())) grp.preloader().onInitialExchangeComplete(null); } @@ -811,7 +807,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { // Check rebalance state & send CacheAffinityChangeMessage if need. - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { GridDhtPartitionTopology top = grp.topology(); @@ -887,7 +883,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { if (exchId != null) { AffinityTopologyVersion startTopVer = grp.localStartVersion(); @@ -1019,7 +1015,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { GridDhtPartitionMap locMap = grp.topology().localPartitionMap(); @@ -1249,7 +1245,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = null; @@ -1292,7 +1288,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null && grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0) @@ -1398,7 +1394,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana dumpPendingObjects(exchTopVer); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) grp.preloader().dumpDebugInfo(); cctx.affinity().dumpDebugInfo(); @@ -1562,7 +1558,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1717,7 +1713,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { boolean preloadFinished = true; - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1828,7 +1824,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean changed = false; - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1852,7 +1848,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) { assignsMap = new HashMap<>(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { long delay = grp.config().getRebalanceDelay(); GridDhtPreloaderAssignments assigns = null; @@ -1878,7 +1874,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { int grpId = e.getKey(); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); int order = grp.config().getRebalanceOrder(); @@ -1896,7 +1892,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (Integer order : orderMap.descendingKeySet()) { for (Integer grpId : orderMap.get(order)) { - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPreloaderAssignments assigns = assignsMap.get(grpId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index df563f9..98874e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable; */ public class GridCachePreloaderAdapter implements GridCachePreloader { /** */ - protected final CacheGroupInfrastructure grp; + protected final CacheGroupContext grp; /** */ protected final GridCacheSharedContext ctx; @@ -57,7 +57,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** * @param grp Cache group. */ - public GridCachePreloaderAdapter(CacheGroupInfrastructure grp) { + public GridCachePreloaderAdapter(CacheGroupContext grp) { assert grp != null; this.grp = grp; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5990c42..ad83b14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -175,7 +175,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { private GridCacheSharedContext<?, ?> sharedCtx; /** */ - private final ConcurrentMap<Integer, CacheGroupInfrastructure> cacheGrps = new ConcurrentHashMap<>(); + private final ConcurrentMap<Integer, CacheGroupContext> cacheGrps = new ConcurrentHashMap<>(); /** */ private final Map<String, GridCacheAdapter<?, ?>> caches; @@ -609,7 +609,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param grp Cache group. */ - private void cleanup(CacheGroupInfrastructure grp) { + private void cleanup(CacheGroupContext grp) { CacheConfiguration cfg = grp.config(); for (Object obj : grp.configuredUserObjects()) @@ -814,14 +814,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param grpId Group ID. * @return Cache group. */ - @Nullable public CacheGroupInfrastructure cacheGroup(int grpId) { + @Nullable public CacheGroupContext cacheGroup(int grpId) { return cacheGrps.get(grpId); } /** * @return Cache groups. */ - public Collection<CacheGroupInfrastructure> cacheGroups() { + public Collection<CacheGroupContext> cacheGroups() { return cacheGrps.values(); } @@ -992,7 +992,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopCache(cache, cancel, false); } - for (CacheGroupInfrastructure grp : cacheGrps.values()) + for (CacheGroupContext grp : cacheGrps.values()) stopCacheGroup(grp.groupId()); cachesInfo.clearCaches(); @@ -1017,7 +1017,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No new caches should be added after this point. exch.onKernalStop(cancel); - for (CacheGroupInfrastructure grp : cacheGrps.values()) + for (CacheGroupContext grp : cacheGrps.values()) grp.onKernalStop(); onKernalStopCaches(cancel); @@ -1042,7 +1042,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCheckedException affErr = new IgniteCheckedException("Failed to wait for topology update, node is stopping."); - for (CacheGroupInfrastructure grp : cacheGrps.values()) { + for (CacheGroupContext grp : cacheGrps.values()) { GridAffinityAssignmentCache aff = grp.affinity(); aff.cancelFutures(affErr); @@ -1081,7 +1081,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (IgniteInternalFuture fut : pendingTemplateFuts.values()) ((GridFutureAdapter)fut).onDone(err); - for (CacheGroupInfrastructure grp : cacheGrps.values()) + for (CacheGroupContext grp : cacheGrps.values()) grp.onDisconnected(reconnectFut); for (GridCacheAdapter cache : caches.values()) { @@ -1153,7 +1153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { final Set<Integer> stoppedGrps = reconnectRes.stoppedCacheGroups(); - for (CacheGroupInfrastructure grp : cacheGrps.values()) { + for (CacheGroupContext grp : cacheGrps.values()) { if (stoppedGrps.contains(grp.groupId())) cacheGrps.remove(grp.groupId()); else @@ -1171,7 +1171,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopFut = ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { for (GridCacheAdapter cache : stoppedCaches) { - CacheGroupInfrastructure grp = cache.context().group(); + CacheGroupContext grp = cache.context().group(); onKernalStop(cache, true); stopCache(cache, true, false); @@ -1424,7 +1424,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed to create cache. */ private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, - CacheGroupInfrastructure grp, + CacheGroupContext grp, @Nullable CachePluginManager pluginMgr, DynamicCacheDescriptor desc, AffinityTopologyVersion locStartTopVer, @@ -1896,10 +1896,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { String grpName = startCfg.getGroupName(); - CacheGroupInfrastructure grp = null; + CacheGroupContext grp = null; if (grpName != null) { - for (CacheGroupInfrastructure grp0 : cacheGrps.values()) { + for (CacheGroupContext grp0 : cacheGrps.values()) { if (grp0.sharedGroup() && grpName.equals(grp0.name())) { grp = grp0; @@ -1956,7 +1956,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Started cache group. * @throws IgniteCheckedException If failed. */ - private CacheGroupInfrastructure startCacheGroup( + private CacheGroupContext startCacheGroup( CacheGroupDescriptor desc, CacheType cacheType, boolean affNode, @@ -1971,7 +1971,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { FreeList freeList = sharedCtx.database().freeList(memPlcName); ReuseList reuseList = sharedCtx.database().reuseList(memPlcName); - CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx, + CacheGroupContext grp = new CacheGroupContext(sharedCtx, desc.groupId(), desc.receivedFrom(), cacheType, @@ -1990,7 +1990,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { grp.start(); - CacheGroupInfrastructure old = cacheGrps.put(desc.groupId(), grp); + CacheGroupContext old = cacheGrps.put(desc.groupId(), grp); assert old == null : old.name(); @@ -2033,7 +2033,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param req Stop request. * @return Cache group for stopped cache. */ - private CacheGroupInfrastructure prepareCacheStop(DynamicCacheChangeRequest req, boolean forceClose) { + private CacheGroupContext prepareCacheStop(DynamicCacheChangeRequest req, boolean forceClose) { assert req.stop() || req.close() || forceClose : req; GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName()); @@ -2127,7 +2127,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy.context().gate().onStopped(); - CacheGroupInfrastructure grp = prepareCacheStop(req.request(), forceClose); + CacheGroupContext grp = prepareCacheStop(req.request(), forceClose); if (grp != null && !grp.hasCaches()) stopCacheGroup(grp.groupId()); @@ -2144,7 +2144,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param grpId Group ID. */ private void stopCacheGroup(int grpId) { - CacheGroupInfrastructure grp = cacheGrps.remove(grpId); + CacheGroupContext grp = cacheGrps.remove(grpId); if (grp != null) stopCacheGroup(grp); @@ -2153,7 +2153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param grp Cache group. */ - private void stopCacheGroup(CacheGroupInfrastructure grp) { + private void stopCacheGroup(CacheGroupContext grp) { grp.stopGroup(); U.stopLifecycleAware(log, grp.configuredUserObjects()); @@ -3517,7 +3517,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param objs Extra components. * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface. */ - private Iterable<Object> lifecycleAwares(CacheGroupInfrastructure grp, CacheConfiguration ccfg, Object... objs) { + private Iterable<Object> lifecycleAwares(CacheGroupContext grp, CacheConfiguration ccfg, Object... objs) { Collection<Object> ret = new ArrayList<>(7 + objs.length); if (grp.affinityFunction() != ccfg.getAffinity()) @@ -3897,7 +3897,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { try { - for (CacheGroupInfrastructure grp : sharedCtx.cache().cacheGroups()) { + for (CacheGroupContext grp : sharedCtx.cache().cacheGroups()) { if (!grp.isLocal() && grp.affinityNode()) { GridDhtPartitionTopology top = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 55485bd..2f7d945 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -44,7 +44,7 @@ public interface IgniteCacheOffheapManager { * @param grp Cache group. * @throws IgniteCheckedException If failed. */ - public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException;; + public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;; /** * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 81f8ce2..21210ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -86,7 +86,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager protected GridCacheSharedContext ctx; /** */ - protected CacheGroupInfrastructure grp; + protected CacheGroupContext grp; /** */ protected IgniteLogger log; @@ -121,7 +121,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException { + @Override public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException { this.ctx = ctx; this.grp = grp; this.log = ctx.logger(getClass()); @@ -1616,7 +1616,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager private final CacheDataRowStore rowStore; /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** * @param grp Ccahe group. @@ -1628,7 +1628,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @throws IgniteCheckedException If failed. */ CacheDataTree( - CacheGroupInfrastructure grp, + CacheGroupContext grp, String name, ReuseList reuseList, CacheDataRowStore rowStore, @@ -1832,7 +1832,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param freeList Free list. * @param partId Partition number. */ - public CacheDataRowStore(CacheGroupInfrastructure grp, FreeList freeList, int partId) { + public CacheDataRowStore(CacheGroupContext grp, FreeList freeList, int partId) { super(grp, freeList); this.partId = partId; @@ -2211,7 +2211,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @return Row. * @throws IgniteCheckedException If failed. */ - PendingRow initKey(CacheGroupInfrastructure grp) throws IgniteCheckedException { + PendingRow initKey(CacheGroupContext grp) throws IgniteCheckedException { CacheDataRowAdapter rowData = new CacheDataRowAdapter(link); rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY); @@ -2234,7 +2234,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager private final static Object WITHOUT_KEY = new Object(); /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** * @param grp Cache group. @@ -2246,7 +2246,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @throws IgniteCheckedException If failed. */ PendingEntriesTree( - CacheGroupInfrastructure grp, + CacheGroupContext grp, String name, PageMemory pageMem, long metaPageId, http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index f486601..a25d794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -97,7 +97,7 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param rowData Required row data. * @throws IgniteCheckedException If failed. */ - public final void initFromLink(CacheGroupInfrastructure grp, RowData rowData) throws IgniteCheckedException { + public final void initFromLink(CacheGroupContext grp, RowData rowData) throws IgniteCheckedException { initFromLink(grp, grp.shared(), grp.memoryPolicy().pageMemory(), rowData); } @@ -112,7 +112,7 @@ public class CacheDataRowAdapter implements CacheDataRow { * @throws IgniteCheckedException If failed. */ public final void initFromLink( - @Nullable CacheGroupInfrastructure grp, + @Nullable CacheGroupContext grp, GridCacheSharedContext<?, ?> sharedCtx, PageMemory pageMem, RowData rowData) http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java index bea17c8..0e0faef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.database; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; @@ -44,7 +44,7 @@ public class RowStore { * @param grp Cache group. * @param freeList Free list. */ - public RowStore(CacheGroupInfrastructure grp, FreeList freeList) { + public RowStore(CacheGroupContext grp, FreeList freeList) { assert grp != null; assert freeList != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java index fd8932f..3b41ffa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java @@ -25,7 +25,7 @@ import java.util.NoSuchElementException; import java.util.Set; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -39,12 +39,12 @@ import org.jetbrains.annotations.Nullable; */ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap { /** Cache group. */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** * @param grp Cache group. */ - GridCachePartitionedConcurrentMap(CacheGroupInfrastructure grp) { + GridCachePartitionedConcurrentMap(CacheGroupContext grp) { this.grp = grp; } http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 6ff26b2..71ab741 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -368,7 +368,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - ctx.io().addHandler(false, ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) { processTtlUpdateRequest(req); } @@ -385,21 +385,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** - * @return Cache map entry factory. - */ - @Override protected GridCacheMapEntryFactory entryFactory() { - return new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtCacheEntry(ctx, topVer, key); - } - }; - } - - /** * @return Near cache. */ public abstract GridNearCacheAdapter<K, V> near(); http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 48fb352..27c27e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -76,6 +76,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Key partition. */ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable { + /** */ + private static final GridCacheMapEntryFactory ENTRY_FACTORY = new GridCacheMapEntryFactory() { + @Override public GridCacheMapEntry create( + GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key + ) { + return new GridDhtCacheEntry(ctx, topVer, key); + } + }; + /** Maximum size for delete queue. */ public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); @@ -112,7 +123,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** */ @GridToStringExclude - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** Create time. */ @GridToStringExclude @@ -161,14 +172,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param ctx Context. * @param grp Cache group. * @param id Partition ID. - * @param entryFactory Entry factory. */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition(GridCacheSharedContext ctx, - CacheGroupInfrastructure grp, - int id, - GridCacheMapEntryFactory entryFactory) { - super(entryFactory); + CacheGroupContext grp, + int id) { + super(ENTRY_FACTORY); this.id = id; this.ctx = ctx; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 4de2221..799512f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -42,10 +42,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.ClusterState; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -88,7 +86,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private final GridCacheSharedContext ctx; /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** Logger. */ private final IgniteLogger log; @@ -123,9 +121,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Lock. */ private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); - /** */ - private final GridCacheMapEntryFactory entryFactory; - /** Partition update counter. */ private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>(); @@ -138,18 +133,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param ctx Cache shared context. * @param grp Cache group. - * @param entryFactory Entry factory. */ public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx, - CacheGroupInfrastructure grp, - GridCacheMapEntryFactory entryFactory) { + CacheGroupContext grp) { assert ctx != null; assert grp != null; - assert entryFactory != null; this.ctx = ctx; this.grp = grp; - this.entryFactory = entryFactory; log = ctx.logger(getClass()); @@ -736,7 +727,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition loc = locParts.get(p); if (loc == null || loc.state() == EVICTED) { - locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); if (ctx.pageStore() != null) { try { @@ -804,7 +795,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); if (updateSeq) this.updateSeq.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 5fd3111..d607ff1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -119,56 +119,56 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(false, ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @Override public void apply(UUID nodeId, GridNearGetRequest req) { processNearGetRequest(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { processNearSingleGetRequest(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { @Override public void apply(UUID nodeId, GridNearLockRequest req) { processNearLockRequest(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { @Override public void apply(UUID nodeId, GridDhtLockRequest req) { processDhtLockRequest(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { @Override public void apply(UUID nodeId, GridDhtLockResponse req) { processDhtLockResponse(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { @Override public void apply(UUID nodeId, GridNearUnlockRequest req) { processNearUnlockRequest(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) { processDhtUnlockRequest(nodeId, req); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysRequest.class, + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class, new MessageHandler<GridDhtForceKeysRequest>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { processForceKeysRequest(node, msg); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysResponse.class, + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class, new MessageHandler<GridDhtForceKeysResponse>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { processForceKeyResponse(node, msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index fc35000..443b1b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -193,13 +193,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { - assert false : ctx.name(); - - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override protected void init() { super.init(); @@ -233,8 +226,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { metrics = m; - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @@ -248,8 +240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @@ -263,8 +254,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearAtomicAbstractUpdateRequest.class, new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { @@ -283,8 +273,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @@ -303,8 +292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicAbstractUpdateRequest.class, new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { @@ -323,8 +311,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @@ -343,8 +330,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class, new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { @@ -363,8 +349,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { @@ -378,8 +363,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearAtomicCheckUpdateRequest.class, new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { @@ -393,14 +377,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysRequest.class, + ctx.io().addCacheHandler( + ctx.cacheId(), + GridDhtForceKeysRequest.class, new MessageHandler<GridDhtForceKeysRequest>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { processForceKeysRequest(node, msg); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysResponse.class, + ctx.io().addCacheHandler( + ctx.cacheId(), + GridDhtForceKeysResponse.class, new MessageHandler<GridDhtForceKeysResponse>() { @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { processForceKeyResponse(node, msg); @@ -408,8 +396,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { }); if (near == null) { - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @@ -423,8 +410,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( - false, + ctx.io().addCacheHandler( ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 576fa63..708df49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -115,29 +115,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { - assert false : ctx.name(); - - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processNearGetResponse(nodeId, res); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { processNearSingleGetResponse(nodeId, res); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index c4c628f..a1b45df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -41,7 +40,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -84,7 +83,7 @@ public class GridDhtPartitionDemander { private final GridCacheSharedContext<?, ?> ctx; /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -112,7 +111,7 @@ public class GridDhtPartitionDemander { /** * @param grp Ccahe group. */ - public GridDhtPartitionDemander(CacheGroupInfrastructure grp) { + public GridDhtPartitionDemander(CacheGroupContext grp) { assert grp != null; this.grp = grp; @@ -803,7 +802,7 @@ public class GridDhtPartitionDemander { private final GridCacheSharedContext<?, ?> ctx; /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -831,7 +830,7 @@ public class GridDhtPartitionDemander { * @param updateSeq Update sequence. */ RebalanceFuture( - CacheGroupInfrastructure grp, + CacheGroupContext grp, GridDhtPreloaderAssignments assigns, IgniteLogger log, long updateSeq) { http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index ce5f9ea..afdeb8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -27,8 +27,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; @@ -49,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ class GridDhtPartitionSupplier { /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -69,7 +68,7 @@ class GridDhtPartitionSupplier { /** * @param grp Cache group. */ - GridDhtPartitionSupplier(CacheGroupInfrastructure grp) { + GridDhtPartitionSupplier(CacheGroupContext grp) { assert grp != null; this.grp = grp; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 5d02f3f..f8d4344 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -31,7 +31,7 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -237,7 +237,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - CacheGroupInfrastructure grp = ctx.cache().cacheGroup(grpId); + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); for (CacheEntryInfoCollection col : infos().values()) { List<GridCacheEntryInfo> entries = col.infos(); http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d000e26..0c5cdd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -55,7 +55,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscovery import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; @@ -617,7 +617,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { if (crd != null) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -635,7 +635,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. */ private void updateTopologies(boolean crd) throws IgniteCheckedException { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -750,7 +750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (crd != null) { if (crd.isLocal()) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { boolean updateTop = !grp.isLocal() && exchId.topologyVersion().equals(grp.localStartVersion()); @@ -780,7 +780,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (centralizedAff) { // Last server node failed. - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { GridAffinityAssignmentCache aff = grp.affinity(); aff.initialize(topologyVersion(), aff.idealAssignment()); @@ -799,7 +799,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert !cctx.kernalContext().clientNode(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -820,7 +820,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal() || cacheGroupStopping(grp.groupId())) continue; @@ -943,7 +943,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * */ private void onLeft() { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1158,7 +1158,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (err == null && realExchange) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1191,7 +1191,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, CacheValidation> m = new HashMap<>(cctx.cache().cacheGroups().size()); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { Collection<Integer> lostParts = grp.isLocal() ? Collections.<Integer>emptyList() : grp.topology().lostPartitions(); @@ -1224,7 +1224,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut.onDone(err == null); if (exchId.isLeft()) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) grp.affinityFunction().removeNode(exchId.nodeId()); } @@ -1260,7 +1260,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return new CacheInvalidStateException( "Failed to perform cache operation (cluster is not activated): " + cctx.name()); - CacheGroupInfrastructure grp = cctx.group(); + CacheGroupContext grp = cctx.group(); PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy(); @@ -1574,7 +1574,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (Thread.currentThread().isInterrupted()) return; - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) grp.topology().detectLostPartitions(discoEvt); } @@ -1589,7 +1589,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (Thread.currentThread().isInterrupted()) return; - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1612,7 +1612,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert crd.isLocal(); if (!crd.equals(discoCache.serverNodes().get(0))) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) grp.topology().beforeExchange(this, !centralizedAff); } @@ -1624,7 +1624,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) { Integer grpId = entry.getKey(); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = grp != null ? grp.topology() : cctx.exchange().clientTopology(grpId, this); @@ -1707,7 +1707,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void assignPartitionsStates() { if (cctx.database().persistenceEnabled()) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1840,7 +1840,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null) grp.topology().update(exchId, entry.getValue(), cntrMap); @@ -1863,7 +1863,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = grp != null ? grp.topology() : cctx.exchange().clientTopology(grpId, this); @@ -2018,7 +2018,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<ClusterNode> empty = Collections.emptyList(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { List<List<ClusterNode>> affAssignment = new ArrayList<>(grp.affinity().partitions()); for (int i = 0; i < grp.affinity().partitions(); i++) http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index e373f4c..a931ef4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -31,7 +31,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; @@ -96,7 +96,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** * @param grp Cache group. */ - public GridDhtPreloader(CacheGroupInfrastructure grp) { + public GridDhtPreloader(CacheGroupContext grp) { super(grp); top = grp.topology(); http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 0c33edc..e6c5c10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -103,7 +103,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 0505dfd..5914ae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -99,8 +99,10 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } } - /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { + /** + * @return Entry factory. + */ + private GridCacheMapEntryFactory entryFactory() { return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 0b9a1c8..a691cbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -87,13 +87,13 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } }); - ctx.io().addHandler(false, ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 02f50a8..6252f7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -87,8 +87,10 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { return preldr; } - /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { + /** + * @return Entry factory. + */ + private GridCacheMapEntryFactory entryFactory() { return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index bb525bb..b112e1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -104,7 +104,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage assert cctx.config().getCacheMode() != LOCAL; - cctx.io().addHandler(false, cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() { + cctx.io().addCacheHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() { @Override public void apply(UUID nodeId, GridCacheQueryRequest req) { processQueryRequest(nodeId, req); } @@ -560,7 +560,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(false, topic, resHnd); + cctx.io().addOrderedCacheHandler(topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { @@ -744,7 +744,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(false, topic, resHnd); + cctx.io().addOrderedCacheHandler(topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 184e872..f264056 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -127,7 +127,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); if (cctx.affinityNode()) { - cctx.io().addHandler(false, cctx.cacheId(), CacheContinuousQueryBatchAck.class, + cctx.io().addCacheHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, new CI2<UUID, CacheContinuousQueryBatchAck>() { @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index e686252..ba3b2b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -136,68 +136,68 @@ public class IgniteTxHandler { txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); - ctx.io().addHandler(false, 0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); } }); - ctx.io().addHandler(false, 0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg); } }); - ctx.io().addHandler(false, 0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg); } }); - ctx.io().addHandler(false, 0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg); } }); - ctx.io().addHandler(false, 0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg); } }); - ctx.io().addHandler(false, 0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg); } }); - ctx.io().addHandler(false, 0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg); } }); - ctx.io().addHandler(false, 0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg); } }); - ctx.io().addHandler(false, 0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { + ctx.io().addCacheHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg); } }); - ctx.io().addHandler(false, 0, GridCacheTxRecoveryRequest.class, + ctx.io().addCacheHandler(0, GridCacheTxRecoveryRequest.class, new CI2<UUID, GridCacheTxRecoveryRequest>() { @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) { processCheckPreparedTxRequest(nodeId, req); } }); - ctx.io().addHandler(false, 0, GridCacheTxRecoveryResponse.class, + ctx.io().addCacheHandler(0, GridCacheTxRecoveryResponse.class, new CI2<UUID, GridCacheTxRecoveryResponse>() { @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) { processCheckPreparedTxResponse(nodeId, res);