This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8fcd8d562e3 IGNITE-25979 Support
DisasterRecoveryManager#restartPartitionWithCleanup in Colocation track (#6458)
8fcd8d562e3 is described below
commit 8fcd8d562e3cad946439f8f256e082478179a783
Author: Cyrill <[email protected]>
AuthorDate: Tue Aug 26 14:37:06 2025 +0300
IGNITE-25979 Support DisasterRecoveryManager#restartPartitionWithCleanup in
Colocation track (#6458)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../PartitionReplicaLifecycleManager.java | 54 ++++
.../disaster/DisasterRecoveryManager.java | 40 +++
.../disaster/ManualGroupRestartRequest.java | 267 +++++++++-------
.../disaster/ItDisasterRecoveryManagerTest.java | 346 ++++++++++++++++++++-
4 files changed, 586 insertions(+), 121 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 4e96023c632..02b6e0217a0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -1211,6 +1211,22 @@ public class PartitionReplicaLifecycleManager extends
);
}
+ /**
+ * Stops all resources associated with a given partition, like replicas and partition
trackers, and destroys its storage.
+ * Calls {@link ReplicaManager#weakStopReplica} in order to change the
replica state.
+ *
+ * @param zonePartitionId Partition ID.
+ * @param revision Revision.
+ * @return Future that will be completed after all resources have been
closed and destroyed.
+ */
+ private CompletableFuture<Void>
stopPartitionAndDestroyForRestart(ZonePartitionId zonePartitionId, long
revision) {
+ return replicaMgr.weakStopReplica(
+ zonePartitionId,
+ WeakReplicaStopReason.RESTART,
+ () -> stopAndDestroyPartition(zonePartitionId, revision)
+ );
+ }
+
private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry
pendingAssignmentsEntry, long revision, boolean isRecovery) {
if (pendingAssignmentsEntry.value() == null ||
pendingAssignmentsEntry.empty()) {
return nullCompletedFuture();
@@ -1720,6 +1736,44 @@ public class PartitionReplicaLifecycleManager extends
return zoneResourcesManager.removeTableResources(zonePartitionId,
tableId);
}
+ /**
+ * Restarts the zone's partition including the replica and raft node with
storage cleanup.
+ *
+ * @param zonePartitionId Zone's partition that needs to be restarted.
+ * @param revision Metastore revision.
+ * @param assignmentsTimestamp Timestamp of the assignments.
+ * @return Operation future.
+ */
+ public CompletableFuture<?> restartPartitionWithCleanUp(ZonePartitionId
zonePartitionId, long revision, long assignmentsTimestamp) {
+ return inBusyLockAsync(busyLock, () ->
stopPartitionAndDestroyForRestart(zonePartitionId,
revision).thenComposeAsync(unused -> {
+ Assignments stableAssignments =
zoneStableAssignmentsGetLocally(metaStorageMgr, zonePartitionId, revision);
+
+ assert stableAssignments != null : "zonePartitionId=" +
zonePartitionId + ", revision=" + revision;
+
+ return
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(unused2 ->
inBusyLockAsync(busyLock, () -> {
+ Assignment localAssignment =
localAssignment(stableAssignments);
+
+ if (localAssignment == null) {
+ // (0) in case the node is not in the assignments
+ return nullCompletedFuture();
+ }
+
+ CatalogZoneDescriptor zoneDescriptor =
zoneDescriptorAt(zonePartitionId.zoneId(), assignmentsTimestamp);
+
+ return createZonePartitionReplicationNode(
+ zonePartitionId,
+ localAssignment,
+ stableAssignments,
+ revision,
+ zoneDescriptor.partitions(),
+ isVolatileZone(zoneDescriptor),
+ false,
+ false
+ );
+ }));
+ }, ioExecutor));
+ }
+
/**
* Restarts the zone's partition including the replica and raft node.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index decbfa23a84..81e144c4d42 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -644,6 +644,46 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}
}
+ /**
+ * Restart partitions of a zone with cleanup. This method destroys
partition storage during restart.
+ *
+ * @param nodeNames Names of nodes to restart partitions on. If empty,
restart on all nodes.
+ * @param zoneName Zone name. Case-sensitive, without quotes.
+ * @param partitionIds IDs of partitions to restart. If empty, restart all
zone's partitions.
+ * @return Future that completes when partitions are restarted.
+ */
+ public CompletableFuture<Void> restartPartitionsWithCleanup(
+ Set<String> nodeNames,
+ String zoneName,
+ Set<Integer> partitionIds
+ ) {
+ try {
+ // Validates passed node names.
+ getNodes(nodeNames);
+
+ Catalog catalog = catalogLatestVersion();
+
+ CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+ checkPartitionsRange(partitionIds, Set.of(zone));
+
+ return processNewRequest(new ManualGroupRestartRequest(
+ UUID.randomUUID(),
+ zone.id(),
+ // We pass -1 as the table id here because it is not used for
zone-based partitions.
+ // We expect that the field will be removed once
the colocation track is finished.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22522
+ -1,
+ partitionIds,
+ nodeNames,
+ catalog.time(),
+ true
+ ));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ }
+
/**
* Returns states of partitions in the cluster. Result is a mapping of
{@link ZonePartitionId} to the mapping between a node name and a
* partition state.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 92c0447fde4..7356d30733c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -19,12 +19,12 @@ package
org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableStableAssignments;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneStableAssignments;
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.tableState;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestType.MULTI_NODE;
import static
org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequestHandler.getAliveNodesWithData;
-import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.RESTART_WITH_CLEAN_UP_ERR;
@@ -33,15 +33,17 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
-import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -123,153 +125,194 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
return nullCompletedFuture();
}
+ Catalog catalog =
disasterRecoveryManager.catalogManager.activeCatalog(timestamp.longValue());
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
var restartFutures = new ArrayList<CompletableFuture<?>>();
- if (cleanUp) {
- restartPartitionWithCleanup(disasterRecoveryManager, revision,
timestamp, restartFutures);
- } else {
- restartPartition(disasterRecoveryManager, revision,
restartFutures);
- }
+ disasterRecoveryManager.raftManager.forEach((raftNodeId,
raftGroupService) -> {
+ ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+ if (shouldProcessPartition(replicationGroupId)) {
+ if (cleanUp) {
+ restartFutures.add(
+
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId,
revision, zoneDescriptor, catalog)
+ );
+ } else {
+
restartFutures.add(createRestartFuture(disasterRecoveryManager,
replicationGroupId, revision));
+ }
+ }
+ });
return restartFutures.isEmpty() ? nullCompletedFuture() :
allOf(restartFutures.toArray(CompletableFuture[]::new));
}
- private void restartPartition(
+ private boolean shouldProcessPartition(ReplicationGroupId
replicationGroupId) {
+ if (replicationGroupId instanceof TablePartitionId) {
+ TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+ return groupId.tableId() == tableId &&
partitionIds.contains(groupId.partitionId());
+ } else if (replicationGroupId instanceof ZonePartitionId) {
+ ZonePartitionId groupId = (ZonePartitionId) replicationGroupId;
+ return groupId.zoneId() == zoneId &&
partitionIds.contains(groupId.partitionId());
+ }
+ return false;
+ }
+
+ private CompletableFuture<?> createRestartFuture(
DisasterRecoveryManager disasterRecoveryManager,
- long revision,
- ArrayList<CompletableFuture<?>> restartFutures
+ ReplicationGroupId replicationGroupId,
+ long revision
) {
- disasterRecoveryManager.raftManager.forEach((raftNodeId,
raftGroupService) -> {
- ReplicationGroupId replicationGroupId = raftNodeId.groupId();
-
- if (replicationGroupId instanceof TablePartitionId) {
- TablePartitionId groupId = (TablePartitionId)
replicationGroupId;
+ if (replicationGroupId instanceof TablePartitionId) {
+ return disasterRecoveryManager.tableManager.restartPartition(
+ (TablePartitionId) replicationGroupId,
+ revision,
+ assignmentsTimestamp
+ );
+ } else if (replicationGroupId instanceof ZonePartitionId) {
+ return
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartition(
+ (ZonePartitionId) replicationGroupId,
+ revision,
+ assignmentsTimestamp
+ );
+ }
+ throw new IllegalStateException("Unexpected replication group id: " +
replicationGroupId);
+ }
- if (groupId.tableId() == tableId &&
partitionIds.contains(groupId.partitionId())) {
-
restartFutures.add(disasterRecoveryManager.tableManager.restartPartition(groupId,
revision, assignmentsTimestamp));
- }
- } else {
- if (replicationGroupId instanceof ZonePartitionId) {
- ZonePartitionId groupId = (ZonePartitionId)
replicationGroupId;
-
- if (groupId.zoneId() == zoneId &&
partitionIds.contains(groupId.partitionId())) {
- restartFutures.add(
-
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartition(
- groupId,
- revision,
- assignmentsTimestamp
- )
- );
- }
- }
- }
- });
+ private CompletableFuture<?> createCleanupRestartFuture(
+ DisasterRecoveryManager disasterRecoveryManager,
+ ReplicationGroupId replicationGroupId,
+ long revision
+ ) {
+ if (replicationGroupId instanceof TablePartitionId) {
+ return
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+ (TablePartitionId) replicationGroupId,
+ revision,
+ assignmentsTimestamp
+ );
+ } else if (replicationGroupId instanceof ZonePartitionId) {
+ return
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartitionWithCleanUp(
+ (ZonePartitionId) replicationGroupId,
+ revision,
+ assignmentsTimestamp
+ );
+ }
+ throw new IllegalStateException("Unexpected replication group id: " +
replicationGroupId);
}
- private void restartPartitionWithCleanup(
+ private CompletableFuture<?> createRestartWithCleanupFuture(
DisasterRecoveryManager disasterRecoveryManager,
+ ReplicationGroupId replicationGroupId,
long revision,
- HybridTimestamp timestamp,
- ArrayList<CompletableFuture<?>> restartFutures
+ CatalogZoneDescriptor zoneDescriptor,
+ Catalog catalog
) {
- disasterRecoveryManager.raftManager.forEach((raftNodeId,
raftGroupService) -> {
- ReplicationGroupId replicationGroupId = raftNodeId.groupId();
-
- CatalogManager catalogManager =
disasterRecoveryManager.catalogManager;
-
- Catalog catalog =
catalogManager.activeCatalog(timestamp.longValue());
-
- CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
-
- if (replicationGroupId instanceof TablePartitionId) {
- TablePartitionId groupId = (TablePartitionId)
replicationGroupId;
-
- if (groupId.tableId() == tableId &&
partitionIds.contains(groupId.partitionId())) {
- if (zoneDescriptor.consistencyMode() ==
ConsistencyMode.HIGH_AVAILABILITY) {
- if (zoneDescriptor.replicas() >= 2) {
-
restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
- groupId,
- revision,
- assignmentsTimestamp
- ));
- } else {
- restartFutures.add(CompletableFuture.failedFuture(
- new
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
- + "to perform reset with clean
up.")
- ));
- }
- } else {
- restartFutures.add(
- enoughAliveNodesToRestartWithCleanUp(
- disasterRecoveryManager,
- revision,
- replicationGroupId,
- zoneDescriptor,
- catalog
- ).thenCompose(
- enoughNodes -> {
- if (enoughNodes) {
- return
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
- groupId,
- revision,
- assignmentsTimestamp
- );
- } else {
- throw new
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
- + "to perform reset with
clean up.");
- }
- }
- )
- );
- }
- }
+ if (zoneDescriptor.consistencyMode() ==
ConsistencyMode.HIGH_AVAILABILITY) {
+ if (zoneDescriptor.replicas() >= 2) {
+ return createCleanupRestartFuture(disasterRecoveryManager,
replicationGroupId, revision);
} else {
- if (replicationGroupId instanceof ZonePartitionId) { // NOPMD
- // todo support zone partitions
https://issues.apache.org/jira/browse/IGNITE-25979
- }
+ return notEnoughAliveNodes();
}
- });
+ } else {
+ if (zoneDescriptor.replicas() <= 2) {
+ return notEnoughAliveNodes();
+ }
+
+ return enoughAliveNodesToRestartWithCleanUp(
+ disasterRecoveryManager,
+ revision,
+ replicationGroupId,
+ zoneDescriptor,
+ catalog
+ ).thenCompose(enoughNodes -> {
+ if (enoughNodes) {
+ return createCleanupRestartFuture(disasterRecoveryManager,
replicationGroupId, revision);
+ } else {
+ return notEnoughAliveNodes();
+ }
+ });
+ }
+ }
+
+ private static <U> CompletableFuture<U> notEnoughAliveNodes() {
+ return CompletableFuture.failedFuture(
+ new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not
enough alive nodes "
+ + "to perform reset with clean up.")
+ );
}
- private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp(
+ private static CompletableFuture<Boolean>
enoughAliveNodesToRestartWithCleanUp(
DisasterRecoveryManager disasterRecoveryManager,
long msRevision,
ReplicationGroupId replicationGroupId,
CatalogZoneDescriptor zoneDescriptor,
Catalog catalog
) {
- if (zoneDescriptor.replicas() <= 2) {
- return falseCompletedFuture();
+ if (replicationGroupId instanceof TablePartitionId) {
+ TablePartitionId tablePartitionId = (TablePartitionId)
replicationGroupId;
+
+ return checkPartitionAliveNodes(
+ disasterRecoveryManager,
+ tablePartitionId,
+ zoneDescriptor,
+ catalog,
+ msRevision,
+ tableState(),
+ tableStableAssignments(
+ disasterRecoveryManager.metaStorageManager,
+ tablePartitionId.tableId(),
+ new int[]{tablePartitionId.partitionId()}
+ )
+ );
+ } else if (replicationGroupId instanceof ZonePartitionId) {
+ ZonePartitionId zonePartitionId = (ZonePartitionId)
replicationGroupId;
+
+ return checkPartitionAliveNodes(
+ disasterRecoveryManager,
+ zonePartitionId,
+ zoneDescriptor,
+ catalog,
+ msRevision,
+ zoneState(),
+ zoneStableAssignments(
+ disasterRecoveryManager.metaStorageManager,
+ zonePartitionId.zoneId(),
+ new int[]{zonePartitionId.partitionId()}
+ )
+ );
+ } else {
+ throw new IllegalArgumentException("Unsupported replication group
type: " + replicationGroupId.getClass());
}
+ }
- // TODO: https://issues.apache.org/jira/browse/IGNITE-25979 do proper
casting for ZonePartitionId
- TablePartitionId tablePartitionId = (TablePartitionId)
replicationGroupId;
-
- MetaStorageManager metaStorageManager =
disasterRecoveryManager.metaStorageManager;
-
+ private static <T extends PartitionGroupId> CompletableFuture<Boolean>
checkPartitionAliveNodes(
+ DisasterRecoveryManager disasterRecoveryManager,
+ T partitionGroupId,
+ CatalogZoneDescriptor zoneDescriptor,
+ Catalog catalog,
+ long msRevision,
+ Function<LocalPartitionStateMessage, T> keyExtractor,
+ CompletableFuture<Map<Integer, Assignments>> stableAssignments
+ ) {
Set<String> aliveNodesConsistentIds =
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
.stream()
.map(NodeWithAttributes::nodeName)
- .collect(toSet());
+ .collect(Collectors.toSet());
- CompletableFuture<Map<TablePartitionId,
LocalPartitionStateMessageByNode>> localStatesFuture =
+ CompletableFuture<Map<T, LocalPartitionStateMessageByNode>>
localStatesFuture =
disasterRecoveryManager.localPartitionStatesInternal(
Set.of(zoneDescriptor.name()),
emptySet(),
- Set.of(tablePartitionId.partitionId()),
+ Set.of(partitionGroupId.partitionId()),
catalog,
- tableState()
+ keyExtractor
);
- CompletableFuture<Map<Integer, Assignments>> stableAssignments =
- tableStableAssignments(metaStorageManager, tableId, new
int[]{tablePartitionId.partitionId()});
-
return localStatesFuture.thenCombine(stableAssignments,
(localPartitionStatesMap, currentAssignments) -> {
- LocalPartitionStateMessageByNode localPartitionStateMessageByNode
= localPartitionStatesMap.get(tablePartitionId);
+ LocalPartitionStateMessageByNode localPartitionStateMessageByNode
= localPartitionStatesMap.get(partitionGroupId);
Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
-
- Set<Assignment> currentStableAssignments =
currentAssignments.get(tablePartitionId.partitionId()).nodes();
+ Set<Assignment> currentStableAssignments =
currentAssignments.get(partitionGroupId.partitionId()).nodes();
Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentStableAssignments, partAssignments);
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index ac8ae4bdb57..18baa912533 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -263,8 +263,9 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
String tableName = "TABLE_NAME";
node.sql().executeScript(String.format(
- "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE
TEST_ZONE",
- tableName
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
));
insert(0, 0, tableName);
@@ -318,8 +319,9 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
String tableName = "TABLE_NAME";
node.sql().executeScript(String.format(
- "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE
TEST_ZONE",
- tableName
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
));
insert(0, 0, tableName);
@@ -342,7 +344,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
Transaction tx = nodeToCleanup.transactions().begin();
- nodeToCleanup.sql().execute(tx, "INSERT INTO TABLE_NAME VALUES (2,
2)");
+ nodeToCleanup.sql().execute(tx, String.format("INSERT INTO %s VALUES
(2, 2)", tableName));
CompletableFuture<Void> restartPartitionsWithCleanupFuture =
nodeToCleanup.disasterRecoveryManager().restartTablePartitionsWithCleanup(
@@ -385,8 +387,9 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
String tableName = "TABLE_NAME";
node.sql().executeScript(String.format(
- "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE
TEST_ZONE",
- tableName
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
));
insert(0, 0, tableName);
@@ -471,7 +474,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
return unwrapIgniteImpl(nodeToCleanup);
}
- private String findPrimaryNodeName(IgniteImpl ignite, TablePartitionId
replicationGroupId) {
+ private static String findPrimaryNodeName(IgniteImpl ignite,
ReplicationGroupId replicationGroupId) {
assertThat(awaitPrimaryReplicaForNow(ignite, replicationGroupId),
willCompleteSuccessfully());
CompletableFuture<ReplicaMeta> primary =
ignite.placementDriver().getPrimaryReplica(replicationGroupId,
ignite.clock().now());
@@ -481,7 +484,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
return primary.join().getLeaseholder();
}
- private Ignite findPrimaryIgniteNode(IgniteImpl ignite, TablePartitionId
replicationGroupId) {
+ private Ignite findPrimaryIgniteNode(IgniteImpl ignite, ReplicationGroupId
replicationGroupId) {
return cluster.runningNodes()
.filter(node -> node.name().equals(findPrimaryNodeName(ignite,
replicationGroupId)))
.findFirst()
@@ -769,10 +772,39 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
return ((Wrapper)
node.tables().table(tableName)).unwrap(TableImpl.class).tableId();
}
+ private static int zoneId(CatalogManager catalogManager, String zoneName) {
+ return
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
+ }
+
private static int zoneId(IgniteImpl node) {
return
node.catalogManager().catalog(node.catalogManager().latestCatalogVersion()).zone(ZONE_NAME).id();
}
+ private IgniteImpl findZoneNodeConformingOptions(String testZone, boolean
primaryReplica, boolean raftLeader)
+ throws InterruptedException {
+ Ignite nodeToCleanup;
+ IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
+ ZonePartitionId replicationGroupId = new
ZonePartitionId(zoneId(ignite.catalogManager(), testZone), 0);
+ String primaryNodeName = findPrimaryNodeName(ignite,
replicationGroupId);
+ String raftLeaderNodeName =
cluster.leaderServiceFor(replicationGroupId).getServerId().getConsistentId();
+
+ if (primaryReplica) {
+ nodeToCleanup = findPrimaryIgniteNode(ignite, replicationGroupId);
+ } else if (raftLeader) {
+ nodeToCleanup = cluster.runningNodes()
+ .filter(node -> node.name().equals(raftLeaderNodeName))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No node
found that is a raft leader for the specified options."));
+ } else {
+ nodeToCleanup = cluster.runningNodes()
+ .filter(node -> !node.name().equals(raftLeaderNodeName) &&
!node.name().equals(primaryNodeName))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No node
found that is not a primary replica and not a raft leader."));
+ }
+
+ return unwrapIgniteImpl(nodeToCleanup);
+ }
+
private static CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaForNow(IgniteImpl node, ReplicationGroupId
replicationGroupId) {
return node.placementDriver().awaitPrimaryReplica(replicationGroupId,
node.clock().now(), 60, SECONDS);
}
@@ -797,4 +829,300 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
private static void assertValueOnSpecificNodes(String tableName,
Set<IgniteImpl> nodes, int id, int val) throws Exception {
DisasterRecoveryTestUtil.assertValueOnSpecificNodes(tableName, nodes,
id, val, SCHEMA);
}
+
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @Test
+ void testRestartPartitionsWithCleanUpFails() throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ insert(0, 0);
+ insert(1, 1);
+
+ Set<IgniteImpl> runningNodes =
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ assertValueOnSpecificNodes(TABLE_NAME, runningNodes, 0, 0);
+ assertValueOnSpecificNodes(TABLE_NAME, runningNodes, 1, 1);
+
+ int partitionId = 0;
+
+ CompletableFuture<Void> restartPartitionsWithCleanupFuture =
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+ Set.of(node.name()),
+ ZONE_NAME,
+ Set.of(partitionId)
+ );
+
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> restartPartitionsWithCleanupFuture.get(10_000,
MILLISECONDS)
+ );
+
+ assertInstanceOf(DisasterRecoveryException.class,
exception.getCause());
+
+ assertThat(exception.getCause().getMessage(), is("Not enough alive
nodes to perform reset with clean up."));
+ }
+
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @Test
+ void testRestartHaPartitionsWithCleanUpFails() {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ String testZone = "TEST_ZONE";
+
+ createZone(node.catalogManager(), testZone, 1, 1, null, null,
ConsistencyMode.HIGH_AVAILABILITY);
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
+ ));
+
+ int partitionId = 0;
+
+ CompletableFuture<Void> restartPartitionsWithCleanupFuture =
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+ Set.of(node.name()),
+ testZone,
+ Set.of(partitionId)
+ );
+
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+ () -> restartPartitionsWithCleanupFuture.get(10_000,
MILLISECONDS)
+ );
+
+ assertInstanceOf(DisasterRecoveryException.class,
exception.getCause());
+
+ assertThat(exception.getCause().getMessage(), is("Not enough alive
nodes to perform reset with clean up."));
+ }
+
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1},
raftLeader={2}")
+ @CsvSource({
+ "STRONG_CONSISTENCY, true, false",
+ "STRONG_CONSISTENCY, false, true",
+ "STRONG_CONSISTENCY, false, false",
+ "HIGH_AVAILABILITY, true, false",
+ "HIGH_AVAILABILITY, false, true",
+ "HIGH_AVAILABILITY, false, false"
+ })
+ void testRestartPartitionsWithCleanUp(
+ ConsistencyMode consistencyMode,
+ boolean primaryReplica,
+ boolean raftLeader
+ ) throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+ cluster.startNode(1);
+
+ String testZone = "TEST_ZONE";
+
+ if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+ createZone(node.catalogManager(), testZone, 1, 2, null, null,
ConsistencyMode.HIGH_AVAILABILITY);
+ } else {
+ cluster.startNode(2);
+
+ createZone(node.catalogManager(), testZone, 1, 3);
+ }
+
+ Set<IgniteImpl> runningNodes =
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
+ ));
+
+ insert(0, 0, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ IgniteImpl nodeToCleanup = findZoneNodeConformingOptions(testZone,
primaryReplica, raftLeader);
+
+ CompletableFuture<Void> restartPartitionsWithCleanupFuture =
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+ Set.of(nodeToCleanup.name()),
+ testZone,
+ Set.of(0)
+ );
+
+ assertThat(restartPartitionsWithCleanupFuture,
willCompleteSuccessfully());
+
+ insert(1, 1, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 1, 1);
+ }
+
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}")
+ @CsvSource({
+ "STRONG_CONSISTENCY, true",
+ "STRONG_CONSISTENCY, false",
+ "HIGH_AVAILABILITY, true",
+ "HIGH_AVAILABILITY, false",
+ })
+ void testRestartPartitionsWithCleanUpTxRollback(ConsistencyMode
consistencyMode, boolean primaryReplica) throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ cluster.startNode(1);
+
+ String testZone = "TEST_ZONE";
+
+ if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+ createZone(node.catalogManager(), testZone, 1, 2, null, null,
ConsistencyMode.HIGH_AVAILABILITY);
+ } else {
+ cluster.startNode(2);
+
+ createZone(node.catalogManager(), testZone, 1, 3);
+ }
+
+ Set<IgniteImpl> runningNodes =
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
+ ));
+
+ insert(0, 0, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ IgniteImpl primaryNode =
+ unwrapIgniteImpl(findPrimaryIgniteNode(node, new
ZonePartitionId(zoneId(node.catalogManager(), testZone), 0)));
+
+ IgniteImpl nodeToCleanup;
+
+ if (primaryReplica) {
+ nodeToCleanup = primaryNode;
+ } else {
+ nodeToCleanup = cluster.runningNodes()
+ .filter(n -> !n.name().equals(primaryNode.name()))
+ .map(TestWrappers::unwrapIgniteImpl)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No node
found that is not a primary replica."));
+ }
+
+ Transaction tx = nodeToCleanup.transactions().begin();
+
+ nodeToCleanup.sql().execute(tx, String.format("INSERT INTO %s VALUES
(2, 2)", tableName));
+
+ CompletableFuture<Void> restartPartitionsWithCleanupFuture =
+
nodeToCleanup.disasterRecoveryManager().restartPartitionsWithCleanup(
+ Set.of(nodeToCleanup.name()),
+ testZone,
+ Set.of(0)
+ );
+
+ assertThat(restartPartitionsWithCleanupFuture,
willCompleteSuccessfully());
+
+ if (primaryReplica) {
+ // We expect here that the tx will be rolled back because we have
restarted the primary replica. This is ensured by the fact that we
+ // use ReplicaManager.weakStopReplica(RESTART) in
restartPartitionsWithCleanup, and this mechanism
+ // waits for replica expiration and stops lease prolongation. As a
result, the transaction will not be able to commit
+ // because the primary replica has expired.
+ assertThrows(TransactionException.class, tx::commit, "Primary
replica has expired, transaction will be rolled back");
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ for (IgniteImpl igniteImpl : runningNodes) {
+ assertEquals(1L, igniteImpl.sql().execute(null, "SELECT
count(*) as cnt FROM TABLE_NAME").next().longValue("cnt"));
+ }
+ } else {
+ tx.commit();
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+ assertValueOnSpecificNodes(tableName, runningNodes, 2, 2);
+
+ for (IgniteImpl igniteImpl : runningNodes) {
+ assertEquals(2L, igniteImpl.sql().execute(null, "SELECT
count(*) as cnt FROM TABLE_NAME").next().longValue("cnt"));
+ }
+ }
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26271")
+ @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
+ @Test
+ void testRestartPartitionsWithCleanUpConcurrentRebalance() throws
Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ unwrapIgniteImpl(cluster.startNode(1));
+
+ String testZone = "TEST_ZONE";
+
+ createZone(node.catalogManager(), testZone, 1, 2);
+
+ Set<IgniteImpl> runningNodes =
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+ "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+ tableName,
+ testZone
+ ));
+
+ insert(0, 0, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
+
+ int catalogVersion = node.catalogManager().latestCatalogVersion();
+
+ long timestamp = node.catalogManager().catalog(catalogVersion).time();
+
+ Assignments assignmentPending = Assignments.of(timestamp,
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name())
+ );
+
+ ZonePartitionId replicationGroupId = new
ZonePartitionId(zoneId(node.catalogManager(), testZone), 0);
+
+ AtomicBoolean blocked = new AtomicBoolean(true);
+
+ AtomicBoolean reached = new AtomicBoolean(false);
+
+ blockMessage(cluster, (nodeName, msg) -> {
+ reached.set(true);
+ return blocked.get() && stableKeySwitchMessage(msg,
replicationGroupId, assignmentPending);
+ });
+
+ alterZone(node.catalogManager(), testZone, 3);
+
+ waitForCondition(reached::get, 10_000L);
+
+ CompletableFuture<Void> restartPartitionsWithCleanupFuture =
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+ Set.of(node2.name()),
+ testZone,
+ Set.of(0)
+ );
+
+ assertThat(restartPartitionsWithCleanupFuture,
willCompleteSuccessfully());
+
+ blocked.set(false);
+
+ waitForCondition(() -> {
+ Set<IgniteImpl> newRunningNodes =
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ try {
+ assertValueOnSpecificNodes(tableName, newRunningNodes, 0, 0);
+
+ return true;
+ } catch (AssertionError | Exception e) {
+ return false;
+ }
+ }, 10_000L);
+
+ insert(1, 1, tableName);
+
+ Set<IgniteImpl> finalNodes =
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ assertValueOnSpecificNodes(tableName, finalNodes, 1, 1);
+ }
}