This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 39c872a85b4 IGNITE-26168 Added support of partition loss detection after cluster inactivity (#12288)
39c872a85b4 is described below
commit 39c872a85b46bc88b7a1ba7b09a647a004fb299c
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sun Sep 7 20:50:56 2025 +0300
IGNITE-26168 Added support of partition loss detection after cluster inactivity (#12288)
---
.../processors/cache/CacheGroupContext.java | 5 +
.../processors/cache/CacheGroupRecoveryState.java | 79 +++
.../cache/CacheJoinNodeDiscoveryData.java | 14 +
.../cache/CacheNodeCommonDiscoveryData.java | 17 +-
.../cache/ClusterCacheGroupRecoveryData.java | 73 +++
.../processors/cache/ClusterCachesInfo.java | 37 +-
.../cache/GridCachePartitionExchangeManager.java | 2 +
.../processors/cache/GridCacheProcessor.java | 104 +++-
.../preloader/CachePartitionFullCountersMap.java | 15 +
.../preloader/GridDhtPartitionsExchangeFuture.java | 119 ++---
.../preloader/GridDhtPartitionsSingleMessage.java | 5 +
.../dht/topology/GridClientPartitionTopology.java | 10 +-
.../dht/topology/GridDhtPartitionTopology.java | 6 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 210 +++++---
.../processors/cluster/BaselineTopology.java | 5 +
.../cluster/GridClusterStateProcessor.java | 6 -
.../cluster/IGridClusterStateProcessor.java | 6 -
.../IgniteLostPartitionsRecoveryTest.java | 533 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite12.java | 2 +
19 files changed, 1056 insertions(+), 192 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index f7ba9b9a2c2..80c20e2b556 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -1344,6 +1344,11 @@ public class CacheGroupContext {
return preparedToStop;
}
+ /** */
+ public void applyRecoveryData(CacheGroupRecoveryState grpState) {
+ top.applyRecoveryData(grpState);
+ }
+
/**
* @param ccfg Cache configuration.
* @param plugins Ignite plugin processor.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java
new file mode 100644
index 00000000000..880fa09b21c
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class CacheGroupRecoveryState implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private Set<Integer> lostParts;
+
+ /** */
+ private Set<Integer> zeroParts;
+
+ /** */
+ public CacheGroupRecoveryState() {
+ // No-op.
+ }
+
+ /** */
+ public CacheGroupRecoveryState(CacheGroupContext grp) {
+ GridDhtPartitionTopology top = grp.topology();
+
+ lostParts = top.lostPartitions();
+ zeroParts = top.fullUpdateCounters().zeroUpdateCounterPartitions();
+ }
+
+ /** */
+ public Set<Integer> zeroUpdateCounterPartitions() {
+ return Collections.unmodifiableSet(zeroParts);
+ }
+
+ /** */
+ public Set<Integer> lostPartitions() {
+ return Collections.unmodifiableSet(lostParts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeIntArray(out,
lostParts.stream().mapToInt(Number::intValue).toArray());
+ U.writeIntArray(out,
zeroParts.stream().mapToInt(Number::intValue).toArray());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ int[] lostParts = U.readIntArray(in);
+ this.lostParts = lostParts.length == 0 ? Collections.emptySet() :
Arrays.stream(lostParts).boxed().collect(Collectors.toSet());
+
+ int[] zeroParts = U.readIntArray(in);
+ this.zeroParts = zeroParts.length == 0 ? Collections.emptySet() :
Arrays.stream(zeroParts).boxed().collect(Collectors.toSet());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index 47f8dc42b9c..700d5166608 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
/**
* Information about configured caches sent from joining node.
@@ -47,6 +48,9 @@ public class CacheJoinNodeDiscoveryData implements
Serializable {
/** */
private final boolean startCaches;
+ /** */
+ @Nullable private ClusterCacheGroupRecoveryData
clusterCacheGroupRecoveryData;
+
/**
* @param cacheDeploymentId Deployment ID for started caches.
* @param caches Caches.
@@ -93,6 +97,16 @@ public class CacheJoinNodeDiscoveryData implements
Serializable {
return caches;
}
+ /** */
+ public void clusterCacheGroupRecoveryData(@Nullable
ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData) {
+ this.clusterCacheGroupRecoveryData = clusterCacheGroupRecoveryData;
+ }
+
+ /** */
+ @Nullable public ClusterCacheGroupRecoveryData
clusterCacheGroupRecoveryData() {
+ return clusterCacheGroupRecoveryData;
+ }
+
/**
*
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
index abcc1927d92..379d49e46d3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
* Cache information sent in discovery data to joining node.
@@ -47,19 +48,25 @@ public class CacheNodeCommonDiscoveryData implements
Serializable {
private final Map<String, Map<UUID, Boolean>> clientNodesMap;
/** */
- private Collection<String> restartingCaches;
+ private final Collection<String> restartingCaches;
+
+ /** */
+ @Nullable private final ClusterCacheGroupRecoveryData
clusterCacheGrpRecoveryData;
/**
* @param caches Started caches.
* @param templates Configured templates.
* @param cacheGrps Started cache groups.
* @param clientNodesMap Information about cache client nodes.
+ * @param restartingCaches Collection of cache names that are being restarted.
+ * @param clusterCacheGrpRecoveryData Cluster cache group recovery data.
*/
public CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
Map<String, CacheData> templates,
Map<Integer, CacheGroupData> cacheGrps,
Map<String, Map<UUID, Boolean>> clientNodesMap,
- Collection<String> restartingCaches
+ Collection<String> restartingCaches,
+ @Nullable ClusterCacheGroupRecoveryData clusterCacheGrpRecoveryData
) {
assert caches != null;
assert templates != null;
@@ -71,6 +78,7 @@ public class CacheNodeCommonDiscoveryData implements
Serializable {
this.cacheGrps = cacheGrps;
this.clientNodesMap = clientNodesMap;
this.restartingCaches = restartingCaches;
+ this.clusterCacheGrpRecoveryData = clusterCacheGrpRecoveryData;
}
/**
@@ -80,6 +88,11 @@ public class CacheNodeCommonDiscoveryData implements
Serializable {
return cacheGrps;
}
+ /** */
+ @Nullable ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData() {
+ return clusterCacheGrpRecoveryData;
+ }
+
/**
* @return Started caches.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java
new file mode 100644
index 00000000000..0d71593378a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class ClusterCacheGroupRecoveryData implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long clusterBaselineTopologyVersion;
+
+ /** */
+ private Map<Integer, CacheGroupRecoveryState> grpStates;
+
+ /** */
+ public ClusterCacheGroupRecoveryData() {
+ // No-op.
+ }
+
+ /** */
+ public ClusterCacheGroupRecoveryData(long clusterBaselineTopologyVersion,
Collection<CacheGroupContext> grps) {
+ this.clusterBaselineTopologyVersion = clusterBaselineTopologyVersion;
+ this.grpStates =
grps.stream().collect(Collectors.toMap(CacheGroupContext::groupId,
CacheGroupRecoveryState::new));
+ }
+
+ /** */
+ public boolean isMoreRelevantThan(ClusterCacheGroupRecoveryData data) {
+ return clusterBaselineTopologyVersion >
data.clusterBaselineTopologyVersion;
+ }
+
+ /** */
+ @Nullable public CacheGroupRecoveryState cacheGroupRecoveryState(int
grpId) {
+ return grpStates.get(grpId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(clusterBaselineTopologyVersion);
+ U.writeMap(out, grpStates);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ clusterBaselineTopologyVersion = in.readLong();
+ grpStates = U.readHashMap(in);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 86bce4091ea..a76d7dfb50e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -174,6 +174,9 @@ public class ClusterCachesInfo {
/** Flag that caches were already filtered out. */
private final AtomicBoolean alreadyFiltered = new AtomicBoolean();
+ /** */
+ @Nullable private volatile ClusterCacheGroupRecoveryData
clusterCacheGrpRecoveryData;
+
/**
* @param ctx Context.
*/
@@ -1484,7 +1487,9 @@ public class ClusterCachesInfo {
templates,
cacheGrps,
ctx.discovery().clientNodesMap(),
- restarting);
+ restarting,
+ clusterCacheGrpRecoveryData
+ );
}
/**
@@ -1716,6 +1721,8 @@ public class ClusterCachesInfo {
grpData.config().getNodeFilter(),
grpData.config().getCacheMode());
}
+
+ clusterCacheGrpRecoveryData =
cachesData.clusterCacheGroupRecoveryData();
}
/**
@@ -2198,6 +2205,8 @@ public class ClusterCachesInfo {
private String processJoiningNode(CacheJoinNodeDiscoveryData joinData,
UUID nodeId, boolean locJoin) {
registerNewCacheTemplates(joinData, nodeId);
+
processJoiningNodeClusterCacheGroupRecoveryData(joinData.clusterCacheGroupRecoveryData());
+
Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new
HashMap<>();
boolean hasSchemaPatchConflict = false;
@@ -2805,6 +2814,32 @@ public class ClusterCachesInfo {
restartingCaches.clear();
}
+ /** */
+ @Nullable public ClusterCacheGroupRecoveryData
clusterCacheGroupRecoveryData() {
+ return clusterCacheGrpRecoveryData;
+ }
+
+ /** */
+ public void clusterCacheGroupRecoveryData(ClusterCacheGroupRecoveryData
data) {
+ clusterCacheGrpRecoveryData = data;
+ }
+
+ /** */
+ private void
processJoiningNodeClusterCacheGroupRecoveryData(ClusterCacheGroupRecoveryData
joiningNodeData) {
+ if (joiningNodeData == null)
+ return;
+
+ DiscoveryDataClusterState clusterState = ctx.state().clusterState();
+
+ if (clusterState.transition() || clusterState.state().active())
+ return;
+
+ ClusterCacheGroupRecoveryData locData = clusterCacheGrpRecoveryData;
+
+ if (locData == null || joiningNodeData.isMoreRelevantThan(locData))
+ clusterCacheGrpRecoveryData = joiningNodeData;
+ }
+
/**
* Holds direct comparator (first system caches) and reverse comparator
(first user caches).
* Use DIRECT comparator for ordering cache start operations.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 35e2706e63e..6bff39ae692 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1576,6 +1576,8 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
m.addPartitionSizes(grp.groupId(),
grp.topology().partitionSizes());
}
+ else if (sndCounters && grp.persistenceEnabled() &&
exchActions.deactivate())
+ m.addPartitionUpdateCounters(grp.groupId(),
grp.topology().localUpdateCounters(true));
}
for (GridClientPartitionTopology top : clientTops.values()) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 07e00b9a2b6..fefd243a2e4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -102,6 +102,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCluster
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.FinishPreloadingTask;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionDefferedDeleteQueueCleanupTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
@@ -228,6 +229,9 @@ import static
org.apache.ignite.internal.util.IgniteUtils.doInParallel;
*/
@SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
public class GridCacheProcessor extends GridProcessorAdapter {
+ /** */
+ public static final String
METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY =
"org.apache.ignite.cluster.cache.group.recovery";
+
/** */
public static final String CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT =
"Failed to perform %s operation (cluster is in read-only mode)
[cacheGrp=%s, cache=%s]";
@@ -2905,9 +2909,7 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
}
}
- /**
- * @param grpToStop Group for which listener shuold be removed.
- */
+ /** @param grpToStop Group for which listener should be removed. */
private void
removeOffheapCheckpointListener(List<IgniteBiTuple<CacheGroupContext, Boolean>>
grpToStop) {
sharedCtx.database().checkpointReadLock();
try {
@@ -2929,15 +2931,13 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
/**
* Callback invoked when first exchange future for dynamic cache is
completed.
*
- * @param cacheStartVer Started caches version to create proxy for.
- * @param exchActions Change requests.
+ * @param fut Partitions exchange future.
* @param err Error.
*/
- public void onExchangeDone(
- AffinityTopologyVersion cacheStartVer,
- @Nullable ExchangeActions exchActions,
- @Nullable Throwable err
- ) {
+ public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, @Nullable
Throwable err) throws IgniteCheckedException {
+ AffinityTopologyVersion cacheStartVer = fut.initialVersion();
+ ExchangeActions exchActions = fut.exchangeActions();
+
initCacheProxies(cacheStartVer, err);
if (exchActions == null)
@@ -2946,8 +2946,11 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
if (exchActions.systemCachesStarting() &&
exchActions.stateChangeRequest() == null)
ctx.dataStructures().restoreStructuresState(ctx);
- if (err == null)
+ if (err == null) {
+ processCacheGroupRecoveryDataOnExchangeDone(fut);
+
processCacheStopRequestOnExchangeDone(cacheStartVer, exchActions);
+ }
}
/**
@@ -3018,7 +3021,6 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
* @return Shared context.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("unchecked")
private GridCacheSharedContext createSharedContext(
GridKernalContext kernalCtx,
Collection<CacheStoreSessionListener> storeSesLsnrs
@@ -5365,6 +5367,82 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
true)));
}
+ /** */
+ @Nullable private ClusterCacheGroupRecoveryData
restoreClusterCacheGroupRecoveryData(
+ ReadOnlyMetastorage metastorage
+ ) throws IgniteCheckedException {
+ return
(ClusterCacheGroupRecoveryData)metastorage.read(METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY);
+ }
+
+ /** */
+ private void
processCacheGroupRecoveryDataOnExchangeDone(GridDhtPartitionsExchangeFuture
fut) throws IgniteCheckedException {
+ if (sharedCtx.kernalContext().clientNode())
+ return;
+
+ if (fut.deactivateCluster())
+ persistClusterCacheGroupRecoveryData();
+ else if (fut.activateCluster() || fut.localJoinExchange())
+ clearClusterCacheGroupRecoveryData();
+ }
+
+ /** */
+ private void persistClusterCacheGroupRecoveryData() throws
IgniteCheckedException {
+ List<CacheGroupContext> persistenceEnabledGrps =
ctx.cache().cacheGroups().stream()
+ .filter(CacheGroupContext::persistenceEnabled)
+ .collect(Collectors.toList());
+
+ if (persistenceEnabledGrps.isEmpty())
+ return;
+
+ ClusterCacheGroupRecoveryData recData = new
ClusterCacheGroupRecoveryData(
+ ctx.state().clusterState().baselineTopology().version(),
+ persistenceEnabledGrps
+ );
+
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+
sharedCtx.database().metaStorage().write(METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY,
recData);
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
+ }
+
+ cachesInfo.clusterCacheGroupRecoveryData(recData);
+ }
+
+ /** */
+ private void clearClusterCacheGroupRecoveryData() throws
IgniteCheckedException {
+ if (cachesInfo.clusterCacheGroupRecoveryData() == null)
+ return;
+
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+
sharedCtx.database().metaStorage().remove(METASTORAGE_CLUSTER_CACHE_GROUP_RECOVERY_DATA_KEY);
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
+ }
+
+ cachesInfo.clusterCacheGroupRecoveryData(null);
+ }
+
+ /** */
+ public void applyCacheGroupRecoveryData() {
+ ClusterCacheGroupRecoveryData grpRecoveryData = cachesInfo.clusterCacheGroupRecoveryData();
+
+ if (grpRecoveryData == null)
+ return;
+
+ for (CacheGroupContext grp : cacheGrps.values()) {
+ CacheGroupRecoveryState grpState = grpRecoveryData.cacheGroupRecoveryState(grp.groupId());
+
+ if (grpState != null)
+ grp.applyRecoveryData(grpState);
+ }
+ }
+
/**
* Filter map by key.
*
@@ -5432,6 +5510,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage)
throws IgniteCheckedException {
CacheJoinNodeDiscoveryData data =
locCfgMgr.restoreCacheConfigurations();
+
data.clusterCacheGroupRecoveryData(restoreClusterCacheGroupRecoveryData(metastorage));
+
cachesInfo.onStart(data);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 1384a558e47..80858a8a106 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -19,6 +19,9 @@ package
org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
*
@@ -89,6 +92,18 @@ public class CachePartitionFullCountersMap implements
Serializable {
updCntrs[p] = updCntr;
}
+ /** @return Set of partitions whose update counter is zero. */
+ public Set<Integer> zeroUpdateCounterPartitions() {
+ Set<Integer> res = new HashSet<>();
+
+ for (int p = 0; p < updCntrs.length; p++) {
+ if (updCntrs[p] == 0)
+ res.add(p);
+ }
+
+ return res.isEmpty() ? Collections.emptySet() :
Collections.unmodifiableSet(res);
+ }
+
/**
* Clears full counters map.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0674e9de754..d9715338ea2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -187,9 +187,6 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
/** */
public static final String EXCHANGE_LATCH_ID = "exchange";
- /** */
- private static final String EXCHANGE_FREE_LATCH_ID = "exchange-free";
-
/** @see IgniteSystemProperties#IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT
*/
public static final int DFLT_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT = 30 *
60_000;
@@ -777,7 +774,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
/**
* @return {@code True} if deactivate cluster exchange.
*/
- private boolean deactivateCluster() {
+ public boolean deactivateCluster() {
return exchActions != null && exchActions.deactivate();
}
@@ -1753,12 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.kernalContext().pools().getSystemExecutorService(),
cctx.affinity().cacheGroups().values(),
desc -> {
- CacheGroupContext grp =
cctx.cache().cacheGroup(desc.groupId());
-
- GridDhtPartitionTopology top = grp != null ?
grp.topology() :
- cctx.exchange().clientTopology(desc.groupId(),
events().discoveryCache());
-
- top.beforeExchange(this, true, false); // Not
expecting new moving partitions.
+ partitionTopology(desc.groupId()).beforeExchange(this,
true, false); // Not expecting new moving partitions.
return null;
});
@@ -2467,7 +2459,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
comp.onDoneBeforeTopologyUnlock(this);
// Create and destroy caches and cache proxies.
- cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
+ cctx.cache().onExchangeDone(this, err);
Map<T2<Integer, Integer>, Long> locReserved =
partHistSuppliers.getReservations(cctx.localNodeId());
@@ -3163,7 +3155,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
boolean allReceived = false; // Received all expected messages.
- boolean updateSingleMap = false;
+ boolean isPartitionUpdateRequired = false;
FinishState finishState0 = null;
@@ -3188,7 +3180,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
assert crd.isLocal() : crd;
if (remaining.remove(nodeId)) {
- updateSingleMap = true;
+ isPartitionUpdateRequired = true;
pendingSingleUpdates++;
@@ -3236,11 +3228,12 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
return;
}
- if (updateSingleMap) {
+ if (isPartitionUpdateRequired) {
try {
- // Do not update partition map, in case cluster transitioning
to inactive state.
+ msgs.put(nodeId, msg);
+
if (!deactivateCluster())
- updatePartitionSingleMap(nodeId, msg);
+ updatePartitionSingleMap(msg);
}
finally {
synchronized (mux) {
@@ -3592,12 +3585,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.kernalContext().pools().getSystemExecutorService(),
cctx.affinity().cacheGroups().values(),
desc -> {
- CacheGroupContext grp =
cctx.cache().cacheGroup(desc.groupId());
-
- GridDhtPartitionTopology top = grp != null ?
grp.topology() :
- cctx.exchange().clientTopology(desc.groupId(),
events().discoveryCache());
-
- top.detectLostPartitions(resTopVer, this);
+
partitionTopology(desc.groupId()).detectLostPartitions(resTopVer, this);
return null;
});
@@ -3621,14 +3609,8 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.kernalContext().pools().getSystemExecutorService(),
cctx.affinity().caches().values(),
desc -> {
- if (cacheNames.contains(desc.cacheName())) {
- CacheGroupContext grp =
cctx.cache().cacheGroup(desc.groupId());
-
- GridDhtPartitionTopology top = grp != null ?
grp.topology() :
- cctx.exchange().clientTopology(desc.groupId(),
events().discoveryCache());
-
- top.resetLostPartitions(initialVersion());
- }
+ if (cacheNames.contains(desc.cacheName()))
+
partitionTopology(desc.groupId()).resetLostPartitions(initialVersion());
return null;
});
@@ -3759,7 +3741,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e
: mergedJoinExchMsgs.entrySet()) {
msgs.put(e.getKey(), e.getValue());
- updatePartitionSingleMap(e.getKey(), e.getValue());
+ updatePartitionSingleMap(e.getValue());
}
}
}
@@ -3778,12 +3760,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.kernalContext().pools().getSystemExecutorService(),
cctx.affinity().cacheGroups().values(),
desc -> {
- CacheGroupContext grp =
cctx.cache().cacheGroup(desc.groupId());
-
- GridDhtPartitionTopology top = grp != null ?
grp.topology() :
- cctx.exchange().clientTopology(desc.groupId(),
events().discoveryCache());
-
- top.beforeExchange(this, true, true);
+ partitionTopology(desc.groupId()).beforeExchange(this,
true, true);
return null;
});
@@ -3862,6 +3839,9 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
timeBag.finishGlobalStage("Apply update counters");
+ if (activateCluster())
+ cctx.cache().applyCacheGroupRecoveryData();
+
updateLastVersion(cctx.versions().last());
cctx.versions().onExchange(lastVer.get().order());
@@ -3970,24 +3950,6 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.kernalContext().state().onStateChangeError(exchangeGlobalExceptions, req);
}
- else {
- boolean hasMoving = !partsToReload.isEmpty();
-
- Set<Integer> waitGrps = cctx.affinity().waitGroups();
-
- if (!hasMoving) {
- for (CacheGroupContext grpCtx :
cctx.cache().cacheGroups()) {
- if (waitGrps.contains(grpCtx.groupId()) &&
grpCtx.topology().hasMovingPartitions()) {
- hasMoving = true;
-
- break;
- }
-
- }
- }
-
-
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
- }
if
(!cctx.kernalContext().state().clusterState().localBaselineAutoAdjustment()) {
ClusterState state = stateChangeErr ?
ClusterState.INACTIVE : req.state();
@@ -4030,20 +3992,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
GridDhtPartitionsSingleMessage msg,
Map<Integer, CacheGroupAffinityMessage> messageAccumulator
) {
- for (Map.Entry<Integer, GridDhtPartitionMap> e :
msg.partitions().entrySet()) {
- Integer grpId = e.getKey();
-
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
- GridDhtPartitionTopology top = grp != null
- ? grp.topology()
- : cctx.exchange().clientTopology(grpId,
events().discoveryCache());
-
- CachePartitionPartialCountersMap cntrs =
msg.partitionUpdateCounters(grpId);
-
- if (cntrs != null)
- top.collectUpdateCounters(cntrs);
- }
+ msg.partitionUpdateCounters().forEach((grpId, updCntrs) ->
partitionTopology(grpId).collectUpdateCounters(updCntrs));
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
@@ -4139,11 +4088,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.kernalContext().pools().getSystemExecutorService(),
cacheGroupDescriptors(),
grpDesc -> {
- CacheGroupContext grpCtx =
cctx.cache().cacheGroup(grpDesc.groupId());
-
- GridDhtPartitionTopology top = grpCtx != null
- ? grpCtx.topology()
- : cctx.exchange().clientTopology(grpDesc.groupId(),
events().discoveryCache());
+ GridDhtPartitionTopology top =
partitionTopology(grpDesc.groupId());
if (CU.isPersistentCache(grpDesc.config(),
cctx.gridConfig().getDataStorageConfiguration())) {
List<SupplyPartitionInfo> list;
@@ -4755,21 +4700,10 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
/**
* Updates partition map in all caches.
*
- * @param nodeId Node message received from.
* @param msg Partitions single message.
*/
- private void updatePartitionSingleMap(UUID nodeId,
GridDhtPartitionsSingleMessage msg) {
- msgs.put(nodeId, msg);
-
- for (Map.Entry<Integer, GridDhtPartitionMap> entry :
msg.partitions().entrySet()) {
- Integer grpId = entry.getKey();
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
- GridDhtPartitionTopology top = grp != null ? grp.topology() :
- cctx.exchange().clientTopology(grpId,
events().discoveryCache());
-
- top.update(exchId, entry.getValue(), false);
- }
+ private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) {
+ msg.partitions().forEach((grpId, partMap) ->
partitionTopology(grpId).update(exchId, partMap, false));
}
/**
@@ -5243,7 +5177,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (dynamicCacheStartExchange() && msg.getError() != null)
exchangeGlobalExceptions.put(e.getKey().id(),
msg.getError());
- updatePartitionSingleMap(e.getKey().id(), msg);
+ updatePartitionSingleMap(msg);
}
}
}
@@ -5366,6 +5300,15 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
return true;
}
+ /** @return Partition topology for the group with the specified ID. */
+ private GridDhtPartitionTopology partitionTopology(int grpId) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+ return grp != null
+ ? grp.topology()
+ : cctx.exchange().clientTopology(grpId, events().discoveryCache());
+ }
+
/** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index de334eaec82..d47f0f3312a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -203,6 +203,11 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
partCntrs.put(grpId, cntrMap);
}
+ /** @return Partition update counters per cache group. */
+ public Map<Integer, CachePartitionPartialCountersMap>
partitionUpdateCounters() {
+ return partCntrs == null ? Collections.emptyMap() :
Collections.unmodifiableMap(partCntrs);
+ }
+
/**
* @param grpId Cache group ID.
* @return Partition update counters.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 06531eefe49..9ca26f636ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -107,7 +108,7 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
private volatile boolean stopping;
/** A future that will be completed when topology with version topVer will
be ready to use. */
- private volatile GridDhtTopologyFuture topReadyFut;
+ private volatile GridDhtPartitionsExchangeFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -221,7 +222,7 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void updateTopologyVersion(
- GridDhtTopologyFuture exchFut,
+ GridDhtPartitionsExchangeFuture exchFut,
DiscoCache discoCache,
long updSeq,
boolean stopping
@@ -901,6 +902,11 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
// No-op on client topology.
}
+ /** {@inheritDoc} */
+ @Override public void applyRecoveryData(CacheGroupRecoveryState grpState) {
+ // No-op.
+ }
+
/**
* Method checks is new partition map more stale than current partition map
* New partition map is stale if topology version or update sequence are
less or equal than of current map
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 8efc3ee8ebb..0c2f415a7a1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -27,6 +27,7 @@ import
org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
@@ -73,7 +74,7 @@ public interface GridDhtPartitionTopology {
* @throws IgniteInterruptedCheckedException If interrupted.
*/
public void updateTopologyVersion(
- GridDhtTopologyFuture exchFut,
+ GridDhtPartitionsExchangeFuture exchFut,
DiscoCache discoCache,
long updateSeq,
boolean stopping
@@ -336,6 +337,9 @@ public interface GridDhtPartitionTopology {
*/
public void applyUpdateCounters();
+ /** Restores cache group data after a node restart if PDS is enabled. Called on the coordinator. */
+ public void applyRecoveryData(CacheGroupRecoveryState grpState);
+
/**
* Checks if there is at least one owner for each partition in the cache
topology for a local node.
* If not marks such a partition as LOST or OWNING depending on a policy.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 2c9d582eed9..b004053ca6f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -37,13 +36,14 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -139,7 +139,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
private volatile boolean stopping;
/** A future that will be completed when topology with version topVer will
be ready to use. */
- private volatile GridDhtTopologyFuture topReadyFut;
+ private volatile GridDhtPartitionsExchangeFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -266,7 +266,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void updateTopologyVersion(
- GridDhtTopologyFuture exchFut,
+ GridDhtPartitionsExchangeFuture exchFut,
@NotNull DiscoCache discoCache,
long updSeq,
boolean stopping
@@ -701,10 +701,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
* @param updateSeq Update sequence to initialize full map.
*/
private void initializeFullMap(long updateSeq) {
- if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture))
- return;
-
- GridDhtPartitionsExchangeFuture exchFut =
(GridDhtPartitionsExchangeFuture)topReadyFut;
+ GridDhtPartitionsExchangeFuture exchFut = topReadyFut;
boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(),
grp.receivedFrom());
@@ -924,7 +921,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
loc.updateCounter(updCntr);
// Create a partition in lost state.
- if (lostParts != null && lostParts.contains(p))
+ if (F.contains(lostParts, p))
loc.markLost();
}
@@ -985,8 +982,6 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
if (!create)
return null;
- boolean created = false;
-
ctx.database().checkpointReadLock();
try {
@@ -1034,8 +1029,6 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
loc.resetUpdateCounter();
this.updateSeq.incrementAndGet();
-
- created = true;
}
}
finally {
@@ -1613,18 +1606,20 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
if (lostParts != null) {
this.lostParts = new HashSet<>(lostParts);
- for (Integer part : lostParts) {
- GridDhtLocalPartition locPart =
localPartition(part);
+ if (exchFut != null && !exchFut.activateCluster()) {
+ for (Integer part : lostParts) {
+ GridDhtLocalPartition locPart =
localPartition(part);
- // EVICTED partitions should not be marked
directly as LOST, or
- // part.clearFuture lifecycle will be broken after
resetting.
- // New partition should be created instead.
- if (locPart != null && locPart.state() != EVICTED)
{
- locPart.markLost();
+ // EVICTED partitions should not be marked
directly as LOST, or
+ // part.clearFuture lifecycle will be broken
after resetting.
+ // New partition should be created instead.
+ if (locPart != null && locPart.state() !=
EVICTED) {
+ locPart.markLost();
- GridDhtPartitionMap locMap =
partMap.get(ctx.localNodeId());
+ GridDhtPartitionMap locMap =
partMap.get(ctx.localNodeId());
- locMap.put(part, LOST);
+ locMap.put(part, LOST);
+ }
}
}
}
@@ -1773,7 +1768,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
U.nanosToMillis(acquiredNanos - nowNanos) + ']');
}
- if (stopping)
+ if (stopping && !isDeactivationInProgress(topReadyFut))
return;
for (int i = 0; i < cntrMap.size(); i++) {
@@ -1809,7 +1804,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
U.nanosToMillis(acquiredNanos - nowNanos) + ']');
}
- if (stopping)
+ if (stopping && !isDeactivationInProgress(topReadyFut))
return;
for (int i = 0; i < locParts.length(); i++) {
@@ -1855,6 +1850,20 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
}
}
+ /** {@inheritDoc} */
+ @Override public void applyRecoveryData(CacheGroupRecoveryState grpState) {
+ lock.writeLock().lock();
+
+ try {
+ restoreLostPartitions(grpState);
+
+ detectPartitionLossDuringInactivity(grpState);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
/**
* Method checks is new partition map more stale than current partition map
* New partition map is stale if topology version or update sequence are
less or equal than of current map
@@ -2037,8 +2046,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
lock.writeLock().lock();
try {
- assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) ||
-
assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
+ assert
assignment.topologyVersion().equals(topReadyFut.context().events().topologyVersion());
readyTopVer = lastTopChangeVer = assignment.topologyVersion();
@@ -2199,30 +2207,16 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
if (node2part == null)
return false;
- // Do not trigger lost partition events on activation.
- DiscoveryEvent discoEvt = fut.activateCluster() ? null :
fut.firstEvent();
-
- final GridClusterStateProcessor state =
grp.shared().kernalContext().state();
-
- boolean isInMemoryCluster = CU.isInMemoryCluster(
- grp.shared().kernalContext().discovery().allNodes(),
-
grp.shared().kernalContext().marshallerContext().jdkMarshaller(),
- U.resolveClassLoader(grp.shared().kernalContext().config())
- );
-
- boolean compatibleWithIgnorePlc = isInMemoryCluster
- && state.isBaselineAutoAdjustEnabled() &&
state.baselineAutoAdjustTimeout() == 0L;
-
// Calculate how data loss is handled.
- boolean safe = grp.config().getPartitionLossPolicy() != IGNORE
|| !compatibleWithIgnorePlc;
+ boolean isLossIgnored = isPartitionLossIgnored();
- int parts = grp.affinity().partitions();
-
- Set<Integer> recentlyLost = null;
+ Set<Integer> recentlyLost = fut.activateCluster() &&
!F.isEmpty(lostParts)
+ ? new HashSet<>(lostParts)
+ : new HashSet<>();
boolean changed = false;
- for (int part = 0; part < parts; part++) {
+ for (int part = 0; part < grp.affinity().partitions(); part++)
{
boolean lost = F.contains(lostParts, part);
if (!lost) {
@@ -2240,27 +2234,10 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
if (!hasOwner) {
lost = true;
- // Do not detect and record lost partition in
IGNORE mode.
- if (safe) {
- if (lostParts == null)
- lostParts = new TreeSet<>();
-
- lostParts.add(part);
+ if (!isLossIgnored) {
+ registerLostPartition(part);
- if (discoEvt != null) {
- if (recentlyLost == null)
- recentlyLost = new HashSet<>();
-
- recentlyLost.add(part);
-
- if
(grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- grp.addRebalanceEvent(part,
- EVT_CACHE_REBALANCE_PART_DATA_LOST,
- discoEvt.eventNode(),
- discoEvt.type(),
- discoEvt.timestamp());
- }
- }
+ recentlyLost.add(part);
}
}
}
@@ -2274,7 +2251,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
final GridDhtPartitionState prevState =
locPart.state();
- changed = safe ? locPart.markLost() :
locPart.own();
+ changed = isLossIgnored ? locPart.own() :
locPart.markLost();
if (changed) {
long updSeq = updateSeq.incrementAndGet();
@@ -2295,17 +2272,12 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
GridDhtPartitionState p0 =
entry.getValue().get(part);
if (p0 != null && p0 != EVICTED)
- entry.getValue().put(part, safe ? LOST :
OWNING);
+ entry.getValue().put(part, isLossIgnored ?
OWNING : LOST);
}
}
}
- if (recentlyLost != null) {
- U.warn(log, "Detected lost partitions" + (!safe ? " (will
ignore)" : "")
- + " [grp=" + grp.cacheOrGroupName()
- + ", parts=" + S.toStringSortedDistinct(recentlyLost)
- + ", topVer=" + resTopVer + "]");
- }
+ recordPartitionLossEvents(fut, recentlyLost, isLossIgnored);
return changed;
}
@@ -2366,7 +2338,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
lock.readLock().lock();
try {
- return lostParts == null ? Collections.<Integer>emptySet() : new
HashSet<>(lostParts);
+ return lostParts == null ? Collections.emptySet() : new
HashSet<>(lostParts);
}
finally {
lock.readLock().unlock();
@@ -3273,6 +3245,96 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
// No-op.
}
+ /** */
+ private void restoreLostPartitions(CacheGroupRecoveryState grpState) {
+ grpState.lostPartitions().forEach(this::registerLostPartition);
+ }
+
+ /** */
+ private void detectPartitionLossDuringInactivity(CacheGroupRecoveryState
grpState) {
+ Set<Integer> recoveryZeroParts = grpState.zeroUpdateCounterPartitions();
+
+ Set<Integer> detectedLostParts = new HashSet<>();
+
+ for (int zeroPart : cntrMap.zeroUpdateCounterPartitions()) {
+ if (!recoveryZeroParts.contains(zeroPart))
+ detectedLostParts.add(zeroPart);
+ }
+
+ detectedLostParts.forEach(this::registerLostPartition);
+
+ if (!detectedLostParts.isEmpty()) {
+ U.warn(log, "Some partitions data was not restored from the PDS
during cluster activation, but were present" +
+ " at the time of previous cluster deactivation. This may be
due to the Ignite PDS folder being" +
+ " cleared/temered on all primary and backup nodes for the
specified partitions during cluster inactivity." +
+ " [cacheGroup=" + grp.cacheOrGroupName() + ", partIds=" +
S.toStringSortedDistinct(detectedLostParts) + ']');
+ }
+ }
+
+ /** */
+ private boolean registerLostPartition(int partId) {
+ assert lock.isWriteLockedByCurrentThread();
+
+ if (lostParts == null)
+ lostParts = new HashSet<>();
+
+ return lostParts.add(partId);
+ }
+
+ /** */
+ private boolean isPartitionLossIgnored() {
+ GridClusterStateProcessor state = grp.shared().kernalContext().state();
+
+ boolean isInMemoryCluster = CU.isInMemoryCluster(
+ grp.shared().kernalContext().discovery().allNodes(),
+ grp.shared().kernalContext().marshallerContext().jdkMarshaller(),
+ U.resolveClassLoader(grp.shared().kernalContext().config())
+ );
+
+ boolean isIgnorePlcSupported = isInMemoryCluster
+ && state.isBaselineAutoAdjustEnabled()
+ && state.baselineAutoAdjustTimeout() == 0L;
+
+ return isIgnorePlcSupported && grp.config().getPartitionLossPolicy()
== IGNORE;
+ }
+
+ /** */
+ private void recordPartitionLossEvents(
+ @Nullable GridDhtPartitionsExchangeFuture fut,
+ Collection<Integer> recentlyLost,
+ boolean isLossIgnored
+ ) {
+ if (fut == null || F.isEmpty(recentlyLost))
+ return;
+
+ if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ DiscoveryEvent discoEvt = fut.firstEvent();
+
+ for (Integer part : recentlyLost) {
+ grp.addRebalanceEvent(part,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST,
+ discoEvt.eventNode(),
+ discoEvt.type(),
+ discoEvt.timestamp());
+ }
+ }
+
+ U.warn(log, "Detected lost partitions" + (isLossIgnored ? " (will
ignore)" : "")
+ + " [grp=" + grp.cacheOrGroupName()
+ + ", parts=" + S.toStringSortedDistinct(recentlyLost)
+ + ", topVer=" + fut.context().events().topologyVersion() + "]");
+ }
+
+ /** */
+ private boolean isDeactivationInProgress(GridDhtPartitionsExchangeFuture
exchFut) {
+ if (exchFut == null)
+ return false;
+
+ ExchangeActions exchangeActions = exchFut.exchangeActions();
+
+ return exchangeActions != null && exchangeActions.deactivate();
+ }
+
/**
* Collects states of local partitions.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
index fb7bb00a93c..1f71cb192a2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/BaselineTopology.java
@@ -205,6 +205,11 @@ public class BaselineTopology implements Serializable {
branchingHist.add(branchingPntHash);
}
+ /** */
+ public long version() {
+ return U.toLong(id, branchingHist.size());
+ }
+
/**
* @return id of this BaselineTopology.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index fc775b6b7aa..da3ecb539ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -1676,12 +1676,6 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
return exchActs;
}
- /** {@inheritDoc} */
- @Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture
exchangeFuture,
- boolean hasMovingPartitions) {
- // no-op
- }
-
/** {@inheritDoc} */
@Override public boolean evictionsAllowed() {
return true;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
index f4f3d622096..fbd1df1bb9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
@@ -141,12 +141,6 @@ public interface IGridClusterStateProcessor extends
GridProcessor {
*/
void onBaselineTopologyChanged(BaselineTopology blt,
BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException;
- /**
- * @param exchangeFuture Exchange future.
- * @param hasMovingPartitions {@code True} if there are moving partitions.
- */
- void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchangeFuture,
boolean hasMovingPartitions);
-
/**
* @return {@code True} if partition evictions are allowed in current
state.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteLostPartitionsRecoveryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteLostPartitionsRecoveryTest.java
new file mode 100644
index 00000000000..c5a1beeef8e
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteLostPartitionsRecoveryTest.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * These tests check Ignite behaviour when PDS is enabled and some
+ * partitions are "lost" during cluster inactivity (either after deactivation
+ * or a full cluster stop). "Lost" means that the partition data is no longer
+ * available after the subsequent cluster activation - more formally, the
+ * partition update counter has become zero. Tests use 4 nodes grouped into
+ * 2 "cells". The nodes of each "cell" store all copies of a subset of the
+ * partitions, so if all nodes of a "cell" lose their data, some partitions
+ * are certainly lost.
+ */
+public class IgniteLostPartitionsRecoveryTest extends GridCommonAbstractTest {
+ /** */
+ private static final int SERVER_NODES_CNT = 4;
+
+ /** */
+ private static final int CACHE_KEYS_CNT = 100;
+
+ /** */
+ private static final String SERVER_CACHE = "server-partitioned";
+
+ /** */
+ private static final String CLIENT_CACHE = "client-partitioned";
+
+ /** */
+ private final ListeningTestLogger listeningLogger = new
ListeningTestLogger(log);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
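+        // Each node gets a dedicated work directory so its PDS can be
+        // cleared individually, and a "CELL" attribute (nodes 0/2 vs 1/3)
+        // used by the affinity backup filter of the caches.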
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+
.setWorkDirectory(workDirectory(getTestIgniteInstanceIndex(igniteInstanceName)).toString())
+ .setClusterStateOnStart(INACTIVE)
+ .setUserAttributes(singletonMap("CELL", "CELL-" +
(getTestIgniteInstanceIndex(igniteInstanceName)) % 2))
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setGridLogger(listeningLogger)
+ .setCacheConfiguration(createCacheConfiguration(SERVER_CACHE))
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration()
+ .setPersistenceEnabled(true))
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ for (int i = 0; i < SERVER_NODES_CNT + 1; i++)
+ U.delete(workDirectory(i));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ for (int i = 0; i < SERVER_NODES_CNT + 1; i++)
+ U.delete(workDirectory(i));
+
+ super.afterTest();
+ }
+
+ /** */
+ @Test
+ public void testPartitionLossDetectionOnActivation() throws Exception {
+ prepareCluster();
+
+ fillCaches();
+
+ grid(0).cluster().state(INACTIVE);
+
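+        // Wipe the PDS of one "cell" (nodes 1 and 3) while the cluster is
+        // inactive, so every copy of the partitions of that cell is lost.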
+ restartCellWithDataCleared();
+
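+        // Activation must detect that some partition data was not restored
+        // from the PDS and log the warning below (expected twice).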
+ LogListener logLsnr = LogListener.matches("Some partitions data was
not restored from the PDS during cluster activation")
+ .times(2)
+ .build();
+
+ listeningLogger.registerListener(logLsnr);
+
+ logLsnr.reset();
+
+ grid(0).cluster().state(ACTIVE);
+
+ logLsnr.check(getTestTimeout());
+
+ Collection<Integer> srvCacheLostParts =
checkCacheLostParitionsDetected(SERVER_CACHE);
+ Collection<Integer> cliCacheLostParts =
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
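+        // After the lost state is reset, reads from the formerly lost
+        // partitions must succeed but return null - the data is gone.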
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
+ checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+ checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+ }
+
+ /** */
+ @Test
+ public void testLostPartitionsRestoredAfterClusterRestart() throws
Exception {
+ prepareCluster();
+
+ fillCaches();
+
+ grid(0).cluster().state(INACTIVE);
+
+ restartCellWithDataCleared();
+
+ grid(0).cluster().state(ACTIVE);
+
+ checkCacheLostParitionsDetected(SERVER_CACHE);
+ checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+ grid(0).cluster().state(INACTIVE);
+
+ stopAllGrids();
+
+ prepareCluster();
+
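+        // The lost-partition state detected before the restart must still
+        // be reported after the full cluster restart.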
+ Collection<Integer> srvCacheLostParts =
checkCacheLostParitionsDetected(SERVER_CACHE);
+ Collection<Integer> cliCacheLostParts =
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
+ checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+ checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+ }
+
+ /** */
+ @Test
+ public void testLostPartitionsRestoredAfterInactivity() throws Exception {
+ prepareCluster();
+
+ fillCaches();
+
+ stopCell();
+
+ grid(0).cluster().state(INACTIVE);
+
+ startCell();
+
+ grid(0).cluster().state(ACTIVE);
+
+ grid(0).cacheNames().forEach(this::checkCacheLostParitionsDetected);
+
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
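+        // The stopped cell kept its data on disk, so after resetting the
+        // lost state every entry is still readable with its original value.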
+ for (Ignite grid : G.allGrids()) {
+ for (String cacheName : grid(0).cacheNames()) {
+ for (int i = 0; i < CACHE_KEYS_CNT; i++)
+ assertEquals(i, grid.cache(cacheName).get(i));
+ }
+ }
+ }
+
+ /** */
+ @Test
+ public void testNodeJoinAfterActivation() throws Exception {
+ prepareCluster();
+
+ fillCaches();
+
+ grid(0).cluster().state(INACTIVE);
+
+ stopGrid(2);
+
+ grid(0).cluster().state(ACTIVE);
+ grid(0).cluster().state(INACTIVE);
+
+ restartCellWithDataCleared();
+
+ grid(0).cluster().state(ACTIVE);
+
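+        // Node 2 joins after activation and must observe the same
+        // lost-partition state as the rest of the cluster.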
+ startGrid(2);
+
+ Collection<Integer> srvCacheLostParts =
checkCacheLostParitionsDetected(SERVER_CACHE);
+ Collection<Integer> cliCacheLostParts =
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
+ checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+ checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+ }
+
+ /** */
+ @Test
+ public void testNodeJoinDuringClusterStateTransition() throws Exception {
+ prepareCluster();
+
+ fillCaches();
+
+ grid(0).cluster().state(INACTIVE);
+
+ stopGrid(2);
+
+ grid(0).cluster().state(ACTIVE);
+ grid(0).cluster().state(INACTIVE);
+
+ restartCellWithDataCleared();
+
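+        // Block the single partition map exchange message from node 1 to
+        // the coordinator (node 0) so that node 2 joins while the activation
+        // exchange is still in progress.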
+ spi(grid(1)).blockMessages(GridDhtPartitionsSingleMessage.class,
grid(0).name());
+
+ CountDownLatch node2JoinedEvtListenedLatch = new CountDownLatch(3);
+
+ for (Integer nodeIdx : Arrays.asList(0, 1, 3)) {
+ grid(nodeIdx).events().localListen(event -> {
+ if (Objects.equals(getTestIgniteInstanceName(2),
((DiscoveryEvent)event).eventNode().attribute(ATTR_IGNITE_INSTANCE_NAME)))
+ node2JoinedEvtListenedLatch.countDown();
+
+ return true;
+ }, EVT_NODE_JOINED);
+ }
+
+ IgniteInternalFuture<Object> activationFut = runAsync(() ->
grid(0).cluster().state(ACTIVE));
+
+ spi(grid(1)).waitForBlocked();
+
+ runAsync(() -> startGrid(2));
+
+ assertTrue(node2JoinedEvtListenedLatch.await(getTestTimeout(),
MILLISECONDS));
+
+ spi(grid(1)).stopBlock();
+
+ activationFut.get(getTestTimeout());
+
+ Collection<Integer> srvCacheLostParts =
checkCacheLostParitionsDetected(SERVER_CACHE);
+ Collection<Integer> cliCacheLostParts =
checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
+ checkReadAvailableFromLostPartitions(SERVER_CACHE, srvCacheLostParts);
+ checkReadAvailableFromLostPartitions(CLIENT_CACHE, cliCacheLostParts);
+ }
+
+ /** */
+ @Test
+    public void testClusterRestartWithEmptyPartitions() throws Exception {
+ prepareCluster();
+
+
grid(SERVER_NODES_CNT).createCache(createCacheConfiguration(CLIENT_CACHE));
+
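+        // Populate only a few partitions; the remaining ones stay empty
+        // (zero update counter) and must not be reported as lost.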
+ writeKeyToParition(SERVER_CACHE, 0);
+ writeKeyToParition(SERVER_CACHE, 2);
+
+ writeKeyToParition(CLIENT_CACHE, 1);
+ writeKeyToParition(CLIENT_CACHE, 3);
+
+ forceCheckpoint();
+
+ grid(0).cluster().state(INACTIVE);
+ grid(0).cluster().state(ACTIVE);
+
+ checkNoLostPartitionsDetected();
+
+ grid(0).cluster().state(INACTIVE);
+
+ stopAllGrids();
+
+ prepareCluster();
+ }
+
+ /** */
+ @Test
+ public void testNodeJoinWithStaleCacheGroupRecoveryData() throws Exception
{
+ prepareCluster();
+
+ fillCaches();
+
+ grid(0).cluster().state(INACTIVE);
+
+ restartCellWithDataCleared();
+
+ grid(0).cluster().state(ACTIVE);
+
+ checkCacheLostParitionsDetected(SERVER_CACHE);
+ checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+ grid(0).cluster().state(INACTIVE);
+
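+        // Node 2 is stopped before the lost partitions are reset, so the
+        // cache group recovery data it later joins with is stale.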
+ stopGrid(2);
+
+ grid(0).cluster().state(ACTIVE);
+
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
+ checkNoLostPartitionsDetected();
+
+ grid(0).cluster().state(INACTIVE);
+
+ startGrid(2);
+
+ grid(0).cluster().state(ACTIVE);
+
+ checkNoLostPartitionsDetected();
+
+ grid(0).cluster().state(INACTIVE);
+
+ stopAllGrids();
+
+ startGrid(2);
+ startGrid(0);
+ startGrid(1);
+ startGrid(3);
+
+ grid(0).cluster().state(ACTIVE);
+
+ checkNoLostPartitionsDetected();
+ }
+
+ /** */
+ @Test
+ public void testCoordinatorWithMissingCacheGroupRecoveryData() throws
Exception {
+ prepareCluster();
+
+ fillCaches();
+
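+        // Node 0 leaves before the cluster is deactivated, so it has no
+        // cache group recovery data; started first below, it acts as the
+        // coordinator on the subsequent activation.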
+ stopGrid(0);
+
+ grid(1).cluster().state(INACTIVE);
+
+ stopAllGrids();
+
+ startGrid(0);
+ startGrid(2);
+ startNodeWithPdsCleared(1);
+ startNodeWithPdsCleared(3);
+ startClientGrid(SERVER_NODES_CNT);
+
+ grid(0).cluster().state(ACTIVE);
+
+ checkCacheLostParitionsDetected(SERVER_CACHE);
+ checkCacheLostParitionsDetected(CLIENT_CACHE);
+
+ grid(0).resetLostPartitions(grid(0).cacheNames());
+
+ checkNoLostPartitionsDetected();
+
+ grid(0).cluster().state(INACTIVE);
+ grid(0).cluster().state(ACTIVE);
+
+ checkNoLostPartitionsDetected();
+ }
+
+ /** */
+ private void prepareCluster() throws Exception {
+ startGrids(SERVER_NODES_CNT);
+ startClientGrid(SERVER_NODES_CNT);
+
+ grid(0).cluster().state(ACTIVE);
+ }
+
+ /** */
+ private void checkNoLostPartitionsDetected() {
+ for (Ignite grid : G.allGrids()) {
+ for (String cacheName : grid.cacheNames())
+ assertTrue(F.isEmpty(grid.cache(cacheName).lostPartitions()));
+ }
+ }
+
+ /** */
+ private void checkReadAvailableFromLostPartitions(String cacheName,
Collection<Integer> parts) {
+ assertFalse(parts.isEmpty());
+
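+        // Reads from the given partitions must succeed on every node but
+        // return null, since the partition data itself is gone.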
+ for (Ignite grid : G.allGrids()) {
+ for (Integer part : parts)
+ assertNull(readKeyFromPartition(grid, cacheName, part));
+ }
+ }
+
+ /** */
+ private Collection<Integer> checkCacheLostParitionsDetected(String
cacheName) {
+ List<Collection<Integer>> nodeLostParts = new ArrayList<>();
+
+ for (Ignite grid : G.allGrids())
+ nodeLostParts.add(grid.cache(cacheName).lostPartitions());
+
+ assertFalse(nodeLostParts.isEmpty());
+
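+        // Every node must report exactly the same set of lost partitions.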
+ Collection<Integer> lostPars = nodeLostParts.get(0);
+
+ for (int i = 1; i < nodeLostParts.size(); i++)
+ assertEquals(lostPars, nodeLostParts.get(i));
+
+ for (Ignite grid : G.allGrids()) {
+ for (int part : lostPars) {
+ assertThrowsAnyCause(
+ log,
+ () -> readKeyFromPartition(grid, cacheName, part),
+ CacheInvalidStateException.class,
+ "Failed to execute the cache operation (all partition
owners have left the grid");
+ }
+ }
+
+ return lostPars;
+ }
+
+ /** */
+ private CacheConfiguration<Integer, Integer>
createCacheConfiguration(String name) {
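+        // The colocated backup filter keeps the primary and backup copies
+        // of a partition on nodes with the same "CELL" attribute, so wiping
+        // a whole cell loses those partitions entirely.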
+ return new CacheConfiguration<Integer, Integer>()
+ .setName(name)
+ .setPartitionLossPolicy(READ_ONLY_SAFE)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction()
+ .setAffinityBackupFilter(new
ClusterNodeAttributeColocatedBackupFilter("CELL"))
+ .setPartitions(10));
+ }
+
+ /** */
+ private void writeKeyToParition(String cacheName, int part) {
+ for (int key = 0; key < CACHE_KEYS_CNT; key++) {
+ if (grid(0).affinity(cacheName).partition(key) == part) {
+ grid(0).<Integer, Integer>cache(cacheName).put(key, key);
+
+ return;
+ }
+ }
+
+ throw new IllegalStateException();
+ }
+
+ /** */
+ private Integer readKeyFromPartition(Ignite grid, String cacheName, int
part) {
+ for (int key = 0; key < CACHE_KEYS_CNT; key++) {
+ if (grid(0).affinity(cacheName).partition(key) == part)
+ return grid.<Integer, Integer>cache(cacheName).get(key);
+ }
+
+ throw new IllegalStateException();
+ }
+
+ /** */
+ private void startNodeWithPdsCleared(int nodeIdx) throws Exception {
+ U.delete(workDirectory(nodeIdx));
+
+ startGrid(nodeIdx);
+ }
+
+ /** */
+ private void fillCaches() throws Exception {
+
grid(SERVER_NODES_CNT).createCache(createCacheConfiguration(CLIENT_CACHE));
+
+ for (String cacheName : grid(0).cacheNames()) {
+ for (int i = 0; i < CACHE_KEYS_CNT; i++)
+ grid(0).cache(cacheName).put(i, i);
+ }
+
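+        // Force a checkpoint so the written entries are persisted to the
+        // PDS before any node is stopped.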
+ forceCheckpoint();
+ }
+
+ /** */
+ private Path workDirectory(int nodeIdx) throws Exception {
+ return Paths.get(U.defaultWorkDirectory(),
U.maskForFileName(getTestIgniteInstanceName(nodeIdx)));
+ }
+
+ /** */
+ private void restartCellWithDataCleared() throws Exception {
+ stopCell();
+
+ startNodeWithPdsCleared(1);
+ startNodeWithPdsCleared(3);
+ }
+
+ /** */
+ private void stopCell() {
+ stopGrid(1);
+ stopGrid(3);
+ }
+
+ /** */
+ private void startCell() throws Exception {
+ startGrid(1);
+ startGrid(3);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
index 222161fac11..38ab77dcf7b 100755
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Movi
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PreloadingRestartWhileClearingPartitionTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.RentingPartitionIsOwnedDuringEvictionTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgniteLostPartitionsOnLeaveBaselineSelfTest;
+import
org.apache.ignite.internal.processors.cache.persistence.IgniteLostPartitionsRecoveryTest;
import
org.apache.ignite.internal.processors.cache.transactions.AtomicPartitionCounterStateConsistencyHistoryRebalanceTest;
import
org.apache.ignite.internal.processors.cache.transactions.AtomicPartitionCounterStateConsistencyTest;
import
org.apache.ignite.internal.processors.cache.transactions.AtomicVolatilePartitionCounterStateConsistencyTest;
@@ -79,6 +80,7 @@ public class IgniteCacheTestSuite12 {
GridTestUtils.addTestIfNeeded(suite,
CachePartitionLostAfterSupplierHasLeftTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CachePartitionLossWithPersistenceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteLostPartitionsOnLeaveBaselineSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgniteLostPartitionsRecoveryTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
TxCrossCacheMapOnInvalidTopologyTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
TxCrossCacheRemoteMultiplePartitionReservationTest.class, ignoredTests);