This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 39c872a85b4 IGNITE-26168 Added support of partition loss detection 
after cluster inactivity (#12288)
39c872a85b4 is described below

commit 39c872a85b46bc88b7a1ba7b09a647a004fb299c
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sun Sep 7 20:50:56 2025 +0300

    IGNITE-26168 Added support of partition loss detection after cluster 
inactivity (#12288)
---
 .../processors/cache/CacheGroupContext.java        |   5 +
 .../processors/cache/CacheGroupRecoveryState.java  |  79 +++
 .../cache/CacheJoinNodeDiscoveryData.java          |  14 +
 .../cache/CacheNodeCommonDiscoveryData.java        |  17 +-
 .../cache/ClusterCacheGroupRecoveryData.java       |  73 +++
 .../processors/cache/ClusterCachesInfo.java        |  37 +-
 .../cache/GridCachePartitionExchangeManager.java   |   2 +
 .../processors/cache/GridCacheProcessor.java       | 104 +++-
 .../preloader/CachePartitionFullCountersMap.java   |  15 +
 .../preloader/GridDhtPartitionsExchangeFuture.java | 119 ++---
 .../preloader/GridDhtPartitionsSingleMessage.java  |   5 +
 .../dht/topology/GridClientPartitionTopology.java  |  10 +-
 .../dht/topology/GridDhtPartitionTopology.java     |   6 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java | 210 +++++---
 .../processors/cluster/BaselineTopology.java       |   5 +
 .../cluster/GridClusterStateProcessor.java         |   6 -
 .../cluster/IGridClusterStateProcessor.java        |   6 -
 .../IgniteLostPartitionsRecoveryTest.java          | 533 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite12.java  |   2 +
 19 files changed, 1056 insertions(+), 192 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index f7ba9b9a2c2..80c20e2b556 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -1344,6 +1344,11 @@ public class CacheGroupContext {
         return preparedToStop;
     }
 
+    /** */
+    public void applyRecoveryData(CacheGroupRecoveryState grpState) {
+        top.applyRecoveryData(grpState);
+    }
+
     /**
      * @param ccfg Cache configuration.
      * @param plugins Ignite plugin processor.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java
new file mode 100644
index 00000000000..880fa09b21c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class CacheGroupRecoveryState implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private Set<Integer> lostParts;
+
+    /** */
+    private Set<Integer> zeroParts;
+
+    /** */
+    public CacheGroupRecoveryState() {
+        // No-op.
+    }
+
+    /** */
+    public CacheGroupRecoveryState(CacheGroupContext grp) {
+        GridDhtPartitionTopology top = grp.topology();
+
+        lostParts = top.lostPartitions();
+        zeroParts = top.fullUpdateCounters().zeroUpdateCounterPartitions();
+    }
+
+    /** */
+    public Set<Integer> zeroUpdateCounterParititons() {
+        return Collections.unmodifiableSet(zeroParts);
+    }
+
+    /** */
+    public Set<Integer> lostParititons() {
+        return Collections.unmodifiableSet(lostParts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeIntArray(out, 
lostParts.stream().mapToInt(Number::intValue).toArray());
+        U.writeIntArray(out, 
zeroParts.stream().mapToInt(Number::intValue).toArray());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        int[] lostParts = U.readIntArray(in);
+        this.lostParts = lostParts.length == 0 ? Collections.emptySet() : 
Arrays.stream(lostParts).boxed().collect(Collectors.toSet());
+
+        int[] zeroParts = U.readIntArray(in);
+        this.zeroParts = zeroParts.length == 0 ? Collections.emptySet() : 
Arrays.stream(zeroParts).boxed().collect(Collectors.toSet());
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index 47f8dc42b9c..700d5166608 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Information about configured caches sent from joining node.
@@ -47,6 +48,9 @@ public class CacheJoinNodeDiscoveryData implements 
Serializable {
     /** */
     private final boolean startCaches;
 
+    /** */
+    @Nullable private ClusterCacheGroupRecoveryData 
clusterCacheGroupRecoveryData;
+
     /**
      * @param cacheDeploymentId Deployment ID for started caches.
      * @param caches Caches.
@@ -93,6 +97,16 @@ public class CacheJoinNodeDiscoveryData implements 
Serializable {
         return caches;
     }
 
+    /** */
+    public void clusterCacheGroupRecoveryData(@Nullable 
ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData) {
+        this.clusterCacheGroupRecoveryData = clusterCacheGroupRecoveryData;
+    }
+
+    /** */
+    @Nullable public ClusterCacheGroupRecoveryData 
clusterCacheGroupRecoveryData() {
+        return clusterCacheGroupRecoveryData;
+    }
+
     /**
      *
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
index abcc1927d92..379d49e46d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Cache information sent in discovery data to joining node.
@@ -47,19 +48,25 @@ public class CacheNodeCommonDiscoveryData implements 
Serializable {
     private final Map<String, Map<UUID, Boolean>> clientNodesMap;
 
     /** */
-    private Collection<String> restartingCaches;
+    private final Collection<String> restartingCaches;
+
+    /** */
+    @Nullable private final ClusterCacheGroupRecoveryData 
clusterCacheGrpRecoveryData;
 
     /**
      * @param caches Started caches.
      * @param templates Configured templates.
      * @param cacheGrps Started cache groups.
      * @param clientNodesMap Information about cache client nodes.
+     * @param restartingCaches Collection of cache names that are being 
restarted.
+     * @param clusterCacheGrpRecoveryData Cluster cache group recovery data.
      */
     public CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
         Map<String, CacheData> templates,
         Map<Integer, CacheGroupData> cacheGrps,
         Map<String, Map<UUID, Boolean>> clientNodesMap,
-        Collection<String> restartingCaches
+        Collection<String> restartingCaches,
+        @Nullable ClusterCacheGroupRecoveryData clusterCacheGrpRecoveryData
     ) {
         assert caches != null;
         assert templates != null;
@@ -71,6 +78,7 @@ public class CacheNodeCommonDiscoveryData implements 
Serializable {
         this.cacheGrps = cacheGrps;
         this.clientNodesMap = clientNodesMap;
         this.restartingCaches = restartingCaches;
+        this.clusterCacheGrpRecoveryData = clusterCacheGrpRecoveryData;
     }
 
     /**
@@ -80,6 +88,11 @@ public class CacheNodeCommonDiscoveryData implements 
Serializable {
         return cacheGrps;
     }
 
+    /** */
+    @Nullable ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData() {
+        return clusterCacheGrpRecoveryData;
+    }
+
     /**
      * @return Started caches.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java
new file mode 100644
index 00000000000..0d71593378a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class ClusterCacheGroupRecoveryData implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long clusterBaselineTopologyVersion;
+
+    /** */
+    private Map<Integer, CacheGroupRecoveryState> grpStates;
+
+    /** */
+    public ClusterCacheGroupRecoveryData() {
+        // No-op.
+    }
+
+    /** */
+    public ClusterCacheGroupRecoveryData(long clusterBaselineTopologyVersion, 
Collection<CacheGroupContext> grps) {
+        this.clusterBaselineTopologyVersion = clusterBaselineTopologyVersion;
+        this.grpStates = 
grps.stream().collect(Collectors.toMap(CacheGroupContext::groupId, 
CacheGroupRecoveryState::new));
+    }
+
+    /** */
+    public boolean isMoreRelevantThan(ClusterCacheGroupRecoveryData data) {
+        return clusterBaselineTopologyVersion > 
data.clusterBaselineTopologyVersion;
+    }
+
+    /** */
+    @Nullable public CacheGroupRecoveryState cacheGroupRecoveryState(int 
grpId) {
+        return grpStates.get(grpId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(clusterBaselineTopologyVersion);
+        U.writeMap(out, grpStates);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        clusterBaselineTopologyVersion = in.readLong();
+        grpStates = U.readHashMap(in);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 86bce4091ea..a76d7dfb50e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -174,6 +174,9 @@ public class ClusterCachesInfo {
     /** Flag that caches were already filtered out. */
     private final AtomicBoolean alreadyFiltered = new AtomicBoolean();
 
+    /** */
+    @Nullable private volatile ClusterCacheGroupRecoveryData 
clusterCacheGrpRecoveryData;
+
     /**
      * @param ctx Context.
      */
@@ -1484,7 +1487,9 @@ public class ClusterCachesInfo {
             templates,
             cacheGrps,
             ctx.discovery().clientNodesMap(),
-            restarting);
+            restarting,
+            clusterCacheGrpRecoveryData
+        );
     }
 
     /**
@@ -1716,6 +1721,8 @@ public class ClusterCachesInfo {
                 grpData.config().getNodeFilter(),
                 grpData.config().getCacheMode());
         }
+
+        clusterCacheGrpRecoveryData = 
cachesData.clusterCacheGroupRecoveryData();
     }
 
     /**
@@ -2198,6 +2205,8 @@ public class ClusterCachesInfo {
     private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, 
UUID nodeId, boolean locJoin) {
         registerNewCacheTemplates(joinData, nodeId);
 
+        
processJoiningNodeClusterCacheGroupRecoveryData(joinData.clusterCacheGroupRecoveryData());
+
         Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new 
HashMap<>();
 
         boolean hasSchemaPatchConflict = false;
@@ -2805,6 +2814,32 @@ public class ClusterCachesInfo {
         restartingCaches.clear();
     }
 
+    /** */
+    @Nullable public ClusterCacheGroupRecoveryData 
clusterCacheGroupRecoveryData() {
+        return clusterCacheGrpRecoveryData;
+    }
+
+    /** */
+    public void clusterCacheGroupRecoveryData(ClusterCacheGroupRecoveryData 
data) {
+        clusterCacheGrpRecoveryData = data;
+    }
+
+    /** */
+    private void 
processJoiningNodeClusterCacheGroupRecoveryData(ClusterCacheGroupRecoveryData 
joiningNodeData) {
+        if (joiningNodeData == null)
+            return;
+
+        DiscoveryDataClusterState clusterState = ctx.state().clusterState();
+
+        if (clusterState.transition() || clusterState.state().active())
+            return;
+
+        ClusterCacheGroupRecoveryData locData = clusterCacheGrpRecoveryData;
+
+        if (locData == null || joiningNodeData.isMoreRelevantThan(locData))
+            clusterCacheGrpRecoveryData = joiningNodeData;
+    }
+
     /**
      * Holds direct comparator (first system caches) and reverse comparator 
(first user caches).
      * Use DIRECT comparator for ordering cache start operations.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 35e2706e63e..6bff39ae692 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1576,6 +1576,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                 m.addPartitionSizes(grp.groupId(), 
grp.topology().partitionSizes());
             }
+            else if (sndCounters && grp.persistenceEnabled() && 
exchActions.deactivate())
+                m.addPartitionUpdateCounters(grp.groupId(), 
grp.topology().localUpdateCounters(true));
         }
 
         for (GridClientPartitionTopology top : clientTops.values()) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 07e00b9a2b6..fefd243a2e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -102,6 +102,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCluster
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.FinishPreloadingTask;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionDefferedDeleteQueueCleanupTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
@@ -228,6 +229,9 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.doInParallel;
  */
 @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
 public class GridCacheProcessor extends GridProcessorAdapter {
+    /** */
+    public static final String 
METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY = 
"org.apache.ignite.cluster.cache.group.recovery";
+
     /** */
     public static final String CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT =
         "Failed to perform %s operation (cluster is in read-only mode) 
[cacheGrp=%s, cache=%s]";
@@ -2905,9 +2909,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         }
     }
 
-    /**
-     * @param grpToStop Group for which listener shuold be removed.
-     */
+    /** @param grpToStop Group for which listener should be removed. */
     private void 
removeOffheapCheckpointListener(List<IgniteBiTuple<CacheGroupContext, Boolean>> 
grpToStop) {
         sharedCtx.database().checkpointReadLock();
         try {
@@ -2929,15 +2931,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /**
      * Callback invoked when first exchange future for dynamic cache is 
completed.
      *
-     * @param cacheStartVer Started caches version to create proxy for.
-     * @param exchActions Change requests.
+     * @param fut Partitions exchange future.
      * @param err Error.
      */
-    public void onExchangeDone(
-        AffinityTopologyVersion cacheStartVer,
-        @Nullable ExchangeActions exchActions,
-        @Nullable Throwable err
-    ) {
+    public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, @Nullable 
Throwable err) throws IgniteCheckedException {
+        AffinityTopologyVersion cacheStartVer = fut.initialVersion();
+        ExchangeActions exchActions = fut.exchangeActions();
+
         initCacheProxies(cacheStartVer, err);
 
         if (exchActions == null)
@@ -2946,8 +2946,11 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         if (exchActions.systemCachesStarting() && 
exchActions.stateChangeRequest() == null)
             ctx.dataStructures().restoreStructuresState(ctx);
 
-        if (err == null)
+        if (err == null) {
+            processCacheGroupRecoveryDataOnExchangeDone(fut);
+
             processCacheStopRequestOnExchangeDone(cacheStartVer, exchActions);
+        }
     }
 
     /**
@@ -3018,7 +3021,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @return Shared context.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("unchecked")
     private GridCacheSharedContext createSharedContext(
         GridKernalContext kernalCtx,
         Collection<CacheStoreSessionListener> storeSesLsnrs
@@ -5365,6 +5367,82 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             true)));
     }
 
+    /** */
+    @Nullable private ClusterCacheGroupRecoveryData 
restoreClusterCacheGroupRecoveryData(
+        ReadOnlyMetastorage metastorage
+    ) throws IgniteCheckedException {
+        return 
(ClusterCacheGroupRecoveryData)metastorage.read(METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY);
+    }
+
+    /** */
+    private void 
processCacheGroupRecoveryDataOnExchangeDone(GridDhtPartitionsExchangeFuture 
fut) throws IgniteCheckedException {
+        if (sharedCtx.kernalContext().clientNode())
+            return;
+
+        if (fut.deactivateCluster())
+            persistClusterCacheGroupRecoveryData();
+        else if (fut.activateCluster() || fut.localJoinExchange())
+            clearClusterCacheGroupRecoveryData();
+    }
+
+    /** */
+    private void persistClusterCacheGroupRecoveryData() throws 
IgniteCheckedException {
+        List<CacheGroupContext> persistenceEnabledGrps = 
ctx.cache().cacheGroups().stream()
+            .filter(CacheGroupContext::persistenceEnabled)
+            .collect(Collectors.toList());
+
+        if (persistenceEnabledGrps.isEmpty())
+            return;
+
+        ClusterCacheGroupRecoveryData recData = new 
ClusterCacheGroupRecoveryData(
+            ctx.state().clusterState().baselineTopology().version(),
+            persistenceEnabledGrps
+        );
+
+        sharedCtx.database().checkpointReadLock();
+
+        try {
+            
sharedCtx.database().metaStorage().write(METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY,
 recData);
+        }
+        finally {
+            sharedCtx.database().checkpointReadUnlock();
+        }
+
+        cachesInfo.clusterCacheGroupRecoveryData(recData);
+    }
+
+    /** */
+    private void clearClusterCacheGroupRecoveryData() throws 
IgniteCheckedException {
+        if (cachesInfo.clusterCacheGroupRecoveryData() == null)
+            return;
+
+        sharedCtx.database().checkpointReadLock();
+
+        try {
+            
sharedCtx.database().metaStorage().remove(METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY);
+        }
+        finally {
+            sharedCtx.database().checkpointReadUnlock();
+        }
+
+        cachesInfo.clusterCacheGroupRecoveryData(null);
+    }
+
+    /** */
+    public void applyCacheGroupRecoveryData() {
+        ClusterCacheGroupRecoveryData grpRecoveyData = 
cachesInfo.clusterCacheGroupRecoveryData();
+
+        if (grpRecoveyData == null)
+            return;
+
+        for (CacheGroupContext grp : cacheGrps.values()) {
+            CacheGroupRecoveryState grpState = 
grpRecoveyData.cacheGroupRecoveryState(grp.groupId());
+
+            if (grpState != null)
+                grp.applyRecoveryData(grpState);
+        }
+    }
+
     /**
      * Filter map by key.
      *
@@ -5432,6 +5510,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) 
throws IgniteCheckedException {
             CacheJoinNodeDiscoveryData data = 
locCfgMgr.restoreCacheConfigurations();
 
+            
data.clusterCacheGroupRecoveryData(restoreClusterCacheGroupRecoveryData(metastorage));
+
             cachesInfo.onStart(data);
         }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 1384a558e47..80858a8a106 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -19,6 +19,9 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  *
@@ -89,6 +92,18 @@ public class CachePartitionFullCountersMap implements 
Serializable {
         updCntrs[p] = updCntr;
     }
 
+    /** @return Set of partitions which update counter is zero. */
+    public Set<Integer> zeroUpdateCounterPartitions() {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < updCntrs.length; p++) {
+            if (updCntrs[p] == 0)
+                res.add(p);
+        }
+
+        return res.isEmpty() ? Collections.emptySet() : 
Collections.unmodifiableSet(res);
+    }
+
     /**
      * Clears full counters map.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0674e9de754..d9715338ea2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -187,9 +187,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** */
     public static final String EXCHANGE_LATCH_ID = "exchange";
 
-    /** */
-    private static final String EXCHANGE_FREE_LATCH_ID = "exchange-free";
-
     /** @see IgniteSystemProperties#IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT 
*/
     public static final int DFLT_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT = 30 * 
60_000;
 
@@ -777,7 +774,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * @return {@code True} if deactivate cluster exchange.
      */
-    private boolean deactivateCluster() {
+    public boolean deactivateCluster() {
         return exchActions != null && exchActions.deactivate();
     }
 
@@ -1753,12 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     cctx.kernalContext().pools().getSystemExecutorService(),
                     cctx.affinity().cacheGroups().values(),
                     desc -> {
-                        CacheGroupContext grp = 
cctx.cache().cacheGroup(desc.groupId());
-
-                        GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
-                            cctx.exchange().clientTopology(desc.groupId(), 
events().discoveryCache());
-
-                        top.beforeExchange(this, true, false); // Not 
expecting new moving partitions.
+                        partitionTopology(desc.groupId()).beforeExchange(this, 
true, false); // Not expecting new moving partitions.
 
                         return null;
                     });
@@ -2467,7 +2459,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 comp.onDoneBeforeTopologyUnlock(this);
 
             // Create and destroy caches and cache proxies.
-            cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
+            cctx.cache().onExchangeDone(this, err);
 
             Map<T2<Integer, Integer>, Long> locReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
 
@@ -3163,7 +3155,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
 
         boolean allReceived = false; // Received all expected messages.
-        boolean updateSingleMap = false;
+        boolean isPartitionUpdateRequired = false;
 
         FinishState finishState0 = null;
 
@@ -3188,7 +3180,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     assert crd.isLocal() : crd;
 
                     if (remaining.remove(nodeId)) {
-                        updateSingleMap = true;
+                        isPartitionUpdateRequired = true;
 
                         pendingSingleUpdates++;
 
@@ -3236,11 +3228,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             return;
         }
 
-        if (updateSingleMap) {
+        if (isPartitionUpdateRequired) {
             try {
-                // Do not update partition map, in case cluster transitioning 
to inactive state.
+                msgs.put(nodeId, msg);
+
                 if (!deactivateCluster())
-                    updatePartitionSingleMap(nodeId, msg);
+                    updatePartitionSingleMap(msg);
             }
             finally {
                 synchronized (mux) {
@@ -3592,12 +3585,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 cctx.kernalContext().pools().getSystemExecutorService(),
                 cctx.affinity().cacheGroups().values(),
                 desc -> {
-                    CacheGroupContext grp = 
cctx.cache().cacheGroup(desc.groupId());
-
-                    GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
-                        cctx.exchange().clientTopology(desc.groupId(), 
events().discoveryCache());
-
-                    top.detectLostPartitions(resTopVer, this);
+                    
partitionTopology(desc.groupId()).detectLostPartitions(resTopVer, this);
 
                     return null;
                 });
@@ -3621,14 +3609,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 cctx.kernalContext().pools().getSystemExecutorService(),
                 cctx.affinity().caches().values(),
                 desc -> {
-                    if (cacheNames.contains(desc.cacheName())) {
-                        CacheGroupContext grp = 
cctx.cache().cacheGroup(desc.groupId());
-
-                        GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
-                            cctx.exchange().clientTopology(desc.groupId(), 
events().discoveryCache());
-
-                        top.resetLostPartitions(initialVersion());
-                    }
+                    if (cacheNames.contains(desc.cacheName()))
+                        
partitionTopology(desc.groupId()).resetLostPartitions(initialVersion());
 
                     return null;
                 });
@@ -3759,7 +3741,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e 
: mergedJoinExchMsgs.entrySet()) {
                             msgs.put(e.getKey(), e.getValue());
 
-                            updatePartitionSingleMap(e.getKey(), e.getValue());
+                            updatePartitionSingleMap(e.getValue());
                         }
                     }
                 }
@@ -3778,12 +3760,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     cctx.kernalContext().pools().getSystemExecutorService(),
                     cctx.affinity().cacheGroups().values(),
                     desc -> {
-                        CacheGroupContext grp = 
cctx.cache().cacheGroup(desc.groupId());
-
-                        GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
-                            cctx.exchange().clientTopology(desc.groupId(), 
events().discoveryCache());
-
-                        top.beforeExchange(this, true, true);
+                        partitionTopology(desc.groupId()).beforeExchange(this, 
true, true);
 
                         return null;
                     });
@@ -3862,6 +3839,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             timeBag.finishGlobalStage("Apply update counters");
 
+            if (activateCluster())
+                cctx.cache().applyCacheGroupRecoveryData();
+
             updateLastVersion(cctx.versions().last());
 
             cctx.versions().onExchange(lastVer.get().order());
@@ -3970,24 +3950,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                     
cctx.kernalContext().state().onStateChangeError(exchangeGlobalExceptions, req);
                 }
-                else {
-                    boolean hasMoving = !partsToReload.isEmpty();
-
-                    Set<Integer> waitGrps = cctx.affinity().waitGroups();
-
-                    if (!hasMoving) {
-                        for (CacheGroupContext grpCtx : 
cctx.cache().cacheGroups()) {
-                            if (waitGrps.contains(grpCtx.groupId()) && 
grpCtx.topology().hasMovingPartitions()) {
-                                hasMoving = true;
-
-                                break;
-                            }
-
-                        }
-                    }
-
-                    
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
-                }
 
                 if 
(!cctx.kernalContext().state().clusterState().localBaselineAutoAdjustment()) {
                     ClusterState state = stateChangeErr ? 
ClusterState.INACTIVE : req.state();
@@ -4030,20 +3992,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         GridDhtPartitionsSingleMessage msg,
         Map<Integer, CacheGroupAffinityMessage> messageAccumulator
     ) {
-        for (Map.Entry<Integer, GridDhtPartitionMap> e : 
msg.partitions().entrySet()) {
-            Integer grpId = e.getKey();
-
-            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
-            GridDhtPartitionTopology top = grp != null
-                ? grp.topology()
-                : cctx.exchange().clientTopology(grpId, 
events().discoveryCache());
-
-            CachePartitionPartialCountersMap cntrs = 
msg.partitionUpdateCounters(grpId);
-
-            if (cntrs != null)
-                top.collectUpdateCounters(cntrs);
-        }
+        msg.partitionUpdateCounters().forEach((grpId, updCntrs) -> 
partitionTopology(grpId).collectUpdateCounters(updCntrs));
 
         Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
@@ -4139,11 +4088,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 cctx.kernalContext().pools().getSystemExecutorService(),
                 cacheGroupDescriptors(),
                 grpDesc -> {
-                    CacheGroupContext grpCtx = 
cctx.cache().cacheGroup(grpDesc.groupId());
-
-                    GridDhtPartitionTopology top = grpCtx != null
-                        ? grpCtx.topology()
-                        : cctx.exchange().clientTopology(grpDesc.groupId(), 
events().discoveryCache());
+                    GridDhtPartitionTopology top = 
partitionTopology(grpDesc.groupId());
 
                     if (CU.isPersistentCache(grpDesc.config(), 
cctx.gridConfig().getDataStorageConfiguration())) {
                         List<SupplyPartitionInfo> list;
@@ -4755,21 +4700,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * Updates partition map in all caches.
      *
-     * @param nodeId Node message received from.
      * @param msg Partitions single message.
      */
-    private void updatePartitionSingleMap(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
-        msgs.put(nodeId, msg);
-
-        for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg.partitions().entrySet()) {
-            Integer grpId = entry.getKey();
-            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
-            GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                cctx.exchange().clientTopology(grpId, 
events().discoveryCache());
-
-            top.update(exchId, entry.getValue(), false);
-        }
+    private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) {
+        msg.partitions().forEach((grpId, partMap) -> 
partitionTopology(grpId).update(exchId, partMap, false));
     }
 
     /**
@@ -5243,7 +5177,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     if (dynamicCacheStartExchange() && msg.getError() != null)
                         exchangeGlobalExceptions.put(e.getKey().id(), 
msg.getError());
 
-                    updatePartitionSingleMap(e.getKey().id(), msg);
+                    updatePartitionSingleMap(msg);
                 }
             }
         }
@@ -5366,6 +5300,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         return true;
     }
 
+    /** @return Partition topology for the group with the specified ID. */
+    private GridDhtPartitionTopology partitionTopology(int grpId) {
+        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+        return grp != null
+            ? grp.topology()
+            : cctx.exchange().clientTopology(grpId, events().discoveryCache());
+    }
+
     /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index de334eaec82..d47f0f3312a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -203,6 +203,11 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         partCntrs.put(grpId, cntrMap);
     }
 
+    /** @return Partition update counters per cache group. */
+    public Map<Integer, CachePartitionPartialCountersMap> 
partitionUpdateCounters() {
+        return partCntrs == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(partCntrs);
+    }
+
     /**
      * @param grpId Cache group ID.
      * @return Partition update counters.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 06531eefe49..9ca26f636ae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -107,7 +108,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     private volatile boolean stopping;
 
     /** A future that will be completed when topology with version topVer will 
be ready to use. */
-    private volatile GridDhtTopologyFuture topReadyFut;
+    private volatile GridDhtPartitionsExchangeFuture topReadyFut;
 
     /** */
     private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -221,7 +222,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtTopologyFuture exchFut,
+        GridDhtPartitionsExchangeFuture exchFut,
         DiscoCache discoCache,
         long updSeq,
         boolean stopping
@@ -901,6 +902,11 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         // No-op on client topology.
     }
 
+    /** {@inheritDoc} */
+    @Override public void applyRecoveryData(CacheGroupRecoveryState grpState) {
+        // No-op.
+    }
+
     /**
      * Method checks is new partition map more stale than current partition map
      * New partition map is stale if topology version or update sequence are 
less or equal than of current map
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 8efc3ee8ebb..0c2f415a7a1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
@@ -73,7 +74,7 @@ public interface GridDhtPartitionTopology {
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
-        GridDhtTopologyFuture exchFut,
+        GridDhtPartitionsExchangeFuture exchFut,
         DiscoCache discoCache,
         long updateSeq,
         boolean stopping
@@ -336,6 +337,9 @@ public interface GridDhtPartitionTopology {
      */
     public void applyUpdateCounters();
 
+    /** Restores cache group data after node restart if PDS is enabled. Called 
on coordinator. */
+    public void applyRecoveryData(CacheGroupRecoveryState grpState);
+
     /**
      * Checks if there is at least one owner for each partition in the cache 
topology for a local node.
      * If not marks such a partition as LOST or OWNING depending on a policy.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 2c9d582eed9..b004053ca6f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -37,13 +36,14 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -139,7 +139,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     private volatile boolean stopping;
 
     /** A future that will be completed when topology with version topVer will 
be ready to use. */
-    private volatile GridDhtTopologyFuture topReadyFut;
+    private volatile GridDhtPartitionsExchangeFuture topReadyFut;
 
     /** */
     private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -266,7 +266,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtTopologyFuture exchFut,
+        GridDhtPartitionsExchangeFuture exchFut,
         @NotNull DiscoCache discoCache,
         long updSeq,
         boolean stopping
@@ -701,10 +701,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param updateSeq Update sequence to initialize full map.
      */
     private void initializeFullMap(long updateSeq) {
-        if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture))
-            return;
-
-        GridDhtPartitionsExchangeFuture exchFut = 
(GridDhtPartitionsExchangeFuture)topReadyFut;
+        GridDhtPartitionsExchangeFuture exchFut = topReadyFut;
 
         boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), 
grp.receivedFrom());
 
