ignite-5075

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/781e33ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/781e33ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/781e33ec

Branch: refs/heads/ignite-5075
Commit: 781e33ec135cc65f2fbeba502eeba3e0ec56750f
Parents: ec6441a
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Jun 1 14:17:47 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Jun 1 17:46:37 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  32 +-
 .../processors/cache/CacheGroupContext.java     | 943 ++++++++++++++++++
 .../cache/CacheGroupInfrastructure.java         | 951 -------------------
 .../processors/cache/ClusterCachesInfo.java     | 149 ++-
 .../processors/cache/GridCacheAdapter.java      |   5 -
 .../processors/cache/GridCacheContext.java      |   6 +-
 .../processors/cache/GridCacheIoManager.java    |  42 +-
 .../GridCachePartitionExchangeManager.java      |  62 +-
 .../cache/GridCachePreloaderAdapter.java        |   4 +-
 .../processors/cache/GridCacheProcessor.java    |  44 +-
 .../cache/IgniteCacheOffheapManager.java        |   2 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  16 +-
 .../cache/database/CacheDataRowAdapter.java     |   6 +-
 .../processors/cache/database/RowStore.java     |   4 +-
 .../dht/GridCachePartitionedConcurrentMap.java  |   6 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  17 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  23 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  19 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  18 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  48 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  13 +-
 .../dht/preloader/GridDhtPartitionDemander.java |  11 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |   7 +-
 .../GridDhtPartitionSupplyMessage.java          |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |  40 +-
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 .../distributed/near/GridNearAtomicCache.java   |   2 +-
 .../distributed/near/GridNearCacheAdapter.java  |   6 +-
 .../near/GridNearTransactionalCache.java        |   4 +-
 .../processors/cache/local/GridLocalCache.java  |   6 +-
 .../query/GridCacheDistributedQueryManager.java |   6 +-
 .../continuous/CacheContinuousQueryManager.java |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |  22 +-
 .../cluster/GridClusterStateProcessor.java      |   2 +-
 .../processors/cache/IgniteCacheGroupsTest.java | 273 +++++-
 .../testframework/junits/GridAbstractTest.java  |   4 +-
 36 files changed, 1582 insertions(+), 1221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index c563b81..dc4a91f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -305,11 +305,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     /**
      * @param grp Cache group.
      */
