ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/781e33ec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/781e33ec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/781e33ec Branch: refs/heads/ignite-5075 Commit: 781e33ec135cc65f2fbeba502eeba3e0ec56750f Parents: ec6441a Author: sboikov <sboi...@gridgain.com> Authored: Thu Jun 1 14:17:47 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jun 1 17:46:37 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 32 +- .../processors/cache/CacheGroupContext.java | 943 ++++++++++++++++++ .../cache/CacheGroupInfrastructure.java | 951 ------------------- .../processors/cache/ClusterCachesInfo.java | 149 ++- .../processors/cache/GridCacheAdapter.java | 5 - .../processors/cache/GridCacheContext.java | 6 +- .../processors/cache/GridCacheIoManager.java | 42 +- .../GridCachePartitionExchangeManager.java | 62 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../processors/cache/GridCacheProcessor.java | 44 +- .../cache/IgniteCacheOffheapManager.java | 2 +- .../cache/IgniteCacheOffheapManagerImpl.java | 16 +- .../cache/database/CacheDataRowAdapter.java | 6 +- .../processors/cache/database/RowStore.java | 4 +- .../dht/GridCachePartitionedConcurrentMap.java | 6 +- .../distributed/dht/GridDhtCacheAdapter.java | 17 +- .../distributed/dht/GridDhtLocalPartition.java | 23 +- .../dht/GridDhtPartitionTopologyImpl.java | 19 +- .../dht/GridDhtTransactionalCacheAdapter.java | 18 +- .../dht/atomic/GridDhtAtomicCache.java | 48 +- .../dht/colocated/GridDhtColocatedCache.java | 13 +- .../dht/preloader/GridDhtPartitionDemander.java | 11 +- .../dht/preloader/GridDhtPartitionSupplier.java | 7 +- .../GridDhtPartitionSupplyMessage.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 40 +- .../dht/preloader/GridDhtPreloader.java | 4 +- 
.../distributed/near/GridNearAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 6 +- .../near/GridNearTransactionalCache.java | 4 +- .../processors/cache/local/GridLocalCache.java | 6 +- .../query/GridCacheDistributedQueryManager.java | 6 +- .../continuous/CacheContinuousQueryManager.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 22 +- .../cluster/GridClusterStateProcessor.java | 2 +- .../processors/cache/IgniteCacheGroupsTest.java | 273 +++++- .../testframework/junits/GridAbstractTest.java | 4 +- 36 files changed, 1582 insertions(+), 1221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index c563b81..dc4a91f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -305,11 +305,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param grp Cache group. 
*/ - void onCacheGroupCreated(CacheGroupInfrastructure grp) { + void onCacheGroupCreated(CacheGroupContext grp) { final Integer grpId = grp.groupId(); if (!grpHolders.containsKey(grp.groupId())) { - cctx.io().addHandler(true, grpId, GridDhtAffinityAssignmentResponse.class, + cctx.io().addCacheGroupHandler(grpId, GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { processAffinityAssignmentResponse(grpId, nodeId, res); @@ -408,7 +408,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (gprs.add(grpId)) { if (crd && lateAffAssign) initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());else { - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) { assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); @@ -425,7 +425,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().blockGateway(req.request()); if (crd) { - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(req.descriptor().groupId()); + CacheGroupContext grp = cctx.cache().cacheGroup(req.descriptor().groupId()); assert grp != null; @@ -780,7 +780,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap c.apply(grp.affinity()); } else { - for (CacheGroupInfrastructure grp : cctx.kernalContext().cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.kernalContext().cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -805,7 +805,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder grpHolder = grpHolders.get(grpId); - CacheGroupInfrastructure grp = 
cctx.kernalContext().cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.kernalContext().cache().cacheGroup(grpId); if (grpHolder == null) { grpHolder = grp != null ? @@ -1009,7 +1009,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>(); - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1100,7 +1100,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean centralizedAff; if (lateAffAssign) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1131,7 +1131,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { assert !lateAffAssign; - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1164,10 +1164,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap // Need initialize holders and affinity if this node became coordinator during this exchange. 
final Integer grpId = desc.groupId(); - CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp == null) { - cctx.io().addHandler(true, desc.groupId(), GridDhtAffinityAssignmentResponse.class, + cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { processAffinityAssignmentResponse(grpId, nodeId, res); @@ -1256,10 +1256,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheGrp != null) return cacheGrp; - final CacheGroupInfrastructure grp = cctx.cache().cacheGroup(desc.groupId()); + final CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); if (grp == null) { - cctx.io().addHandler(true, desc.groupId(), GridDhtAffinityAssignmentResponse.class, + cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { processAffinityAssignmentResponse(desc.groupId(), nodeId, res); @@ -1294,7 +1294,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); if (!crd) { - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -1713,13 +1713,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private class CacheGroupHolder1 extends CacheGroupHolder { /** */ - private final CacheGroupInfrastructure grp; + private final CacheGroupContext grp; /** * @param grp Cache group. * @param initAff Current affinity. 
*/ - CacheGroupHolder1(CacheGroupInfrastructure grp, @Nullable GridAffinityAssignmentCache initAff) { + CacheGroupHolder1(CacheGroupContext grp, @Nullable GridAffinityAssignmentCache initAff) { super(grp.rebalanceEnabled(), grp.affinity(), initAff); assert !grp.isLocal() : grp; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java new file mode 100644 index 0000000..4844a55 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -0,0 +1,943 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataPageEvictionMode; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.CacheRebalancingEvent; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.typedef.CI1; +import 
org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheRebalanceMode.NONE; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; + +/** + * + */ +public class CacheGroupContext { + /** + * Unique group ID. Currently for shared group it is generated as group name hash, + * for non-shared as cache name hash (see {@link ClusterCachesInfo#checkCacheConflict}). + */ + private final int grpId; + + /** Node ID cache group was received from. */ + private final UUID rcvdFrom; + + /** Flag indicating that this cache group is in a recovery mode due to partitions loss. 
*/ + private boolean needsRecovery; + + /** */ + private final AffinityTopologyVersion locStartVer; + + /** */ + private final CacheConfiguration<?, ?> ccfg; + + /** */ + private final GridCacheSharedContext ctx; + + /** */ + private final boolean affNode; + + /** */ + private final CacheType cacheType; + + /** */ + private final byte ioPlc; + + /** */ + private final boolean depEnabled; + + /** */ + private final boolean storeCacheId; + + /** */ + private volatile List<GridCacheContext> caches; + + /** */ + private volatile List<GridCacheContext> contQryCaches; + + /** */ + private final IgniteLogger log; + + /** */ + private GridAffinityAssignmentCache aff; + + /** */ + private GridDhtPartitionTopologyImpl top; + + /** */ + private IgniteCacheOffheapManager offheapMgr; + + /** */ + private GridCachePreloader preldr; + + /** */ + private final MemoryPolicy memPlc; + + /** */ + private final CacheObjectContext cacheObjCtx; + + /** */ + private final FreeList freeList; + + /** */ + private final ReuseList reuseList; + + /** */ + private boolean drEnabled; + + /** */ + private boolean qryEnabled; + + /** + * @param grpId Group ID. + * @param ctx Context. + * @param rcvdFrom Node ID cache group was received from. + * @param cacheType Cache type. + * @param ccfg Cache configuration. + * @param affNode Affinity node flag. + * @param memPlc Memory policy. + * @param cacheObjCtx Cache object context. + * @param freeList Free list. + * @param reuseList Reuse list. + * @param locStartVer Topology version when group was started on local node. 
+ */ + CacheGroupContext( + GridCacheSharedContext ctx, + int grpId, + UUID rcvdFrom, + CacheType cacheType, + CacheConfiguration ccfg, + boolean affNode, + MemoryPolicy memPlc, + CacheObjectContext cacheObjCtx, + FreeList freeList, + ReuseList reuseList, + AffinityTopologyVersion locStartVer) { + assert ccfg != null; + assert memPlc != null || !affNode; + assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; + + this.grpId = grpId; + this.rcvdFrom = rcvdFrom; + this.ctx = ctx; + this.ccfg = ccfg; + this.affNode = affNode; + this.memPlc = memPlc; + this.cacheObjCtx = cacheObjCtx; + this.freeList = freeList; + this.reuseList = reuseList; + this.locStartVer = locStartVer; + this.cacheType = cacheType; + + ioPlc = cacheType.ioPolicy(); + + depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg); + + storeCacheId = affNode && memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED; + + log = ctx.kernalContext().log(getClass()); + + caches = new ArrayList<>(); + } + + /** + * @return {@code True} if this is cache group for one of system caches. + */ + public boolean systemCache() { + return !sharedGroup() && CU.isSystemCache(ccfg.getName()); + } + + /** + * @return Node ID initiated cache group start. + */ + public UUID receivedFrom() { + return rcvdFrom; + } + + /** + * @return {@code True} if cacheId should be stored in data pages. + */ + public boolean storeCacheIdInDataPage() { + return storeCacheId; + } + + /** + * @return {@code True} if deployment is enabled. + */ + public boolean deploymentEnabled() { + return depEnabled; + } + + /** + * @return Preloader. + */ + public GridCachePreloader preloader() { + return preldr; + } + + /** + * @return IO policy for the given cache group. + */ + public byte ioPolicy() { + return ioPlc; + } + + /** + * @param cctx Cache context. + * @throws IgniteCheckedException If failed. 
+ */ + void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + addCacheContext(cctx); + + offheapMgr.onCacheStarted(cctx); + } + + /** + * @param cacheName Cache name. + * @return {@code True} if group contains cache with given name. + */ + public boolean hasCache(String cacheName) { + List<GridCacheContext> caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + if (caches.get(i).name().equals(cacheName)) + return true; + } + + return false; + } + + /** + * @param cctx Cache context. + */ + private void addCacheContext(GridCacheContext cctx) { + assert cacheType.userCache() == cctx.userCache() : cctx.name(); + assert grpId == cctx.groupId() : cctx.name(); + + ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches); + + assert sharedGroup() || caches.isEmpty(); + + boolean add = caches.add(cctx); + + assert add : cctx.name(); + + if (!qryEnabled && QueryUtils.isEnabled(cctx.config())) + qryEnabled = true; + + if (!drEnabled && cctx.isDrEnabled()) + drEnabled = true; + + this.caches = caches; + } + + /** + * @param cctx Cache context. + */ + private void removeCacheContext(GridCacheContext cctx) { + ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches); + + // It is possible cache was not added in case of errors on cache start. 
+ for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) { + GridCacheContext next = it.next(); + + if (next == cctx) { + assert sharedGroup() || caches.size() == 1 : caches.size(); + + it.remove(); + + break; + } + } + + if (QueryUtils.isEnabled(cctx.config())) { + boolean qryEnabled = false; + + for (int i = 0; i < caches.size(); i++) { + if (QueryUtils.isEnabled(caches.get(i).config())) { + qryEnabled = true; + + break; + } + } + + this.qryEnabled = qryEnabled; + } + + if (cctx.isDrEnabled()) { + boolean drEnabled = false; + + for (int i = 0; i < caches.size(); i++) { + if (caches.get(i).isDrEnabled()) { + drEnabled = true; + + break; + } + } + + this.drEnabled = drEnabled; + } + + this.caches = caches; + } + + /** + * @return Cache context if group contains single cache. + */ + public GridCacheContext singleCacheContext() { + List<GridCacheContext> caches = this.caches; + + assert !sharedGroup() && caches.size() == 1; + + return caches.get(0); + } + + /** + * + */ + public void unwindUndeploys() { + List<GridCacheContext> caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.deploy().unwind(cctx); + } + } + + /** + * @param type Event type to check. + * @return {@code True} if given event type should be recorded. + */ + public boolean eventRecordable(int type) { + return cacheType.userCache() && ctx.gridEvents().isRecordable(type); + } + + /** + * Adds rebalancing event. + * + * @param part Partition. + * @param type Event type. + * @param discoNode Discovery node. + * @param discoType Discovery event type. + * @param discoTs Discovery event timestamp. 
+ */ + public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) { + assert discoNode != null; + assert type > 0; + assert discoType > 0; + assert discoTs > 0; + + if (!eventRecordable(type)) + LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type)); + + List<GridCacheContext> caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cctx.recordEvent(type)) { + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache rebalancing event.", + type, + part, + discoNode, + discoType, + discoTs)); + } + } + } + /** + * Adds partition unload event. + * + * @param part Partition. + */ + public void addUnloadEvent(int part) { + if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) + LT.warn(log, "Added event without checking if event is recordable: " + + U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED)); + + List<GridCacheContext> caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), + cctx.localNode(), + "Cache unloading event.", + EVT_CACHE_REBALANCE_PART_UNLOADED, + part, + null, + 0, + 0)); + } + } + + /** + * @param part Partition. + * @param key Key. + * @param evtNodeId Event node ID. + * @param type Event type. + * @param newVal New value. + * @param hasNewVal Has new value flag. + * @param oldVal Old values. + * @param hasOldVal Has old value flag. + * @param keepBinary Keep binary flag. 
+ */ + public void addCacheEvent( + int part, + KeyCacheObject key, + UUID evtNodeId, + int type, + @Nullable CacheObject newVal, + boolean hasNewVal, + @Nullable CacheObject oldVal, + boolean hasOldVal, + boolean keepBinary + ) { + List<GridCacheContext> caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + cctx.events().addEvent(part, + key, + evtNodeId, + (IgniteUuid)null, + null, + type, + newVal, + hasNewVal, + oldVal, + hasOldVal, + null, + null, + null, + keepBinary); + } + } + + /** + * @return {@code True} if contains cache with query indexing enabled. + */ + public boolean queriesEnabled() { + return qryEnabled; + } + + /** + * @return {@code True} if fast eviction is allowed. + */ + public boolean allowFastEviction() { + return ctx.database().persistenceEnabled() && !queriesEnabled(); + } + + /** + * @return {@code True} in case replication is enabled. + */ + public boolean isDrEnabled() { + return drEnabled; + } + + /** + * @return Free List. + */ + public FreeList freeList() { + return freeList; + } + + /** + * @return Reuse List. + */ + public ReuseList reuseList() { + return reuseList; + } + + /** + * @return Cache object context. + */ + public CacheObjectContext cacheObjectContext() { + return cacheObjCtx; + } + + /** + * @return Cache shared context. + */ + public GridCacheSharedContext shared() { + return ctx; + } + + /** + * @return Memory policy. + */ + public MemoryPolicy memoryPolicy() { + return memPlc; + } + + /** + * @return {@code True} if local node is affinity node. + */ + public boolean affinityNode() { + return affNode; + } + + /** + * @return Topology. + */ + public GridDhtPartitionTopology topology() { + if (top == null) + throw new IllegalStateException("Topology is not initialized: " + name()); + + return top; + } + + /** + * @return Offheap manager. + */ + public IgniteCacheOffheapManager offheap() { + return offheapMgr; + } + + /** + * @return Current cache state. 
Must only be modified during exchange. + */ + public boolean needsRecovery() { + return needsRecovery; + } + + /** + * @param needsRecovery Needs recovery flag. + */ + public void needsRecovery(boolean needsRecovery) { + this.needsRecovery = needsRecovery; + } + + /** + * @return Topology version when group was started on local node. + */ + public AffinityTopologyVersion localStartVersion() { + return locStartVer; + } + + /** + * @return {@code True} if cache is local. + */ + public boolean isLocal() { + return ccfg.getCacheMode() == LOCAL; + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration config() { + return ccfg; + } + + /** + * @return Cache node filter. + */ + public IgnitePredicate<ClusterNode> nodeFilter() { + return ccfg.getNodeFilter(); + } + + /** + * @return Configured user objects which should be initialized/stopped on group start/stop. + */ + Collection<?> configuredUserObjects() { + return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), ccfg.getTopologyValidator()); + } + + /** + * @return Configured topology validator. + */ + @Nullable public TopologyValidator topologyValidator() { + return ccfg.getTopologyValidator(); + } + + /** + * @return Configured affinity function. + */ + public AffinityFunction affinityFunction() { + return ccfg.getAffinity(); + } + + /** + * @return Affinity. + */ + public GridAffinityAssignmentCache affinity() { + return aff; + } + + /** + * @return Group name or {@code null} if group name was not specified for cache. + */ + @Nullable public String name() { + return ccfg.getGroupName(); + } + + /** + * @return Group name if it is specified, otherwise cache name. + */ + public String cacheOrGroupName() { + return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName(); + } + + /** + * @return Group ID. + */ + public int groupId() { + return grpId; + } + + /** + * @return {@code True} if group can contain multiple caches. 
+ */ + public boolean sharedGroup() { + return ccfg.getGroupName() != null; + } + + /** + * + */ + public void onKernalStop() { + aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping.")); + + preldr.onKernalStop(); + + offheapMgr.onKernalStop(); + } + + /** + * @param cctx Cache context. + * @param destroy Destroy flag. + */ + void stopCache(GridCacheContext cctx, boolean destroy) { + if (top != null) + top.onCacheStopped(cctx.cacheId()); + + offheapMgr.stopCache(cctx.cacheId(), destroy); + + removeCacheContext(cctx); + } + + /** + * + */ + void stopGroup() { + IgniteCheckedException err = + new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); + + aff.cancelFutures(err); + + offheapMgr.stop(); + + ctx.io().removeCacheGroupHandlers(grpId); + } + + /** + * @return IDs of caches in this group. + */ + public Set<Integer> cacheIds() { + List<GridCacheContext> caches = this.caches; + + Set<Integer> ids = U.newHashSet(caches.size()); + + for (int i = 0; i < caches.size(); i++) + ids.add(caches.get(i).cacheId()); + + return ids; + } + + /** + * @return {@code True} if group contains caches. + */ + boolean hasCaches() { + List<GridCacheContext> caches = this.caches; + + return !caches.isEmpty(); + } + + /** + * @param part Partition ID. + */ + public void onPartitionEvicted(int part) { + List<GridCacheContext> caches = this.caches; + + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cctx.isDrEnabled()) + cctx.dr().partitionEvicted(part); + + cctx.continuousQueries().onPartitionEvicted(part); + + cctx.dataStructures().onPartitionEvicted(part); + } + } + + /** + * @param cctx Cache context. 
+ */ + public void addCacheWithContinuousQuery(GridCacheContext cctx) { + assert sharedGroup() : cacheOrGroupName(); + assert cctx.group() == this : cctx.name(); + assert !cctx.isLocal() : cctx.name(); + + synchronized (this) { + List<GridCacheContext> contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + contQryCaches = new ArrayList<>(); + + contQryCaches.add(cctx); + + this.contQryCaches = contQryCaches; + } + } + + /** + * @param cctx Cache context. + */ + public void removeCacheWithContinuousQuery(GridCacheContext cctx) { + assert sharedGroup() : cacheOrGroupName(); + assert cctx.group() == this : cctx.name(); + assert !cctx.isLocal() : cctx.name(); + + synchronized (this) { + List<GridCacheContext> contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + return; + + contQryCaches.remove(cctx); + + if (contQryCaches.isEmpty()) + contQryCaches = null; + + this.contQryCaches = contQryCaches; + } + } + + /** + * @param cacheId ID of cache initiated counter update. + * @param part Partition number. + * @param cntr Counter. + * @param topVer Topology version for current operation. + */ + public void onPartitionCounterUpdate(int cacheId, + int part, + long cntr, + AffinityTopologyVersion topVer, + boolean primary) { + assert sharedGroup(); + + if (isLocal()) + return; + + List<GridCacheContext> contQryCaches = this.contQryCaches; + + if (contQryCaches == null) + return; + + CounterSkipContext skipCtx = null; + + for (int i = 0; i < contQryCaches.size(); i++) { + GridCacheContext cctx = contQryCaches.get(i); + + if (cacheId != cctx.cacheId()) + skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary); + } + + final List<Runnable> procC = skipCtx != null ? 
skipCtx.processClosures() : null; + + if (procC != null) { + ctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + for (Runnable c : procC) + c.run(); + } + }); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void start() throws IgniteCheckedException { + aff = new GridAffinityAssignmentCache(ctx.kernalContext(), + cacheOrGroupName(), + grpId, + ccfg.getAffinity(), + ccfg.getNodeFilter(), + ccfg.getBackups(), + ccfg.getCacheMode() == LOCAL); + + if (ccfg.getCacheMode() != LOCAL) { + top = new GridDhtPartitionTopologyImpl(ctx, this); + + if (!ctx.kernalContext().clientNode()) { + ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class, + new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() { + @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) { + processAffinityAssignmentRequest(nodeId, msg); + } + }); + } + + preldr = new GridDhtPreloader(this); + + preldr.start(); + } + else + preldr = new GridCachePreloaderAdapter(this); + + offheapMgr = new IgniteCacheOffheapManagerImpl(); + + offheapMgr.start(ctx, this); + + ctx.affinity().onCacheGroupCreated(this); + } + + /** + * @param nodeId Node ID. + * @param req Request. + */ + private void processAffinityAssignmentRequest(final UUID nodeId, + final GridDhtAffinityAssignmentRequest req) { + if (log.isDebugEnabled()) + log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']'); + + IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(req.topologyVersion()); + + if (fut != null) { + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + processAffinityAssignmentRequest0(nodeId, req); + } + }); + } + else + processAffinityAssignmentRequest0(nodeId, req); + } + + /** + * @param nodeId Node ID. + * @param req Request. 
+ */ + private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) { + AffinityTopologyVersion topVer = req.topologyVersion(); + + if (log.isDebugEnabled()) + log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + + ", node=" + nodeId + ']'); + + AffinityAssignment assignment = aff.cachedAffinity(topVer); + + GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse( + req.futureId(), + grpId, + topVer, + assignment.assignment()); + + if (aff.centralizedAffinityFunction()) { + assert assignment.idealAssignment() != null; + + res.idealAffinityAssignment(assignment.idealAssignment()); + } + + try { + ctx.io().send(nodeId, res, AFFINITY_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e); + } + } + + /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture reconnectFut) { + IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Failed to wait for topology update, client disconnected."); + + if (aff != null) + aff.cancelFutures(err); + } + + /** + * @return {@code True} if rebalance is enabled. 
+ */ + public boolean rebalanceEnabled() { + return ccfg.getRebalanceMode() != NONE; + } + + /** + * + */ + public void onReconnected() { + aff.onReconnected(); + + if (top != null) + top.onReconnected(); + + preldr.onReconnected(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheGroupContext [grp=" + cacheOrGroupName() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java deleted file mode 100644 index 5278e4f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ /dev/null @@ -1,951 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataPageEvictionMode; -import org.apache.ignite.configuration.TopologyValidator; -import org.apache.ignite.events.CacheRebalancingEvent; -import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.affinity.AffinityAssignment; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; -import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; -import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; -import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; -import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext; -import 
org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.cache.CacheMode.LOCAL; -import static org.apache.ignite.cache.CacheRebalanceMode.NONE; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; - -/** - * - */ -public class CacheGroupInfrastructure { - /** Group ID. */ - private final int grpId; - - /** Node ID cache group was received from. */ - private final UUID rcvdFrom; - - /** Flag indicating that this cache group is in a recovery mode due to partitions loss. 
*/ - private boolean needsRecovery; - - /** */ - private final AffinityTopologyVersion locStartVer; - - /** */ - private final CacheConfiguration<?, ?> ccfg; - - /** */ - private final GridCacheSharedContext ctx; - - /** */ - private final boolean affNode; - - /** */ - private final CacheType cacheType; - - /** */ - private final byte ioPlc; - - /** */ - private final boolean depEnabled; - - /** */ - private final boolean storeCacheId; - - /** */ - private volatile List<GridCacheContext> caches; - - /** */ - private volatile List<GridCacheContext> contQryCaches; - - /** */ - private final IgniteLogger log; - - /** */ - private GridAffinityAssignmentCache aff; - - /** */ - private GridDhtPartitionTopologyImpl top; - - /** */ - private IgniteCacheOffheapManager offheapMgr; - - /** */ - private GridCachePreloader preldr; - - /** */ - private final MemoryPolicy memPlc; - - /** */ - private final CacheObjectContext cacheObjCtx; - - /** */ - private final FreeList freeList; - - /** */ - private final ReuseList reuseList; - - /** */ - private boolean drEnabled; - - /** */ - private boolean qryEnabled; - - /** - * @param grpId Group ID. - * @param ctx Context. - * @param rcvdFrom Node ID cache group was received from. - * @param cacheType Cache type. - * @param ccfg Cache configuration. - * @param affNode Affinity node flag. - * @param memPlc Memory policy. - * @param cacheObjCtx Cache object context. - * @param freeList Free list. - * @param reuseList Reuse list. - * @param locStartVer Topology version when group was started on local node. 
- */ - CacheGroupInfrastructure( - GridCacheSharedContext ctx, - int grpId, - UUID rcvdFrom, - CacheType cacheType, - CacheConfiguration ccfg, - boolean affNode, - MemoryPolicy memPlc, - CacheObjectContext cacheObjCtx, - FreeList freeList, - ReuseList reuseList, - AffinityTopologyVersion locStartVer) { - assert ccfg != null; - assert memPlc != null || !affNode; - assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; - - this.grpId = grpId; - this.rcvdFrom = rcvdFrom; - this.ctx = ctx; - this.ccfg = ccfg; - this.affNode = affNode; - this.memPlc = memPlc; - this.cacheObjCtx = cacheObjCtx; - this.freeList = freeList; - this.reuseList = reuseList; - this.locStartVer = locStartVer; - this.cacheType = cacheType; - - ioPlc = cacheType.ioPolicy(); - - depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg); - - storeCacheId = affNode && memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED; - - log = ctx.kernalContext().log(getClass()); - - caches = new ArrayList<>(); - } - - /** - * @return {@code True} if this is cache group for one of system caches. - */ - public boolean systemCache() { - return !sharedGroup() && CU.isSystemCache(ccfg.getName()); - } - - /** - * @return Node ID initiated cache group start. - */ - public UUID receivedFrom() { - return rcvdFrom; - } - - /** - * @return {@code True} if cacheId should be stored in data pages. - */ - public boolean storeCacheIdInDataPage() { - return storeCacheId; - } - - /** - * @return {@code True} if deployment is enabled. - */ - public boolean deploymentEnabled() { - return depEnabled; - } - - /** - * @return Preloader. - */ - public GridCachePreloader preloader() { - return preldr; - } - - /** - * @return IO policy for the given cache group. - */ - public byte ioPolicy() { - return ioPlc; - } - - /** - * @param cctx Cache context. - * @throws IgniteCheckedException If failed. 
- */ - void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { - addCacheContext(cctx); - - offheapMgr.onCacheStarted(cctx); - } - - /** - * @param cacheName Cache name. - * @return {@code True} if group contains cache with given name. - */ - public boolean hasCache(String cacheName) { - List<GridCacheContext> caches = this.caches; - - for (int i = 0; i < caches.size(); i++) { - if (caches.get(i).name().equals(cacheName)) - return true; - } - - return false; - } - - /** - * @param cctx Cache context. - */ - private void addCacheContext(GridCacheContext cctx) { - assert cacheType.userCache() == cctx.userCache() : cctx.name(); - assert grpId == cctx.groupId() : cctx.name(); - - ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches); - - assert sharedGroup() || caches.isEmpty(); - - boolean add = caches.add(cctx); - - assert add : cctx.name(); - - if (!qryEnabled && QueryUtils.isEnabled(cctx.config())) - qryEnabled = true; - - if (!drEnabled && cctx.isDrEnabled()) - drEnabled = true; - - this.caches = caches; - } - - /** - * @param cctx Cache context. - */ - private void removeCacheContext(GridCacheContext cctx) { - ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches); - - // It is possible cache was not added in case of errors on cache start. 
- for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) { - GridCacheContext next = it.next(); - - if (next == cctx) { - assert sharedGroup() || caches.size() == 1 : caches.size(); - - it.remove(); - - break; - } - } - - if (QueryUtils.isEnabled(cctx.config())) { - boolean qryEnabled = false; - - for (int i = 0; i < caches.size(); i++) { - if (QueryUtils.isEnabled(caches.get(i).config())) { - qryEnabled = true; - - break; - } - } - - this.qryEnabled = qryEnabled; - } - - if (cctx.isDrEnabled()) { - boolean drEnabled = false; - - for (int i = 0; i < caches.size(); i++) { - if (caches.get(i).isDrEnabled()) { - drEnabled = true; - - break; - } - } - - this.drEnabled = drEnabled; - } - - this.caches = caches; - } - - /** - * @return Cache context if group contains single cache. - */ - public GridCacheContext singleCacheContext() { - List<GridCacheContext> caches = this.caches; - - assert !sharedGroup() && caches.size() == 1; - - return caches.get(0); - } - - /** - * - */ - public void unwindUndeploys() { - List<GridCacheContext> caches = this.caches; - - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); - - cctx.deploy().unwind(cctx); - } - } - - /** - * @param type Event type to check. - * @return {@code True} if given event type should be recorded. - */ - public boolean eventRecordable(int type) { - return cacheType.userCache() && ctx.gridEvents().isRecordable(type); - } - - /** - * Adds rebalancing event. - * - * @param part Partition. - * @param type Event type. - * @param discoNode Discovery node. - * @param discoType Discovery event type. - * @param discoTs Discovery event timestamp. 
- */ - public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) { - assert discoNode != null; - assert type > 0; - assert discoType > 0; - assert discoTs > 0; - - if (!eventRecordable(type)) - LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type)); - - List<GridCacheContext> caches = this.caches; - - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); - - if (cctx.recordEvent(type)) { - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), - cctx.localNode(), - "Cache rebalancing event.", - type, - part, - discoNode, - discoType, - discoTs)); - } - } - } - /** - * Adds partition unload event. - * - * @param part Partition. - */ - public void addUnloadEvent(int part) { - if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) - LT.warn(log, "Added event without checking if event is recordable: " + - U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED)); - - List<GridCacheContext> caches = this.caches; - - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); - - cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), - cctx.localNode(), - "Cache unloading event.", - EVT_CACHE_REBALANCE_PART_UNLOADED, - part, - null, - 0, - 0)); - } - } - - /** - * @param part Partition. - * @param key Key. - * @param evtNodeId Event node ID. - * @param type Event type. - * @param newVal New value. - * @param hasNewVal Has new value flag. - * @param oldVal Old values. - * @param hasOldVal Has old value flag. - * @param keepBinary Keep binary flag. 
- */ - public void addCacheEvent( - int part, - KeyCacheObject key, - UUID evtNodeId, - int type, - @Nullable CacheObject newVal, - boolean hasNewVal, - @Nullable CacheObject oldVal, - boolean hasOldVal, - boolean keepBinary - ) { - List<GridCacheContext> caches = this.caches; - - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); - - cctx.events().addEvent(part, - key, - evtNodeId, - (IgniteUuid)null, - null, - type, - newVal, - hasNewVal, - oldVal, - hasOldVal, - null, - null, - null, - keepBinary); - } - } - - /** - * @return {@code True} if contains cache with query indexing enabled. - */ - public boolean queriesEnabled() { - return qryEnabled; - } - - /** - * @return {@code True} if fast eviction is allowed. - */ - public boolean allowFastEviction() { - return ctx.database().persistenceEnabled() && !queriesEnabled(); - } - - /** - * @return {@code True} in case replication is enabled. - */ - public boolean isDrEnabled() { - return drEnabled; - } - - /** - * @return Free List. - */ - public FreeList freeList() { - return freeList; - } - - /** - * @return Reuse List. - */ - public ReuseList reuseList() { - return reuseList; - } - - /** - * @return Cache object context. - */ - public CacheObjectContext cacheObjectContext() { - return cacheObjCtx; - } - - /** - * @return Cache shared context. - */ - public GridCacheSharedContext shared() { - return ctx; - } - - /** - * @return Memory policy. - */ - public MemoryPolicy memoryPolicy() { - return memPlc; - } - - /** - * @return {@code True} if local node is affinity node. - */ - public boolean affinityNode() { - return affNode; - } - - /** - * @return Topology. - */ - public GridDhtPartitionTopology topology() { - if (top == null) - throw new IllegalStateException("Topology is not initialized: " + name()); - - return top; - } - - /** - * @return Offheap manager. - */ - public IgniteCacheOffheapManager offheap() { - return offheapMgr; - } - - /** - * @return Current cache state. 
Must only be modified during exchange. - */ - public boolean needsRecovery() { - return needsRecovery; - } - - /** - * @param needsRecovery Needs recovery flag. - */ - public void needsRecovery(boolean needsRecovery) { - this.needsRecovery = needsRecovery; - } - - /** - * @return Topology version when group was started on local node. - */ - public AffinityTopologyVersion localStartVersion() { - return locStartVer; - } - - /** - * @return {@code True} if cache is local. - */ - public boolean isLocal() { - return ccfg.getCacheMode() == LOCAL; - } - - /** - * @return Cache configuration. - */ - public CacheConfiguration config() { - return ccfg; - } - - /** - * @return Cache node filter. - */ - public IgnitePredicate<ClusterNode> nodeFilter() { - return ccfg.getNodeFilter(); - } - - /** - * @return Configured user objects which should be initialized/stopped on group start/stop. - */ - Collection<?> configuredUserObjects() { - return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), ccfg.getTopologyValidator()); - } - - /** - * @return Configured topology validator. - */ - @Nullable public TopologyValidator topologyValidator() { - return ccfg.getTopologyValidator(); - } - - /** - * @return Configured affinity function. - */ - public AffinityFunction affinityFunction() { - return ccfg.getAffinity(); - } - - /** - * @return Affinity. - */ - public GridAffinityAssignmentCache affinity() { - return aff; - } - - /** - * @return Group name or {@code null} if group name was not specified for cache. - */ - @Nullable public String name() { - return ccfg.getGroupName(); - } - - /** - * @return Group name if it is specified, otherwise cache name. - */ - public String cacheOrGroupName() { - return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName(); - } - - /** - * @return Group ID. - */ - public int groupId() { - return grpId; - } - - /** - * @return {@code True} if group can contain multiple caches. 
- */ - public boolean sharedGroup() { - return ccfg.getGroupName() != null; - } - - /** - * - */ - public void onKernalStop() { - aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping.")); - - preldr.onKernalStop(); - - offheapMgr.onKernalStop(); - } - - /** - * @param cctx Cache context. - * @param destroy Destroy flag. - */ - void stopCache(GridCacheContext cctx, boolean destroy) { - if (top != null) - top.onCacheStopped(cctx.cacheId()); - - offheapMgr.stopCache(cctx.cacheId(), destroy); - - removeCacheContext(cctx); - } - - /** - * - */ - void stopGroup() { - IgniteCheckedException err = - new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); - - aff.cancelFutures(err); - - offheapMgr.stop(); - - ctx.io().removeCacheGroupHandlers(grpId); - } - - /** - * @return IDs of caches in this group. - */ - public Set<Integer> cacheIds() { - List<GridCacheContext> caches = this.caches; - - Set<Integer> ids = U.newHashSet(caches.size()); - - for (int i = 0; i < caches.size(); i++) - ids.add(caches.get(i).cacheId()); - - return ids; - } - - /** - * @return {@code True} if group contains caches. - */ - boolean hasCaches() { - List<GridCacheContext> caches = this.caches; - - return !caches.isEmpty(); - } - - /** - * @param part Partition ID. - */ - public void onPartitionEvicted(int part) { - List<GridCacheContext> caches = this.caches; - - for (int i = 0; i < caches.size(); i++) { - GridCacheContext cctx = caches.get(i); - - if (cctx.isDrEnabled()) - cctx.dr().partitionEvicted(part); - - cctx.continuousQueries().onPartitionEvicted(part); - - cctx.dataStructures().onPartitionEvicted(part); - } - } - - /** - * @param cctx Cache context. 
- */ - public void addCacheWithContinuousQuery(GridCacheContext cctx) { - assert sharedGroup() : cacheOrGroupName(); - assert cctx.group() == this : cctx.name(); - assert !cctx.isLocal() : cctx.name(); - - synchronized (this) { - List<GridCacheContext> contQryCaches = this.contQryCaches; - - if (contQryCaches == null) - contQryCaches = new ArrayList<>(); - - contQryCaches.add(cctx); - - this.contQryCaches = contQryCaches; - } - } - - /** - * @param cctx Cache context. - */ - public void removeCacheWithContinuousQuery(GridCacheContext cctx) { - assert sharedGroup() : cacheOrGroupName(); - assert cctx.group() == this : cctx.name(); - assert !cctx.isLocal() : cctx.name(); - - synchronized (this) { - List<GridCacheContext> contQryCaches = this.contQryCaches; - - if (contQryCaches == null) - return; - - contQryCaches.remove(cctx); - - if (contQryCaches.isEmpty()) - contQryCaches = null; - - this.contQryCaches = contQryCaches; - } - } - - /** - * @param cacheId ID of cache initiated counter update. - * @param part Partition number. - * @param cntr Counter. - * @param topVer Topology version for current operation. - */ - public void onPartitionCounterUpdate(int cacheId, - int part, - long cntr, - AffinityTopologyVersion topVer, - boolean primary) { - assert sharedGroup(); - - if (isLocal()) - return; - - List<GridCacheContext> contQryCaches = this.contQryCaches; - - if (contQryCaches == null) - return; - - CounterSkipContext skipCtx = null; - - for (int i = 0; i < contQryCaches.size(); i++) { - GridCacheContext cctx = contQryCaches.get(i); - - if (cacheId != cctx.cacheId()) - skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary); - } - - final List<Runnable> procC = skipCtx != null ? 
skipCtx.processClosures() : null; - - if (procC != null) { - ctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - for (Runnable c : procC) - c.run(); - } - }); - } - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void start() throws IgniteCheckedException { - aff = new GridAffinityAssignmentCache(ctx.kernalContext(), - cacheOrGroupName(), - grpId, - ccfg.getAffinity(), - ccfg.getNodeFilter(), - ccfg.getBackups(), - ccfg.getCacheMode() == LOCAL); - - if (ccfg.getCacheMode() != LOCAL) { - GridCacheMapEntryFactory entryFactory = new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtCacheEntry(ctx, topVer, key); - } - }; - - top = new GridDhtPartitionTopologyImpl(ctx, this, entryFactory); - - if (!ctx.kernalContext().clientNode()) { - ctx.io().addHandler(true, groupId(), GridDhtAffinityAssignmentRequest.class, - new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() { - @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) { - processAffinityAssignmentRequest(nodeId, msg); - } - }); - } - - preldr = new GridDhtPreloader(this); - - preldr.start(); - } - else - preldr = new GridCachePreloaderAdapter(this); - - offheapMgr = new IgniteCacheOffheapManagerImpl(); - - offheapMgr.start(ctx, this); - - ctx.affinity().onCacheGroupCreated(this); - } - - /** - * @param nodeId Node ID. - * @param req Request. 
- */ - private void processAffinityAssignmentRequest(final UUID nodeId, - final GridDhtAffinityAssignmentRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']'); - - IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(req.topologyVersion()); - - if (fut != null) { - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - processAffinityAssignmentRequest0(nodeId, req); - } - }); - } - else - processAffinityAssignmentRequest0(nodeId, req); - } - - /** - * @param nodeId Node ID. - * @param req Request. - */ - private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) { - AffinityTopologyVersion topVer = req.topologyVersion(); - - if (log.isDebugEnabled()) - log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + - ", node=" + nodeId + ']'); - - AffinityAssignment assignment = aff.cachedAffinity(topVer); - - GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse( - req.futureId(), - grpId, - topVer, - assignment.assignment()); - - if (aff.centralizedAffinityFunction()) { - assert assignment.idealAssignment() != null; - - res.idealAffinityAssignment(assignment.idealAssignment()); - } - - try { - ctx.io().send(nodeId, res, AFFINITY_POOL); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e); - } - } - - /** - * @param reconnectFut Reconnect future. - */ - public void onDisconnected(IgniteFuture reconnectFut) { - IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, - "Failed to wait for topology update, client disconnected."); - - if (aff != null) - aff.cancelFutures(err); - } - - /** - * @return {@code True} if rebalance is enabled. 
- */ - public boolean rebalanceEnabled() { - return ccfg.getRebalanceMode() != NONE; - } - - /** - * - */ - public void onReconnected() { - aff.onReconnected(); - - if (top != null) - top.onReconnected(); - - preldr.onReconnected(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CacheGroupInfrastructure [grp=" + cacheOrGroupName() + ']'; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 58b6a60..08c08c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.query.QuerySchema; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.DiscoveryDataBag; @@ -79,7 +80,7 @@ class ClusterCachesInfo { private CacheJoinNodeDiscoveryData joinDiscoData; /** */ - private CacheNodeCommonDiscoveryData gridData; + private GridData gridData; /** */ private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches; @@ -117,7 +118,21 @@ class ClusterCachesInfo { validateCacheGroupConfiguration(ccfg, info.config()); } - processJoiningNode(joinDiscoData, ctx.localNodeId()); + String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true); + 
+ if (conflictErr != null) + throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr); + } + + /** + * @param cacheName Cache name. + * @param grpName Group name. + * @return Group ID. + */ + private int cacheGroupId(String cacheName, @Nullable String grpName) { + assert cacheName != null; + + return grpName != null ? CU.cacheId(grpName) : CU.cacheId(cacheName); } /** @@ -125,11 +140,14 @@ class ClusterCachesInfo { * @throws IgniteCheckedException If failed. */ void onKernalStart(boolean checkConsistency) throws IgniteCheckedException { + if (gridData != null && gridData.conflictErr != null) + throw new IgniteCheckedException(gridData.conflictErr); + if (checkConsistency && joinDiscoData != null && gridData != null) { for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) { CacheConfiguration locCfg = locCacheInfo.config(); - CacheData cacheData = gridData.caches().get(locCfg.getName()); + CacheData cacheData = gridData.gridData.caches().get(locCfg.getName()); if (cacheData != null) checkCache(locCacheInfo, cacheData, cacheData.receivedFrom()); @@ -305,6 +323,17 @@ class ClusterCachesInfo { if (req.start()) { if (desc == null) { + String conflictErr = checkCacheConflict(req.startCacheConfiguration()); + + if (conflictErr != null) { + U.warn(log, "Ignore cache start request. " + conflictErr); + + ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + + "cache. 
" + conflictErr); + + continue; + } + if (req.clientStartOnly()) { ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + "client cache (a cache with the given name is not started): " + req.cacheName())); @@ -543,7 +572,7 @@ class ClusterCachesInfo { Map<Integer, CacheGroupDescriptor> grps = cachesOnDisconnect.cacheGrps; Map<String, DynamicCacheDescriptor> caches = cachesOnDisconnect.caches; - for (CacheGroupInfrastructure grp : ctx.cache().cacheGroups()) { + for (CacheGroupContext grp : ctx.cache().cacheGroups()) { CacheGroupDescriptor desc = grps.get(grp.groupId()); assert desc != null : grp.cacheOrGroupName(); @@ -815,7 +844,24 @@ class ClusterCachesInfo { } } - gridData = cachesData; + String conflictErr = null; + + if (joinDiscoData != null) { + for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) { + if (!registeredCaches.containsKey(e.getKey())) { + conflictErr = checkCacheConflict(e.getValue().config()); + + if (conflictErr != null) { + conflictErr = "Failed to start configured cache due to conflict with started caches. " + + conflictErr; + + break; + } + } + } + } + + gridData = new GridData(cachesData, conflictErr); if (!disconnectedState()) initStartCachesForLocalJoin(false); @@ -894,7 +940,7 @@ class ClusterCachesInfo { processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, data.joiningNodeId()); } else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) - processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId()); + processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), false); } } @@ -918,10 +964,59 @@ class ClusterCachesInfo { } /** + * @param cfg Cache configuration. + * @return Conflict error message, or {@code null} if there are no conflicts. 
+ */ + private String checkCacheConflict(CacheConfiguration<?, ?> cfg) { + int cacheId = CU.cacheId(cfg.getName()); + + if (cacheGroupByName(cfg.getName()) != null) + return "Cache name conflict with existing cache group (change cache name) [cacheName=" + cfg.getName() + ']'; + + if (cfg.getGroupName() != null) { + DynamicCacheDescriptor desc = registeredCaches.get(cfg.getGroupName()); + + if (desc != null) + return "Cache group name conflict with existing cache (change group name) [cacheName=" + cfg.getName() + + ", conflictingCacheName=" + desc.cacheName() + ']'; + } + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (desc.cacheId() == cacheId) + return "Cache ID conflict (change cache name) [cacheName=" + cfg.getName() + + ", conflictingCacheName=" + desc.cacheName() + ']'; + } + + int grpId = cacheGroupId(cfg.getName(), cfg.getGroupName()); + + if (cfg.getGroupName() != null) { + if (cacheGroupByName(cfg.getGroupName()) == null) { + CacheGroupDescriptor desc = registeredCacheGrps.get(grpId); + + if (desc != null) + return "Cache group ID conflict (change cache group name) [cacheName=" + cfg.getName() + + ", groupName=" + cfg.getGroupName() + + (desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']'; + } + } + else { + CacheGroupDescriptor desc = registeredCacheGrps.get(grpId); + + if (desc != null) + return "Cache group ID conflict (change cache name) [cacheName=" + cfg.getName() + + (desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']'; + } + + return null; + } + + /** * @param joinData Joined node discovery data. * @param nodeId Joined node ID. + * @param locJoin {@code True} if called on local node join. + * @return Configuration conflict error. 
*/ - private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { + private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, boolean locJoin) { for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { CacheConfiguration<?, ?> cfg = cacheInfo.config(); @@ -947,6 +1042,17 @@ class ClusterCachesInfo { CacheConfiguration<?, ?> cfg = cacheInfo.config(); if (!registeredCaches.containsKey(cfg.getName())) { + String conflictErr = checkCacheConflict(cfg); + + if (conflictErr != null) { + if (locJoin) + return conflictErr; + + U.warn(log, "Ignore cache received from joining node. " + conflictErr); + + continue; + } + int cacheId = CU.cacheId(cfg.getName()); CacheGroupDescriptor grpDesc = registerCacheGroup(null, @@ -987,6 +1093,8 @@ class ClusterCachesInfo { desc.cacheConfiguration().getNearConfiguration() != null); } } + + return null; } /** @@ -1045,12 +1153,7 @@ class ClusterCachesInfo { } } - int grpId; - - if (startedCacheCfg.getGroupName() == null) - grpId = CU.cacheId(startedCacheCfg.getName()); - else - grpId = CU.cacheId("grp#" + startedCacheCfg.getGroupName()); + int grpId = cacheGroupId(startedCacheCfg.getName(), startedCacheCfg.getGroupName()); Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId); @@ -1266,6 +1369,26 @@ class ClusterCachesInfo { /** * */ + static class GridData { + /** */ + private final CacheNodeCommonDiscoveryData gridData; + + /** */ + private final String conflictErr; + + /** + * @param gridData Grid data. + * @param conflictErr Cache configuration conflict error. 
+ */ + GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) { + this.gridData = gridData; + this.conflictErr = conflictErr; + } + } + + /** + * + */ private static class CachesOnDisconnect { /** */ final Map<Integer, CacheGroupDescriptor> cacheGrps; http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 153adf3..75584d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -540,11 +540,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @return Entry factory. - */ - protected abstract GridCacheMapEntryFactory entryFactory(); - - /** * Starts this cache. Child classes should override this method * to provide custom start-up behavior. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 50e0121..839ddbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -135,7 +135,7 @@ public class GridCacheContext<K, V> implements Externalizable { private GridCacheSharedContext<K, V> sharedCtx; /** Cache group. */ - private CacheGroupInfrastructure grp; + private CacheGroupContext grp; /** Logger. */ private IgniteLogger log; @@ -284,7 +284,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridKernalContext ctx, GridCacheSharedContext sharedCtx, CacheConfiguration cacheCfg, - CacheGroupInfrastructure grp, + CacheGroupContext grp, CacheType cacheType, AffinityTopologyVersion locStartTopVer, boolean affNode, @@ -382,7 +382,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return Cache group. 
*/ - public CacheGroupInfrastructure group() { + public CacheGroupContext group() { return grp; } http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 8556c87..a251047 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1193,19 +1193,31 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * Adds message handler. - * - * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param hndId Message handler ID. * @param type Type of message. * @param c Handler. */ - public void addHandler( - boolean cacheGrp, + public void addCacheHandler( int hndId, Class<? extends GridCacheMessage> type, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { - addHandler(hndId, type, c, cacheGrp ? grpHandlers : cacheHandlers); + assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type; + + addHandler(hndId, type, c, cacheHandlers); + } + + /** + * @param hndId Message handler ID. + * @param type Type of message. + * @param c Handler. + */ + public void addCacheGroupHandler( + int hndId, + Class<? extends GridCacheGroupIdMessage> type, + IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + assert !type.isAssignableFrom(GridCacheIdMessage.class) : type; + + addHandler(hndId, type, c, grpHandlers); } /** @@ -1321,6 +1333,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param topic Topic. + * @param c Handler. 
+ */ + public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) { + addOrderedHandler(false, topic, c); + } + + /** + * @param topic Topic. + * @param c Handler. + */ + public void addOrderedCacheGroupHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) { + addOrderedHandler(true, topic, c); + } + + /** * Adds ordered message handler. * * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. @@ -1328,7 +1356,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param c Handler. */ @SuppressWarnings({"unchecked"}) - public void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + private void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; IgniteLogger log0 = log;