@@ -924,7 +921,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 loc.updateCounter(updCntr);
 
             // Create a partition in lost state.
-            if (lostParts != null && lostParts.contains(p))
+            if (F.contains(lostParts, p))
                 loc.markLost();
         }
 
@@ -985,8 +982,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         if (!create)
             return null;
 
-        boolean created = false;
-
         ctx.database().checkpointReadLock();
 
         try {
@@ -1034,8 +1029,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         loc.resetUpdateCounter();
 
                     this.updateSeq.incrementAndGet();
-
-                    created = true;
                 }
             }
             finally {
@@ -1613,18 +1606,20 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     if (lostParts != null) {
                         this.lostParts = new HashSet<>(lostParts);
 
-                        for (Integer part : lostParts) {
-                            GridDhtLocalPartition locPart = 
localPartition(part);
+                        if (exchFut != null && !exchFut.activateCluster()) {
+                            for (Integer part : lostParts) {
+                                GridDhtLocalPartition locPart = 
localPartition(part);
 
-                            // EVICTED partitions should not be marked 
directly as LOST, or
-                            // part.clearFuture lifecycle will be broken after 
resetting.
-                            // New partition should be created instead.
-                            if (locPart != null && locPart.state() != EVICTED) 
{
-                                locPart.markLost();
+                                // EVICTED partitions should not be marked 
directly as LOST, or
+                                // part.clearFuture lifecycle will be broken 
after resetting.
+                                // New partition should be created instead.
+                                if (locPart != null && locPart.state() != 
EVICTED) {
+                                    locPart.markLost();
 
-                                GridDhtPartitionMap locMap = 
partMap.get(ctx.localNodeId());
+                                    GridDhtPartitionMap locMap = 
partMap.get(ctx.localNodeId());
 
-                                locMap.put(part, LOST);
+                                    locMap.put(part, LOST);
+                                }
                             }
                         }
                     }
@@ -1773,7 +1768,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         U.nanosToMillis(acquiredNanos - nowNanos) + ']');
             }
 
-            if (stopping)
+            if (stopping && !isDeactivationInProgress(topReadyFut))
                 return;
 
             for (int i = 0; i < cntrMap.size(); i++) {
@@ -1809,7 +1804,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         U.nanosToMillis(acquiredNanos - nowNanos) + ']');
             }
 