-    void onCacheGroupCreated(CacheGroupInfrastructure grp) {
+    void onCacheGroupCreated(CacheGroupContext grp) {
         final Integer grpId = grp.groupId();
 
         if (!grpHolders.containsKey(grp.groupId())) {
-            cctx.io().addHandler(true, grpId, 
GridDhtAffinityAssignmentResponse.class,
+            cctx.io().addCacheGroupHandler(grpId, 
GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
                         processAffinityAssignmentResponse(grpId, nodeId, res);
@@ -408,7 +408,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     if (gprs.add(grpId)) {
                         if (crd && lateAffAssign)
                     initStartedGroupOnCoordinator(fut, 
action.descriptor().groupDescriptor());else  {
-                        CacheGroupInfrastructure grp = 
cctx.cache().cacheGroup(grpId);
+                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                         if (grp != null && !grp.isLocal() && 
grp.localStartVersion().equals(fut.topologyVersion())) {
                         assert 
grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : 
grp.affinity().lastVersion();
@@ -425,7 +425,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             cctx.cache().blockGateway(req.request());
 
             if (crd) {
-                CacheGroupInfrastructure grp = 
cctx.cache().cacheGroup(req.descriptor().groupId());
+                CacheGroupContext grp = 
cctx.cache().cacheGroup(req.descriptor().groupId());
 
                 assert grp != null;
 
@@ -780,7 +780,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 c.apply(grp.affinity());
         }
         else {
-            for (CacheGroupInfrastructure grp : 
cctx.kernalContext().cache().cacheGroups()) {
+            for (CacheGroupContext grp : 
cctx.kernalContext().cache().cacheGroups()) {
                 if (grp.isLocal())
                     continue;
 
@@ -805,7 +805,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         CacheGroupHolder grpHolder = grpHolders.get(grpId);
 
-        CacheGroupInfrastructure grp = 
cctx.kernalContext().cache().cacheGroup(grpId);
+        CacheGroupContext grp = cctx.kernalContext().cache().cacheGroup(grpId);
 
         if (grpHolder == null) {
             grpHolder = grp != null ?
@@ -1009,7 +1009,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
 
-        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (grp.isLocal())
                 continue;
 
@@ -1100,7 +1100,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         boolean centralizedAff;
 
         if (lateAffAssign) {
-            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (grp.isLocal())
                     continue;
 
@@ -1131,7 +1131,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture 
fut) throws IgniteCheckedException {
         assert !lateAffAssign;
 
-        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (grp.isLocal())
                 continue;
 
@@ -1164,10 +1164,10 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 // Need initialize holders and affinity if this node became 
coordinator during this exchange.
                 final Integer grpId = desc.groupId();
 
-                CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
+                CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                 if (grp == null) {
-                    cctx.io().addHandler(true, desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
+                    cctx.io().addCacheGroupHandler(desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
                                 processAffinityAssignmentResponse(grpId, 
nodeId, res);
@@ -1256,10 +1256,10 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         if (cacheGrp != null)
             return cacheGrp;
 
-        final CacheGroupInfrastructure grp = 
cctx.cache().cacheGroup(desc.groupId());
+        final CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
         if (grp == null) {
-            cctx.io().addHandler(true, desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
+            cctx.io().addCacheGroupHandler(desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
                         processAffinityAssignmentResponse(desc.groupId(), 
nodeId, res);
@@ -1294,7 +1294,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
-            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (grp.isLocal())
                     continue;
 
@@ -1713,13 +1713,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      */
     private class CacheGroupHolder1 extends CacheGroupHolder {
         /** */
-        private final CacheGroupInfrastructure grp;
+        private final CacheGroupContext grp;
 
         /**
          * @param grp Cache group.
          * @param initAff Current affinity.
          */
-        CacheGroupHolder1(CacheGroupInfrastructure grp, @Nullable 
GridAffinityAssignmentCache initAff) {
+        CacheGroupHolder1(CacheGroupContext grp, @Nullable 
GridAffinityAssignmentCache initAff) {
             super(grp.rebalanceEnabled(), grp.affinity(), initAff);
 
             assert !grp.isLocal() : grp;

http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
new file mode 100644
index 0000000..4844a55
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
+import 
org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+
+/**
+ *
+ */
+public class CacheGroupContext {
+    /**
+     * Unique group ID. Currently for shared group it is generated as group 
name hash,
+     * for non-shared as cache name hash (see {@link 
ClusterCachesInfo#checkCacheConflict}).
+     */
+    private final int grpId;
+
+    /** Node ID cache group was received from. */
+    private final UUID rcvdFrom;
+
+    /** Flag indicating that this cache group is in a recovery mode due to 
partitions loss. */
+    private boolean needsRecovery;
+
+    /** */
+    private final AffinityTopologyVersion locStartVer;
+
+    /** */
+    private final CacheConfiguration<?, ?> ccfg;
+
+    /** */
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    private final boolean affNode;
+
+    /** */
+    private final CacheType cacheType;
+
+    /** */
+    private final byte ioPlc;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** */
+    private final boolean storeCacheId;
+
+    /** */
+    private volatile List<GridCacheContext> caches;
+
+    /** */
+    private volatile List<GridCacheContext> contQryCaches;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private GridAffinityAssignmentCache aff;
+
+    /** */
+    private GridDhtPartitionTopologyImpl top;
+
+    /** */
+    private IgniteCacheOffheapManager offheapMgr;
+
+    /** */
+    private GridCachePreloader preldr;
+
+    /** */
+    private final MemoryPolicy memPlc;
+
+    /** */
+    private final CacheObjectContext cacheObjCtx;
+
+    /** */
+    private final FreeList freeList;
+
+    /** */
+    private final ReuseList reuseList;
+
+    /** */
+    private boolean drEnabled;
+
+    /** */
+    private boolean qryEnabled;
+
+    /**
+     * @param grpId Group ID.
+     * @param ctx Context.
+     * @param rcvdFrom Node ID cache group was received from.
+     * @param cacheType Cache type.
+     * @param ccfg Cache configuration.
+     * @param affNode Affinity node flag.
+     * @param memPlc Memory policy.
+     * @param cacheObjCtx Cache object context.
+     * @param freeList Free list.
+     * @param reuseList Reuse list.
+     * @param locStartVer Topology version when group was started on local 
node.
+     */
+    CacheGroupContext(
+        GridCacheSharedContext ctx,
+        int grpId,
+        UUID rcvdFrom,
+        CacheType cacheType,
+        CacheConfiguration ccfg,
+        boolean affNode,
+        MemoryPolicy memPlc,
+        CacheObjectContext cacheObjCtx,
+        FreeList freeList,
+        ReuseList reuseList,
+        AffinityTopologyVersion locStartVer) {
+        assert ccfg != null;
+        assert memPlc != null || !affNode;
+        assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", 
grpName=" + ccfg.getGroupName() + ']';
+
+        this.grpId = grpId;
+        this.rcvdFrom = rcvdFrom;
+        this.ctx = ctx;
+        this.ccfg = ccfg;
+        this.affNode = affNode;
+        this.memPlc = memPlc;
+        this.cacheObjCtx = cacheObjCtx;
+        this.freeList = freeList;
+        this.reuseList = reuseList;
+        this.locStartVer = locStartVer;
+        this.cacheType = cacheType;
+
+        ioPlc = cacheType.ioPolicy();
+
+        depEnabled = ctx.kernalContext().deploy().enabled() && 
!ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);
+
+        storeCacheId = affNode && memPlc.config().getPageEvictionMode() != 
DataPageEvictionMode.DISABLED;
+
+        log = ctx.kernalContext().log(getClass());
+
+        caches = new ArrayList<>();
+    }
+
+    /**
+     * @return {@code True} if this is cache group for one of system caches.
+     */
+    public boolean systemCache() {
+        return !sharedGroup() && CU.isSystemCache(ccfg.getName());
+    }
+
+    /**
+     * @return Node ID initiated cache group start.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /**
+     * @return {@code True} if cacheId should be stored in data pages.
+     */
+    public boolean storeCacheIdInDataPage() {
+        return storeCacheId;
+    }
+
+    /**
+     * @return {@code True} if deployment is enabled.
+     */
+    public boolean deploymentEnabled() {
+        return depEnabled;
+    }
+
+    /**
+     * @return Preloader.
+     */
+    public GridCachePreloader preloader() {
+        return preldr;
+    }
+
+    /**
+     * @return IO policy for the given cache group.
+     */
+    public byte ioPolicy() {
+        return ioPlc;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
+        addCacheContext(cctx);
+
+        offheapMgr.onCacheStarted(cctx);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return {@code True} if group contains cache with given name.
+     */
+    public boolean hasCache(String cacheName) {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            if (caches.get(i).name().equals(cacheName))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    private void addCacheContext(GridCacheContext cctx) {
+        assert cacheType.userCache() == cctx.userCache() : cctx.name();
+        assert grpId == cctx.groupId() : cctx.name();
+
+        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
+
+        assert sharedGroup() || caches.isEmpty();
+
+        boolean add = caches.add(cctx);
+
+        assert add : cctx.name();
+
+        if (!qryEnabled && QueryUtils.isEnabled(cctx.config()))
+            qryEnabled = true;
+
+        if (!drEnabled && cctx.isDrEnabled())
+            drEnabled = true;
+
+        this.caches = caches;
+   }
+
+    /**
+     * @param cctx Cache context.
+     */
+    private void removeCacheContext(GridCacheContext cctx) {
+        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
+
+        // It is possible cache was not added in case of errors on cache start.
+        for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) 
{
+            GridCacheContext next = it.next();
+
+            if (next == cctx) {
+                assert sharedGroup() || caches.size() == 1 : caches.size();
+
+                it.remove();
+
+                break;
+            }
+        }
+
+        if (QueryUtils.isEnabled(cctx.config())) {
+            boolean qryEnabled = false;
+
+            for (int i = 0; i < caches.size(); i++) {
+                if (QueryUtils.isEnabled(caches.get(i).config())) {
+                    qryEnabled = true;
+
+                    break;
+                }
+            }
+
+            this.qryEnabled = qryEnabled;
+        }
+
+        if (cctx.isDrEnabled()) {
+            boolean drEnabled = false;
+
+            for (int i = 0; i < caches.size(); i++) {
+                if (caches.get(i).isDrEnabled()) {
+                    drEnabled = true;
+
+                    break;
+                }
+            }
+
+            this.drEnabled = drEnabled;
+        }
+
+        this.caches = caches;
+    }
+
+    /**
+     * @return Cache context if group contains single cache.
+     */
+    public GridCacheContext singleCacheContext() {
+        List<GridCacheContext> caches = this.caches;
+
+        assert !sharedGroup() && caches.size() == 1;
+
+        return caches.get(0);
+    }
+
+    /**
+     *
+     */
+    public void unwindUndeploys() {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.deploy().unwind(cctx);
+        }
+    }
+
+    /**
+     * @param type Event type to check.
+     * @return {@code True} if given event type should be recorded.
+     */
+    public boolean eventRecordable(int type) {
+        return cacheType.userCache() && ctx.gridEvents().isRecordable(type);
+    }
+
+    /**
+     * Adds rebalancing event.
+     *
+     * @param part Partition.
+     * @param type Event type.
+     * @param discoNode Discovery node.
+     * @param discoType Discovery event type.
+     * @param discoTs Discovery event timestamp.
+     */
+    public void addRebalanceEvent(int part, int type, ClusterNode discoNode, 
int discoType, long discoTs) {
+        assert discoNode != null;
+        assert type > 0;
+        assert discoType > 0;
+        assert discoTs > 0;
+
+        if (!eventRecordable(type))
+            LT.warn(log, "Added event without checking if event is recordable: 
" + U.gridEventName(type));
+
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cctx.recordEvent(type)) {
+                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                    cctx.localNode(),
+                    "Cache rebalancing event.",
+                    type,
+                    part,
+                    discoNode,
+                    discoType,
+                    discoTs));
+            }
+        }
+    }
+    /**
+     * Adds partition unload event.
+     *
+     * @param part Partition.
+     */
+    public void addUnloadEvent(int part) {
+        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+            LT.warn(log, "Added event without checking if event is recordable: 
" +
+                U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
+
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                cctx.localNode(),
+                "Cache unloading event.",
+                EVT_CACHE_REBALANCE_PART_UNLOADED,
+                part,
+                null,
+                0,
+                0));
+        }
+    }
+
+    /**
+     * @param part Partition.
+     * @param key Key.
+     * @param evtNodeId Event node ID.
+     * @param type Event type.
+     * @param newVal New value.
+     * @param hasNewVal Has new value flag.
+     * @param oldVal Old values.
+     * @param hasOldVal Has old value flag.
+     * @param keepBinary Keep binary flag.
+     */
+    public void addCacheEvent(
+        int part,
+        KeyCacheObject key,
+        UUID evtNodeId,
+        int type,
+        @Nullable CacheObject newVal,
+        boolean hasNewVal,
+        @Nullable CacheObject oldVal,
+        boolean hasOldVal,
+        boolean keepBinary
+    ) {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.events().addEvent(part,
+                key,
+                evtNodeId,
+                (IgniteUuid)null,
+                null,
+                type,
+                newVal,
+                hasNewVal,
+                oldVal,
+                hasOldVal,
+                null,
+                null,
+                null,
+                keepBinary);
+        }
+    }
+
+    /**
+     * @return {@code True} if contains cache with query indexing enabled.
+     */
+    public boolean queriesEnabled() {
+        return qryEnabled;
+    }
+
+    /**
+     * @return {@code True} if fast eviction is allowed.
+     */
+    public boolean allowFastEviction() {
+        return ctx.database().persistenceEnabled() && !queriesEnabled();
+    }
+
+    /**
+     * @return {@code True} in case replication is enabled.
+     */
+    public boolean isDrEnabled() {
+        return drEnabled;
+    }
+
+    /**
+     * @return Free List.
+     */
+    public FreeList freeList() {
+        return freeList;
+    }
+
+    /**
+     * @return Reuse List.
+     */
+    public ReuseList reuseList() {
+        return reuseList;
+    }
+
+    /**
+     * @return Cache object context.
+     */
+    public CacheObjectContext cacheObjectContext() {
+        return cacheObjCtx;
+    }
+
+    /**
+     * @return Cache shared context.
+     */
+    public GridCacheSharedContext shared() {
+        return ctx;
+    }
+
+    /**
+     * @return Memory policy.
+     */
+    public MemoryPolicy memoryPolicy() {
+        return memPlc;
+    }
+
+    /**
+     * @return {@code True} if local node is affinity node.
+     */
+    public boolean affinityNode() {
+        return affNode;
+    }
+
+    /**
+     * @return Topology.
+     */
+    public GridDhtPartitionTopology topology() {
+        if (top == null)
+            throw new IllegalStateException("Topology is not initialized: " + 
name());
+
+        return top;
+    }
+
+    /**
+     * @return Offheap manager.
+     */
+    public IgniteCacheOffheapManager offheap() {
+        return offheapMgr;
+    }
+
+    /**
+     * @return Current cache state. Must only be modified during exchange.
+     */
+    public boolean needsRecovery() {
+        return needsRecovery;
+    }
+
+    /**
+     * @param needsRecovery Needs recovery flag.
+     */
+    public void needsRecovery(boolean needsRecovery) {
+        this.needsRecovery = needsRecovery;
+    }
+
+    /**
+     * @return Topology version when group was started on local node.
+     */
+    public AffinityTopologyVersion localStartVersion() {
+        return locStartVer;
+    }
+
+    /**
+     * @return {@code True} if cache is local.
+     */
+    public boolean isLocal() {
+        return ccfg.getCacheMode() == LOCAL;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration config() {
+        return ccfg;
+    }
+
+    /**
+     * @return Cache node filter.
+     */
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return ccfg.getNodeFilter();
+    }
+
+    /**
+     * @return Configured user objects which should be initialized/stopped on 
group start/stop.
+     */
+    Collection<?> configuredUserObjects() {
+        return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), 
ccfg.getTopologyValidator());
+    }
+
+    /**
+     * @return Configured topology validator.
+     */
+    @Nullable public TopologyValidator topologyValidator() {
+        return ccfg.getTopologyValidator();
+    }
+
+    /**
+     * @return Configured affinity function.
+     */
+    public AffinityFunction affinityFunction() {
+        return ccfg.getAffinity();
+    }
+
+    /**
+     * @return Affinity.
+     */
+    public GridAffinityAssignmentCache affinity() {
+        return aff;
+    }
+
+    /**
+     * @return Group name or {@code null} if group name was not specified for 
cache.
+     */
+    @Nullable public String name() {
+        return ccfg.getGroupName();
+    }
+
+    /**
+     * @return Group name if it is specified, otherwise cache name.
+     */
+    public String cacheOrGroupName() {
+        return ccfg.getGroupName() != null ? ccfg.getGroupName() : 
ccfg.getName();
+    }
+
+    /**
+     * @return Group ID.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @return {@code True} if group can contain multiple caches.
+     */
+    public boolean sharedGroup() {
+        return ccfg.getGroupName() != null;
+    }
+
+    /**
+     *
+     */
+    public void onKernalStop() {
+        aff.cancelFutures(new IgniteCheckedException("Failed to wait for 
topology update, node is stopping."));
+
+        preldr.onKernalStop();
+
+        offheapMgr.onKernalStop();
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param destroy Destroy flag.
+     */
+    void stopCache(GridCacheContext cctx, boolean destroy) {
+        if (top != null)
+            top.onCacheStopped(cctx.cacheId());
+
+        offheapMgr.stopCache(cctx.cacheId(), destroy);
+
+        removeCacheContext(cctx);
+    }
+
+    /**
+     *
+     */
+    void stopGroup() {
+        IgniteCheckedException err =
+            new IgniteCheckedException("Failed to wait for topology update, 
cache (or node) is stopping.");
+
+        aff.cancelFutures(err);
+
+        offheapMgr.stop();
+
+        ctx.io().removeCacheGroupHandlers(grpId);
+    }
+
+    /**
+     * @return IDs of caches in this group.
+     */
+    public Set<Integer> cacheIds() {
+        List<GridCacheContext> caches = this.caches;
+
+        Set<Integer> ids = U.newHashSet(caches.size());
+
+        for (int i = 0; i < caches.size(); i++)
+            ids.add(caches.get(i).cacheId());
+
+        return ids;
+    }
+
+    /**
+     * @return {@code True} if group contains caches.
+     */
+    boolean hasCaches() {
+        List<GridCacheContext> caches = this.caches;
+
+        return !caches.isEmpty();
+    }
+
+    /**
+     * @param part Partition ID.
+     */
+    public void onPartitionEvicted(int part) {
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cctx.isDrEnabled())
+                cctx.dr().partitionEvicted(part);
+
+            cctx.continuousQueries().onPartitionEvicted(part);
+
+            cctx.dataStructures().onPartitionEvicted(part);
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    public void addCacheWithContinuousQuery(GridCacheContext cctx) {
+        assert sharedGroup() : cacheOrGroupName();
+        assert cctx.group() == this : cctx.name();
+        assert !cctx.isLocal() : cctx.name();
+
+        synchronized (this) {
+            List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+            if (contQryCaches == null)
+                contQryCaches = new ArrayList<>();
+
+            contQryCaches.add(cctx);
+
+            this.contQryCaches = contQryCaches;
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
+        assert sharedGroup() : cacheOrGroupName();
+        assert cctx.group() == this : cctx.name();
+        assert !cctx.isLocal() : cctx.name();
+
+        synchronized (this) {
+            List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+            if (contQryCaches == null)
+                return;
+
+            contQryCaches.remove(cctx);
+
+            if (contQryCaches.isEmpty())
+                contQryCaches = null;
+
+            this.contQryCaches = contQryCaches;
+        }
+    }
+
+    /**
+     * @param cacheId ID of cache initiated counter update.
+     * @param part Partition number.
+     * @param cntr Counter.
+     * @param topVer Topology version for current operation.
+     */
+    public void onPartitionCounterUpdate(int cacheId,
+        int part,
+        long cntr,
+        AffinityTopologyVersion topVer,
+        boolean primary) {
+        assert sharedGroup();
+
+        if (isLocal())
+            return;
+
+        List<GridCacheContext> contQryCaches = this.contQryCaches;
+
+        if (contQryCaches == null)
+            return;
+
+        CounterSkipContext skipCtx = null;
+
+        for (int i = 0; i < contQryCaches.size(); i++) {
+            GridCacheContext cctx = contQryCaches.get(i);
+
+            if (cacheId != cctx.cacheId())
+                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, 
part, cntr, topVer, primary);
+        }
+
+        final List<Runnable> procC = skipCtx != null ? 
skipCtx.processClosures() : null;
+
+        if (procC != null) {
+            ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    for (Runnable c : procC)
+                        c.run();
+                }
+            });
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start() throws IgniteCheckedException {
+        aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
+            cacheOrGroupName(),
+            grpId,
+            ccfg.getAffinity(),
+            ccfg.getNodeFilter(),
+            ccfg.getBackups(),
+            ccfg.getCacheMode() == LOCAL);
+
+        if (ccfg.getCacheMode() != LOCAL) {
+            top = new GridDhtPartitionTopologyImpl(ctx, this);
+
+            if (!ctx.kernalContext().clientNode()) {
+                ctx.io().addCacheGroupHandler(groupId(), 
GridDhtAffinityAssignmentRequest.class,
+                    new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentRequest>() {
+                        @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentRequest msg) {
+                            processAffinityAssignmentRequest(nodeId, msg);
+                        }
+                    });
+            }
+
+            preldr = new GridDhtPreloader(this);
+
+            preldr.start();
+        }
+        else
+            preldr = new GridCachePreloaderAdapter(this);
+
+        offheapMgr = new IgniteCacheOffheapManagerImpl();
+
+        offheapMgr.start(ctx, this);
+
+        ctx.affinity().onCacheGroupCreated(this);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processAffinityAssignmentRequest(final UUID nodeId,
+        final GridDhtAffinityAssignmentRequest req) {
+        if (log.isDebugEnabled())
+            log.debug("Processing affinity assignment request [node=" + nodeId 
+ ", req=" + req + ']');
+
+        IgniteInternalFuture<AffinityTopologyVersion> fut = 
aff.readyFuture(req.topologyVersion());
+
+        if (fut != null) {
+            fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    processAffinityAssignmentRequest0(nodeId, req);
+                }
+            });
+        }
+        else
+            processAffinityAssignmentRequest0(nodeId, req);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processAffinityAssignmentRequest0(UUID nodeId, final 
GridDhtAffinityAssignmentRequest req) {
+        AffinityTopologyVersion topVer = req.topologyVersion();
+
+        if (log.isDebugEnabled())
+            log.debug("Affinity is ready for topology version, will send 
response [topVer=" + topVer +
+                ", node=" + nodeId + ']');
+
+        AffinityAssignment assignment = aff.cachedAffinity(topVer);
+
+        GridDhtAffinityAssignmentResponse res = new 
GridDhtAffinityAssignmentResponse(
+            req.futureId(),
+            grpId,
+            topVer,
+            assignment.assignment());
+
+        if (aff.centralizedAffinityFunction()) {
+            assert assignment.idealAssignment() != null;
+
+            res.idealAffinityAssignment(assignment.idealAssignment());
+        }
+
+        try {
+            ctx.io().send(nodeId, res, AFFINITY_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send affinity assignment response to 
remote node [node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * @param reconnectFut Reconnect future.
+     */
+    public void onDisconnected(IgniteFuture reconnectFut) {
+        IgniteCheckedException err = new 
IgniteClientDisconnectedCheckedException(reconnectFut,
+            "Failed to wait for topology update, client disconnected.");
+
+        if (aff != null)
+            aff.cancelFutures(err);
+    }
+
+    /**
+     * @return {@code True} if rebalance is enabled.
+     */
+    public boolean rebalanceEnabled() {
+        return ccfg.getRebalanceMode() != NONE;
+    }
+
+    /**
+     *
+     */
+    public void onReconnected() {
+        aff.onReconnected();
+
+        if (top != null)
+            top.onReconnected();
+
+        preldr.onReconnected();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "CacheGroupContext [grp=" + cacheOrGroupName() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
deleted file mode 100644
index 5278e4f..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ /dev/null
@@ -1,951 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataPageEvictionMode;
-import org.apache.ignite.configuration.TopologyValidator;
-import org.apache.ignite.events.CacheRebalancingEvent;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
-import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
-import 
org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
-import 
org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheMode.LOCAL;
-import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
-import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
-import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
-
-/**
- *
- */
-public class CacheGroupInfrastructure {
-    /** Group ID. */
-    private final int grpId;
-
-    /** Node ID cache group was received from. */
-    private final UUID rcvdFrom;
-
-    /** Flag indicating that this cache group is in a recovery mode due to 
partitions loss. */
-    private boolean needsRecovery;
-
-    /** */
-    private final AffinityTopologyVersion locStartVer;
-
-    /** */
-    private final CacheConfiguration<?, ?> ccfg;
-
-    /** */
-    private final GridCacheSharedContext ctx;
-
-    /** */
-    private final boolean affNode;
-
-    /** */
-    private final CacheType cacheType;
-
-    /** */
-    private final byte ioPlc;
-
-    /** */
-    private final boolean depEnabled;
-
-    /** */
-    private final boolean storeCacheId;
-
-    /** */
-    private volatile List<GridCacheContext> caches;
-
-    /** */
-    private volatile List<GridCacheContext> contQryCaches;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private GridAffinityAssignmentCache aff;
-
-    /** */
-    private GridDhtPartitionTopologyImpl top;
-
-    /** */
-    private IgniteCacheOffheapManager offheapMgr;
-
-    /** */
-    private GridCachePreloader preldr;
-
-    /** */
-    private final MemoryPolicy memPlc;
-
-    /** */
-    private final CacheObjectContext cacheObjCtx;
-
-    /** */
-    private final FreeList freeList;
-
-    /** */
-    private final ReuseList reuseList;
-
-    /** */
-    private boolean drEnabled;
-
-    /** */
-    private boolean qryEnabled;
-
-    /**
-     * @param grpId Group ID.
-     * @param ctx Context.
-     * @param rcvdFrom Node ID cache group was received from.
-     * @param cacheType Cache type.
-     * @param ccfg Cache configuration.
-     * @param affNode Affinity node flag.
-     * @param memPlc Memory policy.
-     * @param cacheObjCtx Cache object context.
-     * @param freeList Free list.
-     * @param reuseList Reuse list.
-     * @param locStartVer Topology version when group was started on local 
node.
-     */
-    CacheGroupInfrastructure(
-        GridCacheSharedContext ctx,
-        int grpId,
-        UUID rcvdFrom,
-        CacheType cacheType,
-        CacheConfiguration ccfg,
-        boolean affNode,
-        MemoryPolicy memPlc,
-        CacheObjectContext cacheObjCtx,
-        FreeList freeList,
-        ReuseList reuseList,
-        AffinityTopologyVersion locStartVer) {
-        assert ccfg != null;
-        assert memPlc != null || !affNode;
-        assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", 
grpName=" + ccfg.getGroupName() + ']';
-
-        this.grpId = grpId;
-        this.rcvdFrom = rcvdFrom;
-        this.ctx = ctx;
-        this.ccfg = ccfg;
-        this.affNode = affNode;
-        this.memPlc = memPlc;
-        this.cacheObjCtx = cacheObjCtx;
-        this.freeList = freeList;
-        this.reuseList = reuseList;
-        this.locStartVer = locStartVer;
-        this.cacheType = cacheType;
-
-        ioPlc = cacheType.ioPolicy();
-
-        depEnabled = ctx.kernalContext().deploy().enabled() && 
!ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);
-
-        storeCacheId = affNode && memPlc.config().getPageEvictionMode() != 
DataPageEvictionMode.DISABLED;
-
-        log = ctx.kernalContext().log(getClass());
-
-        caches = new ArrayList<>();
-    }
-
-    /**
-     * @return {@code True} if this is cache group for one of system caches.
-     */
-    public boolean systemCache() {
-        return !sharedGroup() && CU.isSystemCache(ccfg.getName());
-    }
-
-    /**
-     * @return Node ID initiated cache group start.
-     */
-    public UUID receivedFrom() {
-        return rcvdFrom;
-    }
-
-    /**
-     * @return {@code True} if cacheId should be stored in data pages.
-     */
-    public boolean storeCacheIdInDataPage() {
-        return storeCacheId;
-    }
-
-    /**
-     * @return {@code True} if deployment is enabled.
-     */
-    public boolean deploymentEnabled() {
-        return depEnabled;
-    }
-
-    /**
-     * @return Preloader.
-     */
-    public GridCachePreloader preloader() {
-        return preldr;
-    }
-
-    /**
-     * @return IO policy for the given cache group.
-     */
-    public byte ioPolicy() {
-        return ioPlc;
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @throws IgniteCheckedException If failed.
-     */
-    void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
-        addCacheContext(cctx);
-
-        offheapMgr.onCacheStarted(cctx);
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @return {@code True} if group contains cache with given name.
-     */
-    public boolean hasCache(String cacheName) {
-        List<GridCacheContext> caches = this.caches;
-
-        for (int i = 0; i < caches.size(); i++) {
-            if (caches.get(i).name().equals(cacheName))
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param cctx Cache context.
-     */
-    private void addCacheContext(GridCacheContext cctx) {
-        assert cacheType.userCache() == cctx.userCache() : cctx.name();
-        assert grpId == cctx.groupId() : cctx.name();
-
-        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
-
-        assert sharedGroup() || caches.isEmpty();
-
-        boolean add = caches.add(cctx);
-
-        assert add : cctx.name();
-
-        if (!qryEnabled && QueryUtils.isEnabled(cctx.config()))
-            qryEnabled = true;
-
-        if (!drEnabled && cctx.isDrEnabled())
-            drEnabled = true;
-
-        this.caches = caches;
-   }
-
-    /**
-     * @param cctx Cache context.
-     */
-    private void removeCacheContext(GridCacheContext cctx) {
-        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
-
-        // It is possible cache was not added in case of errors on cache start.
-        for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) 
{
-            GridCacheContext next = it.next();
-
-            if (next == cctx) {
-                assert sharedGroup() || caches.size() == 1 : caches.size();
-
-                it.remove();
-
-                break;
-            }
-        }
-
-        if (QueryUtils.isEnabled(cctx.config())) {
-            boolean qryEnabled = false;
-
-            for (int i = 0; i < caches.size(); i++) {
-                if (QueryUtils.isEnabled(caches.get(i).config())) {
-                    qryEnabled = true;
-
-                    break;
-                }
-            }
-
-            this.qryEnabled = qryEnabled;
-        }
-
-        if (cctx.isDrEnabled()) {
-            boolean drEnabled = false;
-
-            for (int i = 0; i < caches.size(); i++) {
-                if (caches.get(i).isDrEnabled()) {
-                    drEnabled = true;
-
-                    break;
-                }
-            }
-
-            this.drEnabled = drEnabled;
-        }
-
-        this.caches = caches;
-    }
-
-    /**
-     * @return Cache context if group contains single cache.
-     */
-    public GridCacheContext singleCacheContext() {
-        List<GridCacheContext> caches = this.caches;
-
-        assert !sharedGroup() && caches.size() == 1;
-
-        return caches.get(0);
-    }
-
-    /**
-     *
-     */
-    public void unwindUndeploys() {
-        List<GridCacheContext> caches = this.caches;
-
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
-
-            cctx.deploy().unwind(cctx);
-        }
-    }
-
-    /**
-     * @param type Event type to check.
-     * @return {@code True} if given event type should be recorded.
-     */
-    public boolean eventRecordable(int type) {
-        return cacheType.userCache() && ctx.gridEvents().isRecordable(type);
-    }
-
-    /**
-     * Adds rebalancing event.
-     *
-     * @param part Partition.
-     * @param type Event type.
-     * @param discoNode Discovery node.
-     * @param discoType Discovery event type.
-     * @param discoTs Discovery event timestamp.
-     */
-    public void addRebalanceEvent(int part, int type, ClusterNode discoNode, 
int discoType, long discoTs) {
-        assert discoNode != null;
-        assert type > 0;
-        assert discoType > 0;
-        assert discoTs > 0;
-
-        if (!eventRecordable(type))
-            LT.warn(log, "Added event without checking if event is recordable: 
" + U.gridEventName(type));
-
-        List<GridCacheContext> caches = this.caches;
-
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
-
-            if (cctx.recordEvent(type)) {
-                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
-                    cctx.localNode(),
-                    "Cache rebalancing event.",
-                    type,
-                    part,
-                    discoNode,
-                    discoType,
-                    discoTs));
-            }
-        }
-    }
-    /**
-     * Adds partition unload event.
-     *
-     * @param part Partition.
-     */
-    public void addUnloadEvent(int part) {
-        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
-            LT.warn(log, "Added event without checking if event is recordable: 
" +
-                U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
-
-        List<GridCacheContext> caches = this.caches;
-
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
-
-            cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
-                cctx.localNode(),
-                "Cache unloading event.",
-                EVT_CACHE_REBALANCE_PART_UNLOADED,
-                part,
-                null,
-                0,
-                0));
-        }
-    }
-
-    /**
-     * @param part Partition.
-     * @param key Key.
-     * @param evtNodeId Event node ID.
-     * @param type Event type.
-     * @param newVal New value.
-     * @param hasNewVal Has new value flag.
-     * @param oldVal Old values.
-     * @param hasOldVal Has old value flag.
-     * @param keepBinary Keep binary flag.
-     */
-    public void addCacheEvent(
-        int part,
-        KeyCacheObject key,
-        UUID evtNodeId,
-        int type,
-        @Nullable CacheObject newVal,
-        boolean hasNewVal,
-        @Nullable CacheObject oldVal,
-        boolean hasOldVal,
-        boolean keepBinary
-    ) {
-        List<GridCacheContext> caches = this.caches;
-
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
-
-            cctx.events().addEvent(part,
-                key,
-                evtNodeId,
-                (IgniteUuid)null,
-                null,
-                type,
-                newVal,
-                hasNewVal,
-                oldVal,
-                hasOldVal,
-                null,
-                null,
-                null,
-                keepBinary);
-        }
-    }
-
-    /**
-     * @return {@code True} if contains cache with query indexing enabled.
-     */
-    public boolean queriesEnabled() {
-        return qryEnabled;
-    }
-
-    /**
-     * @return {@code True} if fast eviction is allowed.
-     */
-    public boolean allowFastEviction() {
-        return ctx.database().persistenceEnabled() && !queriesEnabled();
-    }
-
-    /**
-     * @return {@code True} in case replication is enabled.
-     */
-    public boolean isDrEnabled() {
-        return drEnabled;
-    }
-
-    /**
-     * @return Free List.
-     */
-    public FreeList freeList() {
-        return freeList;
-    }
-
-    /**
-     * @return Reuse List.
-     */
-    public ReuseList reuseList() {
-        return reuseList;
-    }
-
-    /**
-     * @return Cache object context.
-     */
-    public CacheObjectContext cacheObjectContext() {
-        return cacheObjCtx;
-    }
-
-    /**
-     * @return Cache shared context.
-     */
-    public GridCacheSharedContext shared() {
-        return ctx;
-    }
-
-    /**
-     * @return Memory policy.
-     */
-    public MemoryPolicy memoryPolicy() {
-        return memPlc;
-    }
-
-    /**
-     * @return {@code True} if local node is affinity node.
-     */
-    public boolean affinityNode() {
-        return affNode;
-    }
-
-    /**
-     * @return Topology.
-     */
-    public GridDhtPartitionTopology topology() {
-        if (top == null)
-            throw new IllegalStateException("Topology is not initialized: " + 
name());
-
-        return top;
-    }
-
-    /**
-     * @return Offheap manager.
-     */
-    public IgniteCacheOffheapManager offheap() {
-        return offheapMgr;
-    }
-
-    /**
-     * @return Current cache state. Must only be modified during exchange.
-     */
-    public boolean needsRecovery() {
-        return needsRecovery;
-    }
-
-    /**
-     * @param needsRecovery Needs recovery flag.
-     */
-    public void needsRecovery(boolean needsRecovery) {
-        this.needsRecovery = needsRecovery;
-    }
-
-    /**
-     * @return Topology version when group was started on local node.
-     */
-    public AffinityTopologyVersion localStartVersion() {
-        return locStartVer;
-    }
-
-    /**
-     * @return {@code True} if cache is local.
-     */
-    public boolean isLocal() {
-        return ccfg.getCacheMode() == LOCAL;
-    }
-
-    /**
-     * @return Cache configuration.
-     */
-    public CacheConfiguration config() {
-        return ccfg;
-    }
-
-    /**
-     * @return Cache node filter.
-     */
-    public IgnitePredicate<ClusterNode> nodeFilter() {
-        return ccfg.getNodeFilter();
-    }
-
-    /**
-     * @return Configured user objects which should be initialized/stopped on 
group start/stop.
-     */
-    Collection<?> configuredUserObjects() {
-        return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), 
ccfg.getTopologyValidator());
-    }
-
-    /**
-     * @return Configured topology validator.
-     */
-    @Nullable public TopologyValidator topologyValidator() {
-        return ccfg.getTopologyValidator();
-    }
-
-    /**
-     * @return Configured affinity function.
-     */
-    public AffinityFunction affinityFunction() {
-        return ccfg.getAffinity();
-    }
-
-    /**
-     * @return Affinity.
-     */
-    public GridAffinityAssignmentCache affinity() {
-        return aff;
-    }
-
-    /**
-     * @return Group name or {@code null} if group name was not specified for 
cache.
-     */
-    @Nullable public String name() {
-        return ccfg.getGroupName();
-    }
-
-    /**
-     * @return Group name if it is specified, otherwise cache name.
-     */
-    public String cacheOrGroupName() {
-        return ccfg.getGroupName() != null ? ccfg.getGroupName() : 
ccfg.getName();
-    }
-
-    /**
-     * @return Group ID.
-     */
-    public int groupId() {
-        return grpId;
-    }
-
-    /**
-     * @return {@code True} if group can contain multiple caches.
-     */
-    public boolean sharedGroup() {
-        return ccfg.getGroupName() != null;
-    }
-
-    /**
-     *
-     */
-    public void onKernalStop() {
-        aff.cancelFutures(new IgniteCheckedException("Failed to wait for 
topology update, node is stopping."));
-
-        preldr.onKernalStop();
-
-        offheapMgr.onKernalStop();
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param destroy Destroy flag.
-     */
-    void stopCache(GridCacheContext cctx, boolean destroy) {
-        if (top != null)
-            top.onCacheStopped(cctx.cacheId());
-
-        offheapMgr.stopCache(cctx.cacheId(), destroy);
-
-        removeCacheContext(cctx);
-    }
-
-    /**
-     *
-     */
-    void stopGroup() {
-        IgniteCheckedException err =
-            new IgniteCheckedException("Failed to wait for topology update, 
cache (or node) is stopping.");
-
-        aff.cancelFutures(err);
-
-        offheapMgr.stop();
-
-        ctx.io().removeCacheGroupHandlers(grpId);
-    }
-
-    /**
-     * @return IDs of caches in this group.
-     */
-    public Set<Integer> cacheIds() {
-        List<GridCacheContext> caches = this.caches;
-
-        Set<Integer> ids = U.newHashSet(caches.size());
-
-        for (int i = 0; i < caches.size(); i++)
-            ids.add(caches.get(i).cacheId());
-
-        return ids;
-    }
-
-    /**
-     * @return {@code True} if group contains caches.
-     */
-    boolean hasCaches() {
-        List<GridCacheContext> caches = this.caches;
-
-        return !caches.isEmpty();
-    }
-
-    /**
-     * @param part Partition ID.
-     */
-    public void onPartitionEvicted(int part) {
-        List<GridCacheContext> caches = this.caches;
-
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
-
-            if (cctx.isDrEnabled())
-                cctx.dr().partitionEvicted(part);
-
-            cctx.continuousQueries().onPartitionEvicted(part);
-
-            cctx.dataStructures().onPartitionEvicted(part);
-        }
-    }
-
-    /**
-     * @param cctx Cache context.
-     */
-    public void addCacheWithContinuousQuery(GridCacheContext cctx) {
-        assert sharedGroup() : cacheOrGroupName();
-        assert cctx.group() == this : cctx.name();
-        assert !cctx.isLocal() : cctx.name();
-
-        synchronized (this) {
-            List<GridCacheContext> contQryCaches = this.contQryCaches;
-
-            if (contQryCaches == null)
-                contQryCaches = new ArrayList<>();
-
-            contQryCaches.add(cctx);
-
-            this.contQryCaches = contQryCaches;
-        }
-    }
-
-    /**
-     * @param cctx Cache context.
-     */
-    public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
-        assert sharedGroup() : cacheOrGroupName();
-        assert cctx.group() == this : cctx.name();
-        assert !cctx.isLocal() : cctx.name();
-
-        synchronized (this) {
-            List<GridCacheContext> contQryCaches = this.contQryCaches;
-
-            if (contQryCaches == null)
-                return;
-
-            contQryCaches.remove(cctx);
-
-            if (contQryCaches.isEmpty())
-                contQryCaches = null;
-
-            this.contQryCaches = contQryCaches;
-        }
-    }
-
-    /**
-     * @param cacheId ID of cache initiated counter update.
-     * @param part Partition number.
-     * @param cntr Counter.
-     * @param topVer Topology version for current operation.
-     */
-    public void onPartitionCounterUpdate(int cacheId,
-        int part,
-        long cntr,
-        AffinityTopologyVersion topVer,
-        boolean primary) {
-        assert sharedGroup();
-
-        if (isLocal())
-            return;
-
-        List<GridCacheContext> contQryCaches = this.contQryCaches;
-
-        if (contQryCaches == null)
-            return;
-
-        CounterSkipContext skipCtx = null;
-
-        for (int i = 0; i < contQryCaches.size(); i++) {
-            GridCacheContext cctx = contQryCaches.get(i);
-
-            if (cacheId != cctx.cacheId())
-                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, 
part, cntr, topVer, primary);
-        }
-
-        final List<Runnable> procC = skipCtx != null ? 
skipCtx.processClosures() : null;
-
-        if (procC != null) {
-            ctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                @Override public void run() {
-                    for (Runnable c : procC)
-                        c.run();
-                }
-            });
-        }
-    }
-
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    public void start() throws IgniteCheckedException {
-        aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
-            cacheOrGroupName(),
-            grpId,
-            ccfg.getAffinity(),
-            ccfg.getNodeFilter(),
-            ccfg.getBackups(),
-            ccfg.getCacheMode() == LOCAL);
-
-        if (ccfg.getCacheMode() != LOCAL) {
-            GridCacheMapEntryFactory entryFactory = new 
GridCacheMapEntryFactory() {
-                @Override public GridCacheMapEntry create(
-                    GridCacheContext ctx,
-                    AffinityTopologyVersion topVer,
-                    KeyCacheObject key
-                ) {
-                    return new GridDhtCacheEntry(ctx, topVer, key);
-                }
-            };
-
-            top = new GridDhtPartitionTopologyImpl(ctx, this, entryFactory);
-
-            if (!ctx.kernalContext().clientNode()) {
-                ctx.io().addHandler(true, groupId(), 
GridDhtAffinityAssignmentRequest.class,
-                    new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentRequest>() {
-                        @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentRequest msg) {
-                            processAffinityAssignmentRequest(nodeId, msg);
-                        }
-                    });
-            }
-
-            preldr = new GridDhtPreloader(this);
-
-            preldr.start();
-        }
-        else
-            preldr = new GridCachePreloaderAdapter(this);
-
-        offheapMgr = new IgniteCacheOffheapManagerImpl();
-
-        offheapMgr.start(ctx, this);
-
-        ctx.affinity().onCacheGroupCreated(this);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    private void processAffinityAssignmentRequest(final UUID nodeId,
-        final GridDhtAffinityAssignmentRequest req) {
-        if (log.isDebugEnabled())
-            log.debug("Processing affinity assignment request [node=" + nodeId 
+ ", req=" + req + ']');
-
-        IgniteInternalFuture<AffinityTopologyVersion> fut = 
aff.readyFuture(req.topologyVersion());
-
-        if (fut != null) {
-            fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    processAffinityAssignmentRequest0(nodeId, req);
-                }
-            });
-        }
-        else
-            processAffinityAssignmentRequest0(nodeId, req);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    private void processAffinityAssignmentRequest0(UUID nodeId, final 
GridDhtAffinityAssignmentRequest req) {
-        AffinityTopologyVersion topVer = req.topologyVersion();
-
-        if (log.isDebugEnabled())
-            log.debug("Affinity is ready for topology version, will send 
response [topVer=" + topVer +
-                ", node=" + nodeId + ']');
-
-        AffinityAssignment assignment = aff.cachedAffinity(topVer);
-
-        GridDhtAffinityAssignmentResponse res = new 
GridDhtAffinityAssignmentResponse(
-            req.futureId(),
-            grpId,
-            topVer,
-            assignment.assignment());
-
-        if (aff.centralizedAffinityFunction()) {
-            assert assignment.idealAssignment() != null;
-
-            res.idealAffinityAssignment(assignment.idealAssignment());
-        }
-
-        try {
-            ctx.io().send(nodeId, res, AFFINITY_POOL);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send affinity assignment response to 
remote node [node=" + nodeId + ']', e);
-        }
-    }
-
-    /**
-     * @param reconnectFut Reconnect future.
-     */
-    public void onDisconnected(IgniteFuture reconnectFut) {
-        IgniteCheckedException err = new 
IgniteClientDisconnectedCheckedException(reconnectFut,
-            "Failed to wait for topology update, client disconnected.");
-
-        if (aff != null)
-            aff.cancelFutures(err);
-    }
-
-    /**
-     * @return {@code True} if rebalance is enabled.
-     */
-    public boolean rebalanceEnabled() {
-        return ccfg.getRebalanceMode() != NONE;
-    }
-
-    /**
-     *
-     */
-    public void onReconnected() {
-        aff.onReconnected();
-
-        if (top != null)
-            top.onReconnected();
-
-        preldr.onReconnected();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "CacheGroupInfrastructure [grp=" + cacheOrGroupName() + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 58b6a60..08c08c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -79,7 +80,7 @@ class ClusterCachesInfo {
     private CacheJoinNodeDiscoveryData joinDiscoData;
 
     /** */
-    private CacheNodeCommonDiscoveryData gridData;
+    private GridData gridData;
 
     /** */
     private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> 
locJoinStartCaches;
@@ -117,7 +118,21 @@ class ClusterCachesInfo {
                 validateCacheGroupConfiguration(ccfg, info.config());
         }
 
-        processJoiningNode(joinDiscoData, ctx.localNodeId());
+        String conflictErr = processJoiningNode(joinDiscoData, 
ctx.localNodeId(), true);
+
+        if (conflictErr != null)
+            throw new IgniteCheckedException("Failed to start configured 
cache. " + conflictErr);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param grpName Group name.
+     * @return Group ID.
+     */
+    private int cacheGroupId(String cacheName, @Nullable String grpName) {
+        assert cacheName != null;
+
+        return grpName != null ? CU.cacheId(grpName) : CU.cacheId(cacheName);
     }
 
     /**
@@ -125,11 +140,14 @@ class ClusterCachesInfo {
      * @throws IgniteCheckedException If failed.
      */
     void onKernalStart(boolean checkConsistency) throws IgniteCheckedException 
{
+        if (gridData != null && gridData.conflictErr != null)
+            throw new IgniteCheckedException(gridData.conflictErr);
+
         if (checkConsistency && joinDiscoData != null && gridData != null) {
             for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : 
joinDiscoData.caches().values()) {
                 CacheConfiguration locCfg = locCacheInfo.config();
 
-                CacheData cacheData = gridData.caches().get(locCfg.getName());
+                CacheData cacheData = 
gridData.gridData.caches().get(locCfg.getName());
 
                 if (cacheData != null)
                     checkCache(locCacheInfo, cacheData, 
cacheData.receivedFrom());
@@ -305,6 +323,17 @@ class ClusterCachesInfo {
 
             if (req.start()) {
                 if (desc == null) {
+                    String conflictErr = 
checkCacheConflict(req.startCacheConfiguration());
+
+                    if (conflictErr != null) {
+                        U.warn(log, "Ignore cache start request. " + 
conflictErr);
+
+                        ctx.cache().completeCacheStartFuture(req, false, new 
IgniteCheckedException("Failed to start " +
+                            "cache. " + conflictErr));
+
+                        continue;
+                    }
+
                     if (req.clientStartOnly()) {
                         ctx.cache().completeCacheStartFuture(req, false, new 
IgniteCheckedException("Failed to start " +
                             "client cache (a cache with the given name is not 
started): " + req.cacheName()));
@@ -543,7 +572,7 @@ class ClusterCachesInfo {
             Map<Integer, CacheGroupDescriptor> grps = 
cachesOnDisconnect.cacheGrps;
             Map<String, DynamicCacheDescriptor> caches = 
cachesOnDisconnect.caches;
 
-            for (CacheGroupInfrastructure grp : ctx.cache().cacheGroups()) {
+            for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
                 CacheGroupDescriptor desc = grps.get(grp.groupId());
 
                 assert desc != null : grp.cacheOrGroupName();
@@ -815,7 +844,24 @@ class ClusterCachesInfo {
             }
         }
 
-        gridData = cachesData;
+        String conflictErr = null;
+
+        if (joinDiscoData != null) {
+            for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : 
joinDiscoData.caches().entrySet()) {
+                if (!registeredCaches.containsKey(e.getKey())) {
+                    conflictErr = checkCacheConflict(e.getValue().config());
+
+                    if (conflictErr != null) {
+                        conflictErr = "Failed to start configured cache due to 
conflict with started caches. " +
+                            conflictErr;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        gridData = new GridData(cachesData, conflictErr);
 
         if (!disconnectedState())
             initStartCachesForLocalJoin(false);
@@ -894,7 +940,7 @@ class ClusterCachesInfo {
                     
processClientReconnectData((CacheClientReconnectDiscoveryData) joiningNodeData, 
data.joiningNodeId());
             }
             else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
-                
processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, 
data.joiningNodeId());
+                
processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, 
data.joiningNodeId(), false);
         }
     }
 
@@ -918,10 +964,59 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @param cfg Cache configuration.
+     * @return {@code True} if validation passed.
+     */
+    private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
+        int cacheId = CU.cacheId(cfg.getName());
+
+        if (cacheGroupByName(cfg.getName()) != null)
+            return "Cache name conflict with existing cache group (change 
cache name) [cacheName=" + cfg.getName() + ']';
+
+        if (cfg.getGroupName() != null) {
+            DynamicCacheDescriptor desc = 
registeredCaches.get(cfg.getGroupName());
+
+            if (desc != null)
+                return "Cache group name conflict with existing cache (change 
group name) [cacheName=" + cfg.getName() +
+                    ", conflictingCacheName=" + desc.cacheName() + ']';
+        }
+
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            if (desc.cacheId() == cacheId)
+                return "Cache ID conflict (change cache name) [cacheName=" + 
cfg.getName() +
+                    ", conflictingCacheName=" + desc.cacheName() + ']';
+        }
+
+        int grpId = cacheGroupId(cfg.getName(), cfg.getGroupName());
+
+        if (cfg.getGroupName() != null) {
+            if (cacheGroupByName(cfg.getGroupName()) == null) {
+                CacheGroupDescriptor desc = registeredCacheGrps.get(grpId);
+
+                if (desc != null)
+                    return "Cache group ID conflict (change cache group name) 
[cacheName=" + cfg.getName() +
+                        ", groupName=" + cfg.getGroupName() +
+                        (desc.sharedGroup() ? ", conflictingGroupName=" : ", 
conflictingCacheName=") + desc.cacheOrGroupName() + ']';
+            }
+        }
+        else {
+            CacheGroupDescriptor desc = registeredCacheGrps.get(grpId);
+
+            if (desc != null)
+                return "Cache group ID conflict (change cache name) 
[cacheName=" + cfg.getName() +
+                    (desc.sharedGroup() ? ", conflictingGroupName=" : ", 
conflictingCacheName=") + desc.cacheOrGroupName() + ']';
+        }
+
+        return null;
+    }
+
+    /**
      * @param joinData Joined node discovery data.
      * @param nodeId Joined node ID.
+     * @param locJoin {@code True} if called on local node join.
+     * @return Configuration conflict error.
      */
-    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID 
nodeId) {
+    private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, 
UUID nodeId, boolean locJoin) {
         for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : 
joinData.templates().values()) {
             CacheConfiguration<?, ?> cfg = cacheInfo.config();
 
@@ -947,6 +1042,17 @@ class ClusterCachesInfo {
             CacheConfiguration<?, ?> cfg = cacheInfo.config();
 
             if (!registeredCaches.containsKey(cfg.getName())) {
+                String conflictErr = checkCacheConflict(cfg);
+
+                if (conflictErr != null) {
+                    if (locJoin)
+                        return conflictErr;
+
+                    U.warn(log, "Ignore cache received from joining node. " + 
conflictErr);
+
+                    continue;
+                }
+
                 int cacheId = CU.cacheId(cfg.getName());
 
                 CacheGroupDescriptor grpDesc = registerCacheGroup(null,
@@ -987,6 +1093,8 @@ class ClusterCachesInfo {
                     desc.cacheConfiguration().getNearConfiguration() != null);
             }
         }
+
+        return null;
     }
 
     /**
@@ -1045,12 +1153,7 @@ class ClusterCachesInfo {
             }
         }
 
-        int grpId;
-
-        if (startedCacheCfg.getGroupName() == null)
-            grpId = CU.cacheId(startedCacheCfg.getName());
-        else
-            grpId = CU.cacheId("grp#" + startedCacheCfg.getGroupName());
+        int grpId = cacheGroupId(startedCacheCfg.getName(), 
startedCacheCfg.getGroupName());
 
         Map<String, Integer> caches = 
Collections.singletonMap(startedCacheCfg.getName(), cacheId);
 
@@ -1266,6 +1369,26 @@ class ClusterCachesInfo {
     /**
      *
      */
+    static class GridData {
+        /** */
+        private final CacheNodeCommonDiscoveryData gridData;
+
+        /** */
+        private final String conflictErr;
+
+        /**
+         * @param gridData Grid data.
+         * @param conflictErr Cache configuration conflict error.
+         */
+        GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) {
+            this.gridData = gridData;
+            this.conflictErr = conflictErr;
+        }
+    }
+
+    /**
+     *
+     */
     private static class CachesOnDisconnect {
         /** */
         final Map<Integer, CacheGroupDescriptor> cacheGrps;

http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 153adf3..75584d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -540,11 +540,6 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
-     * @return Entry factory.
-     */
-    protected abstract GridCacheMapEntryFactory entryFactory();
-
-    /**
      * Starts this cache. Child classes should override this method
      * to provide custom start-up behavior.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 50e0121..839ddbd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -135,7 +135,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     private GridCacheSharedContext<K, V> sharedCtx;
 
     /** Cache group. */
-    private CacheGroupInfrastructure grp;
+    private CacheGroupContext grp;
 
     /** Logger. */
     private IgniteLogger log;
@@ -284,7 +284,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
         GridKernalContext ctx,
         GridCacheSharedContext sharedCtx,
         CacheConfiguration cacheCfg,
-        CacheGroupInfrastructure grp,
+        CacheGroupContext grp,
         CacheType cacheType,
         AffinityTopologyVersion locStartTopVer,
         boolean affNode,
@@ -382,7 +382,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     /**
      * @return Cache group.
      */
-    public CacheGroupInfrastructure group() {
+    public CacheGroupContext group() {
         return grp;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/781e33ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 8556c87..a251047 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1193,19 +1193,31 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Adds message handler.
-     *
-     * @param cacheGrp {@code True} if cache group message, {@code false} if 
cache message.
      * @param hndId Message handler ID.
      * @param type Type of message.
      * @param c Handler.
      */
-    public void addHandler(
-        boolean cacheGrp,
+    public void addCacheHandler(
         int hndId,
         Class<? extends GridCacheMessage> type,
         IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
-        addHandler(hndId, type, c, cacheGrp ? grpHandlers : cacheHandlers);
+        assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
+
+        addHandler(hndId, type, c, cacheHandlers);
+    }
+
+    /**
+     * @param hndId Message handler ID.
+     * @param type Type of message.
+     * @param c Handler.
+     */
+    public void addCacheGroupHandler(
+        int hndId,
+        Class<? extends GridCacheGroupIdMessage> type,
+        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
+
+        addHandler(hndId, type, c, grpHandlers);
     }
 
     /**
@@ -1321,6 +1333,22 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param topic Topic.
+     * @param c Handler.
+     */
+    public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? 
extends GridCacheIdMessage> c) {
+        addOrderedHandler(false, topic, c);
+    }
+
+    /**
+     * @param topic Topic.
+     * @param c Handler.
+     */
+    public void addOrderedCacheGroupHandler(Object topic, 
IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
+        addOrderedHandler(true, topic, c);
+    }
+
+    /**
      * Adds ordered message handler.
      *
      * @param cacheGrp {@code True} if cache group message, {@code false} if 
cache message.
@@ -1328,7 +1356,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
      * @param c Handler.
      */
     @SuppressWarnings({"unchecked"})
-    public void addOrderedHandler(boolean cacheGrp, Object topic, 
IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+    private void addOrderedHandler(boolean cacheGrp, Object topic, 
IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
         MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
 
         IgniteLogger log0 = log;

Reply via email to