-            if (stopping)
+            if (stopping && !isDeactivationInProgress(topReadyFut))
                 return;
 
             for (int i = 0; i < locParts.length(); i++) {
@@ -1855,6 +1850,20 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void applyRecoveryData(CacheGroupRecoveryState grpState) {
+        lock.writeLock().lock();
+
+        try {
+            restoreLostPartitions(grpState);
+
+            detectPartitionLossDuringInactivity(grpState);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     /**
      * Method checks is new partition map more stale than current partition map
      * New partition map is stale if topology version or update sequence are 
less or equal than of current map
@@ -2037,8 +2046,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         lock.writeLock().lock();
 
         try {
-            assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) ||
-                
assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
+            assert 
assignment.topologyVersion().equals(topReadyFut.context().events().topologyVersion());
 
             readyTopVer = lastTopChangeVer = assignment.topologyVersion();
 
@@ -2199,30 +2207,16 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 if (node2part == null)
                     return false;
 
-                // Do not trigger lost partition events on activation.
-                DiscoveryEvent discoEvt = fut.activateCluster() ? null : 
fut.firstEvent();
-
-                final GridClusterStateProcessor state = 
grp.shared().kernalContext().state();
-
-                boolean isInMemoryCluster = CU.isInMemoryCluster(
-                    grp.shared().kernalContext().discovery().allNodes(),
-                    
grp.shared().kernalContext().marshallerContext().jdkMarshaller(),
-                    U.resolveClassLoader(grp.shared().kernalContext().config())
-                );
-
-                boolean compatibleWithIgnorePlc = isInMemoryCluster
-                    && state.isBaselineAutoAdjustEnabled() && 
state.baselineAutoAdjustTimeout() == 0L;
-
                 // Calculate how data loss is handled.
-                boolean safe = grp.config().getPartitionLossPolicy() != IGNORE 
|| !compatibleWithIgnorePlc;
+                boolean isLossIgnored = isParitionLossIgnored();
 
-                int parts = grp.affinity().partitions();
-
-                Set<Integer> recentlyLost = null;
+                Set<Integer> recentlyLost = fut.activateCluster() && 
!F.isEmpty(lostParts)
+                    ? new HashSet<>(lostParts)
+                    : new HashSet<>();
 
                 boolean changed = false;
 
-                for (int part = 0; part < parts; part++) {
+                for (int part = 0; part < grp.affinity().partitions(); part++) 
{
                     boolean lost = F.contains(lostParts, part);
 
                     if (!lost) {
@@ -2240,27 +2234,10 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         if (!hasOwner) {
                             lost = true;
 
-                            // Do not detect and record lost partition in 
IGNORE mode.
-                            if (safe) {
-                                if (lostParts == null)
-                                    lostParts = new TreeSet<>();
-
-                                lostParts.add(part);
+                            if (!isLossIgnored) {
+                                registerLostPartition(part);
 
-                                if (discoEvt != null) {
-                                    if (recentlyLost == null)
-                                        recentlyLost = new HashSet<>();
-
-                                    recentlyLost.add(part);
-
-                                    if 
(grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                                        grp.addRebalanceEvent(part,
-                                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                                            discoEvt.eventNode(),
-                                            discoEvt.type(),
-                                            discoEvt.timestamp());
-                                    }
-                                }
+                                recentlyLost.add(part);
                             }
                         }
                     }
@@ -2274,7 +2251,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                             final GridDhtPartitionState prevState = 
locPart.state();
 
-                            changed = safe ? locPart.markLost() : 
locPart.own();
+                            changed = isLossIgnored ? locPart.own() : 
locPart.markLost();
 
                             if (changed) {
                                 long updSeq = updateSeq.incrementAndGet();
@@ -2295,17 +2272,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                             GridDhtPartitionState p0 = 
entry.getValue().get(part);
 
                             if (p0 != null && p0 != EVICTED)
-                                entry.getValue().put(part, safe ? LOST : 
OWNING);
+                                entry.getValue().put(part, isLossIgnored ? 
OWNING : LOST);
                         }
                     }
                 }
 
-                if (recentlyLost != null) {
-                    U.warn(log, "Detected lost partitions" + (!safe ? " (will 
ignore)" : "")
-                        + " [grp=" + grp.cacheOrGroupName()
-                        + ", parts=" + S.toStringSortedDistinct(recentlyLost)
-                        + ", topVer=" + resTopVer + "]");
-                }
+                recordPartitionLossEvents(fut, recentlyLost, isLossIgnored);
 
                 return changed;
             }
@@ -2366,7 +2338,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         lock.readLock().lock();
 
         try {
-            return lostParts == null ? Collections.<Integer>emptySet() : new 
HashSet<>(lostParts);
+            return lostParts == null ? Collections.emptySet() : new 
HashSet<>(lostParts);
         }
         finally {
             lock.readLock().unlock();
@@ -3273,6 +3245,96 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         // No-op.
     }
 
+    /** */
+    private void restoreLostPartitions(CacheGroupRecoveryState grpState) {
+        grpState.lostParititons().forEach(this::registerLostPartition);
+    }
+
+    /** */
+    private void detectPartitionLossDuringInactivity(CacheGroupRecoveryState 
grpState) {
+        Set<Integer> recoveryZeroParts = 
grpState.zeroUpdateCounterParititons();
+
+        Set<Integer> detectedLostParts = new HashSet<>();
+
+        for (int zeroPart : cntrMap.zeroUpdateCounterPartitions()) {
+            if (!recoveryZeroParts.contains(zeroPart))
+                detectedLostParts.add(zeroPart);
+        }
+
+        detectedLostParts.forEach(this::registerLostPartition);
+
+        if (!detectedLostParts.isEmpty()) {
+            U.warn(log, "Data of some partitions was not restored from the PDS 
during cluster activation, but was present" +
+                " at the time of the previous cluster deactivation. This may be 
due to the Ignite PDS folder being" +
+                " cleared/tampered with on all primary and backup nodes for the 
specified partitions during cluster inactivity." +
+                " [cacheGroup=" + grp.cacheOrGroupName() + ", partIds=" + 
S.toStringSortedDistinct(detectedLostParts) + ']');
+        }
+    }
+
+    /** */
+    private boolean registerLostPartition(int partId) {
+        assert lock.isWriteLockedByCurrentThread();
+
+        if (lostParts == null)
+            lostParts = new HashSet<>();
+
+        return lostParts.add(partId);
+    }
+
+    /** */
+    private boolean isParitionLossIgnored() {
+        GridClusterStateProcessor state = grp.shared().kernalContext().state();
+
+        boolean isInMemoryCluster = CU.isInMemoryCluster(
+            grp.shared().kernalContext().discovery().allNodes(),
+            grp.shared().kernalContext().marshallerContext().jdkMarshaller(),
+            U.resolveClassLoader(grp.shared().kernalContext().config())
+        );
+
+        boolean isIgnorePlcSupported = isInMemoryCluster
+            && state.isBaselineAutoAdjustEnabled()
+            && state.baselineAutoAdjustTimeout() == 0L;
+
+        return isIgnorePlcSupported && grp.config().getPartitionLossPolicy() 
== IGNORE;
+    }
+
+    /** */
+    private void recordPartitionLossEvents(
+        @Nullable GridDhtPartitionsExchangeFuture fut,
+        Collection<Integer> recentlyLost,
+        boolean isLossIgnored
+    ) {
+        if (fut == null || F.isEmpty(recentlyLost))
+            return;
+
+        if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+            DiscoveryEvent discoEvt = fut.firstEvent();
+
+            for (Integer part : recentlyLost) {
+                grp.addRebalanceEvent(part,
+                    EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                    discoEvt.eventNode(),
+                    discoEvt.type(),
+                    discoEvt.timestamp());
+            }
+        }
+
+        U.warn(log, "Detected lost partitions" + (isLossIgnored ? " (will 
ignore)" : "")
+            + " [grp=" + grp.cacheOrGroupName()
+            + ", parts=" + S.toStringSortedDistinct(recentlyLost)
+            + ", topVer=" + fut.context().events().topologyVersion() + "]");
+    }
+
+    /** */
+    private boolean isDeactivationInProgress(GridDhtPartitionsExchangeFuture 
exchFut) {
+        if (exchFut == null)
+            return false;
+
+        ExchangeActions exchangeActions = exchFut.exchangeActions();
+
+        return exchangeActions != null && exchangeActions.deactivate();
+    }
+
     /**
      * Collects states of local partitions.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
index fb7bb00a93c..1f71cb192a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
@@ -205,6 +205,11 @@ public class BaselineTopology implements Serializable {
         branchingHist.add(branchingPntHash);
     }
 
+    /** */
+    public long version() {
+        return U.toLong(id, branchingHist.size());
+    }
+
     /**
      * @return id of this BaselineTopology.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index fc775b6b7aa..da3ecb539ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -1676,12 +1676,6 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
         return exchActs;
     }
 
-    /** {@inheritDoc} */
-    @Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture 
exchangeFuture,
-        boolean hasMovingPartitions) {
-        // no-op
-    }
-
     /** {@inheritDoc} */
     @Override public boolean evictionsAllowed() {
         return true;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
index f4f3d622096..fbd1df1bb9f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
@@ -141,12 +141,6 @@ public interface IGridClusterStateProcessor extends 
GridProcessor {
      */
     void onBaselineTopologyChanged(BaselineTopology blt, 
BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException;
 
-    /**
-     * @param exchangeFuture Exchange future.
-     * @param hasMovingPartitions {@code True} if there are moving partitions.
-     */
-    void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchangeFuture, 
boolean hasMovingPartitions);
-
     /**
      * @return {@code True} if partition evictions are allowed in current 
state.
      */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteLostPartitionsRecoveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteLostPartitionsRecoveryTest.java
new file mode 100644
index 00000000000..c5a1beeef8e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteLostPartitionsRecoveryTest.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import 
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * The following tests are intended to check Ignite behaviour in case PDS is 
enabled and some partitions are "lost"
+ * during cluster inactivity (either it is deactivated or completely stopped). 
"Lost" means that the partition data is no
+ * longer available after the subsequent cluster activation - more formally, 
the partition update counter has become
+ * zero. Tests use 4 nodes and 2 "cells". Each "cell" nodes store all copies 
of some partitions. If all nodes in a "cell"
+ * lose their data, this will certainly result in data loss for some 
partitions.
+ */
+public class IgniteLostPartitionsRecoveryTest extends GridCommonAbstractTest {
+    /** */
+    private static final int SERVER_NODES_CNT = 4;
+
+    /** */
+    private static final int CACHE_KEYS_CNT = 100;
+    
+    /** */
+    private static final String SERVER_CACHE = "server-partitioned";
+
+    /** */
+    private static final String CLIENT_CACHE = "client-partitioned";
+
+    /** */
+    private final ListeningTestLogger listeningLogger = new 
ListeningTestLogger(log);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            
.setWorkDirectory(workDirectory(getTestIgniteInstanceIndex(igniteInstanceName)).toString())
+            .setClusterStateOnStart(INACTIVE)
+            .setUserAttributes(singletonMap("CELL", "CELL-" + 
(getTestIgniteInstanceIndex(igniteInstanceName)) % 2))
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setGridLogger(listeningLogger)
+            .setCacheConfiguration(createCacheConfiguration(SERVER_CACHE))
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration()
+                    .setPersistenceEnabled(true))
+            );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        for (int i = 0; i < SERVER_NODES_CNT + 1; i++)
+            U.delete(workDirectory(i));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        for (int i = 0; i < SERVER_NODES_CNT + 1; i++)
+            U.delete(workDirectory(i));
+
+        super.afterTest();
+    }
+
+    /** */
+    @Test
+    public void testPartitionLossDetectionOnActivation() throws Exception {
+        prepareCluster();
+
+        fillCaches();
+
+        grid(0).cluster().state(INACTIVE);
+
+        restartCellWithDataCleared();
+
+        LogListener logLsnr = LogListener.matches("Some partitions data was 
not restored from the PDS during cluster activation")
+            .times(2)
+            .build();
+
+        listeningLogger.registerListener(logLsnr);
+
+        logLsnr.reset();
+
+        grid(0).cluster().state(ACTIVE);
+
+        logLsnr.check(getTestTimeout());
+
+        Collection<Integer> srvCacheLostParts = 
checkCacheLostParitionsDetected(SERVER_CACHE);
+        Collection<Integer> cliCacheLostParts = 
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+        checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+    }
+
+    /** */
+    @Test
+    public void testLostPartitionsRestoredAfterClusterRestart() throws 
Exception {
+        prepareCluster();
+
+        fillCaches();
+
+        grid(0).cluster().state(INACTIVE);
+
+        restartCellWithDataCleared();
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkCacheLostParitionsDetected(SERVER_CACHE);
+        checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).cluster().state(INACTIVE);
+
+        stopAllGrids();
+
+        prepareCluster();
+
+        Collection<Integer> srvCacheLostParts = 
checkCacheLostParitionsDetected(SERVER_CACHE);
+        Collection<Integer> cliCacheLostParts = 
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+        checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+    }
+
+
+    /** */
+    @Test
+    public void testLostPartitionsRestoredAfterInactivity() throws Exception {
+        prepareCluster();
+
+        fillCaches();
+
+        stopCell();
+
+        grid(0).cluster().state(INACTIVE);
+
+        startCell();
+
+        grid(0).cluster().state(ACTIVE);
+
+        grid(0).cacheNames().forEach(this::checkCacheLostParitionsDetected);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        for (Ignite grid : G.allGrids()) {
+            for (String cacheName : grid(0).cacheNames()) {
+                for (int i = 0; i < CACHE_KEYS_CNT; i++)
+                    assertEquals(i, grid.cache(cacheName).get(i));
+            }
+        }
+    }
+
+    /** */
+    @Test
+    public void testNodeJoinAfterActivation() throws Exception {
+        prepareCluster();
+
+        fillCaches();
+
+        grid(0).cluster().state(INACTIVE);
+
+        stopGrid(2);
+
+        grid(0).cluster().state(ACTIVE);
+        grid(0).cluster().state(INACTIVE);
+
+        restartCellWithDataCleared();
+
+        grid(0).cluster().state(ACTIVE);
+
+        startGrid(2);
+
+        Collection<Integer> srvCacheLostParts = 
checkCacheLostParitionsDetected(SERVER_CACHE);
+        Collection<Integer> cliCacheLostParts = 
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+        checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+    }
+
+    /** */
+    @Test
+    public void testNodeJoinDuringClusterStateTransition() throws Exception {
+        prepareCluster();
+
+        fillCaches();
+
+        grid(0).cluster().state(INACTIVE);
+
+        stopGrid(2);
+
+        grid(0).cluster().state(ACTIVE);
+        grid(0).cluster().state(INACTIVE);
+
+        restartCellWithDataCleared();
+
+        spi(grid(1)).blockMessages(GridDhtPartitionsSingleMessage.class, 
grid(0).name());
+
+        CountDownLatch node2JoinedEvtListenedLatch = new CountDownLatch(3);
+
+        for (Integer nodeIdx : Arrays.asList(0, 1, 3)) {
+            grid(nodeIdx).events().localListen(event -> {
+                if (Objects.equals(getTestIgniteInstanceName(2), 
((DiscoveryEvent)event).eventNode().attribute(ATTR_IGNITE_INSTANCE_NAME)))
+                    node2JoinedEvtListenedLatch.countDown();
+
+                return true;
+            }, EVT_NODE_JOINED);
+        }
+
+        IgniteInternalFuture<Object> activationFut = runAsync(() -> 
grid(0).cluster().state(ACTIVE));
+
+        spi(grid(1)).waitForBlocked();
+
+        runAsync(() -> startGrid(2));
+
+        assertTrue(node2JoinedEvtListenedLatch.await(getTestTimeout(), 
MILLISECONDS));
+
+        spi(grid(1)).stopBlock();
+
+        activationFut.get(getTestTimeout());
+
+        Collection<Integer> srvCacheLostParts = 
checkCacheLostParitionsDetected(SERVER_CACHE);
+        Collection<Integer> cliCacheLostParts = 
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+        checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+    }
+
+    /** */
+    @Test
+    public void testClusterRestartWthEmptyPartitions() throws Exception {
+        prepareCluster();
+
+        
grid(SERVER_NODES_CNT).createCache(createCacheConfiguration(CLIENT_CACHE));
+
+        writeKeyToParition(SERVER_CACHE, 0);
+        writeKeyToParition(SERVER_CACHE, 2);
+
+        writeKeyToParition(CLIENT_CACHE, 1);
+        writeKeyToParition(CLIENT_CACHE, 3);
+
+        forceCheckpoint();
+
+        grid(0).cluster().state(INACTIVE);
+        grid(0).cluster().state(ACTIVE);
+
+        checkNoLostPartitionsDetected();
+
+        grid(0).cluster().state(INACTIVE);
+
+        stopAllGrids();
+
+        prepareCluster();
+    }
+
+    /** */
+    @Test
+    public void testNodeJoinWithStaleCacheGroupRecoveryData() throws Exception 
{
+        prepareCluster();
+
+        fillCaches();
+
+        grid(0).cluster().state(INACTIVE);
+
+        restartCellWithDataCleared();
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkCacheLostParitionsDetected(SERVER_CACHE);
+        checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).cluster().state(INACTIVE);
+
+        stopGrid(2);
+
+        grid(0).cluster().state(ACTIVE);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        checkNoLostPartitionsDetected();
+
+        grid(0).cluster().state(INACTIVE);
+
+        startGrid(2);
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkNoLostPartitionsDetected();
+
+        grid(0).cluster().state(INACTIVE);
+
+        stopAllGrids();
+
+        startGrid(2);
+        startGrid(0);
+        startGrid(1);
+        startGrid(3);
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkNoLostPartitionsDetected();
+    }
+
+    /** */
+    @Test
+    public void testCoordinatorWithMissingCacheGroupRecoveryData() throws 
Exception {
+        prepareCluster();
+
+        fillCaches();
+
+        stopGrid(0);
+
+        grid(1).cluster().state(INACTIVE);
+
+        stopAllGrids();
+
+        startGrid(0);
+        startGrid(2);
+        startNodeWithPdsCleared(1);
+        startNodeWithPdsCleared(3);
+        startClientGrid(SERVER_NODES_CNT);
+
+        grid(0).cluster().state(ACTIVE);
+
+        checkCacheLostParitionsDetected(SERVER_CACHE);
+        checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+        grid(0).resetLostPartitions(grid(0).cacheNames());
+
+        checkNoLostPartitionsDetected();
+
+        grid(0).cluster().state(INACTIVE);
+        grid(0).cluster().state(ACTIVE);
+
+        checkNoLostPartitionsDetected();
+    }
+
+    /** */
+    private void prepareCluster() throws Exception {
+        startGrids(SERVER_NODES_CNT);
+        startClientGrid(SERVER_NODES_CNT);
+
+        grid(0).cluster().state(ACTIVE);
+    }
+
+    /** */
+    private void checkNoLostPartitionsDetected() {
+        for (Ignite grid : G.allGrids()) {
+            for (String cacheName : grid.cacheNames())
+                assertTrue(F.isEmpty(grid.cache(cacheName).lostPartitions()));
+        }
+    }
+
+    /** */
+    private void checkReadAvailableFromLostPartitions(String cacheName, 
Collection<Integer> parts) {
+        assertFalse(parts.isEmpty());
+
+        for (Ignite grid : G.allGrids()) {
+            for (Integer part : parts)
+                assertNull(readKeyFromPartition(grid, cacheName, part));
+        }
+    }
+
+    /** */
+    private Collection<Integer> checkCacheLostParitionsDetected(String 
cacheName) {
+        List<Collection<Integer>> nodeLostParts = new ArrayList<>();
+
+        for (Ignite grid : G.allGrids())
+            nodeLostParts.add(grid.cache(cacheName).lostPartitions());
+
+        assertFalse(nodeLostParts.isEmpty());
+
+        Collection<Integer> lostPars = nodeLostParts.get(0);
+
+        for (int i = 1; i < nodeLostParts.size(); i++)
+            assertEquals(lostPars, nodeLostParts.get(i));
+
+        for (Ignite grid : G.allGrids()) {
+            for (int part : lostPars) {
+                assertThrowsAnyCause(
+                    log,
+                    () -> readKeyFromPartition(grid, cacheName, part),
+                    CacheInvalidStateException.class,
+                    "Failed to execute the cache operation (all partition 
owners have left the grid");
+            }
+        }
+
+        return lostPars;
+    }
+
+    /** */
+    private CacheConfiguration<Integer, Integer> 
createCacheConfiguration(String name) {
+        return new CacheConfiguration<Integer, Integer>()
+            .setName(name)
+            .setPartitionLossPolicy(READ_ONLY_SAFE)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setBackups(1)
+            .setAffinity(new RendezvousAffinityFunction()
+                .setAffinityBackupFilter(new 
ClusterNodeAttributeColocatedBackupFilter("CELL"))
+                .setPartitions(10));
+    }
+
+    /** */
+    private void writeKeyToParition(String cacheName, int part) {
+        for (int key = 0; key < CACHE_KEYS_CNT; key++) {
+            if (grid(0).affinity(cacheName).partition(key) == part) {
+                grid(0).<Integer, Integer>cache(cacheName).put(key, key);
+
+                return;
+            }
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /** */
+    private Integer readKeyFromPartition(Ignite grid, String cacheName, int 
part) {
+        for (int key = 0; key < CACHE_KEYS_CNT; key++) {
+            if (grid(0).affinity(cacheName).partition(key) == part)
+                return grid.<Integer, Integer>cache(cacheName).get(key);
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /** */
+    private void startNodeWithPdsCleared(int nodeIdx) throws Exception {
+        U.delete(workDirectory(nodeIdx));
+
+        startGrid(nodeIdx);
+    }
+
+    /** */
+    private void fillCaches() throws Exception {
+        
grid(SERVER_NODES_CNT).createCache(createCacheConfiguration(CLIENT_CACHE));
+
+        for (String cacheName : grid(0).cacheNames()) {
+            for (int i = 0; i < CACHE_KEYS_CNT; i++)
+                grid(0).cache(cacheName).put(i, i);
+        }
+
+        forceCheckpoint();
+    }
+
+    /** */
+    private Path workDirectory(int nodeIdx) throws Exception {
+        return Paths.get(U.defaultWorkDirectory(), 
U.maskForFileName(getTestIgniteInstanceName(nodeIdx)));
+    }
+
+    /** */
+    private void restartCellWithDataCleared() throws Exception {
+        stopCell();
+
+        startNodeWithPdsCleared(1);
+        startNodeWithPdsCleared(3);
+    }
+
+    /** */
+    private void stopCell() {
+        stopGrid(1);
+        stopGrid(3);
+    }
+
+    /** */
+    private void startCell() throws Exception {
+        startGrid(1);
+        startGrid(3);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
index 222161fac11..38ab77dcf7b 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Movi
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PreloadingRestartWhileClearingPartitionTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.RentingPartitionIsOwnedDuringEvictionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteLostPartitionsOnLeaveBaselineSelfTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteLostPartitionsRecoveryTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.AtomicPartitionCounterStateConsistencyHistoryRebalanceTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.AtomicPartitionCounterStateConsistencyTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.AtomicVolatilePartitionCounterStateConsistencyTest;
@@ -79,6 +80,7 @@ public class IgniteCacheTestSuite12 {
         GridTestUtils.addTestIfNeeded(suite, 
CachePartitionLostAfterSupplierHasLeftTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CachePartitionLossWithPersistenceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteLostPartitionsOnLeaveBaselineSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
IgniteLostPartitionsRecoveryTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
TxCrossCacheMapOnInvalidTopologyTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
TxCrossCacheRemoteMultiplePartitionReservationTest.class, ignoredTests);

Reply via email to