This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fcd8d562e3 IGNITE-25979 Support 
DisasterRecoveryManager#restartPartitionWithCleanup in Colocation track (#6458)
8fcd8d562e3 is described below

commit 8fcd8d562e3cad946439f8f256e082478179a783
Author: Cyrill <[email protected]>
AuthorDate: Tue Aug 26 14:37:06 2025 +0300

    IGNITE-25979 Support DisasterRecoveryManager#restartPartitionWithCleanup in 
Colocation track (#6458)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../PartitionReplicaLifecycleManager.java          |  54 ++++
 .../disaster/DisasterRecoveryManager.java          |  40 +++
 .../disaster/ManualGroupRestartRequest.java        | 267 +++++++++-------
 .../disaster/ItDisasterRecoveryManagerTest.java    | 346 ++++++++++++++++++++-
 4 files changed, 586 insertions(+), 121 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 4e96023c632..02b6e0217a0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -1211,6 +1211,22 @@ public class PartitionReplicaLifecycleManager extends
         );
     }
 
+    /**
+     * Stops all resources associated with a given partition, such as replicas and 
partition trackers, and destroys its storage.
+     * Calls {@link ReplicaManager#weakStopReplica} in order to change the 
replica state.
+     *
+     * @param zonePartitionId Partition ID.
+     * @param revision Revision.
+     * @return Future that will be completed after all resources have been 
closed and destroyed.
+     */
+    private CompletableFuture<Void> 
stopPartitionAndDestroyForRestart(ZonePartitionId zonePartitionId, long 
revision) {
+        return replicaMgr.weakStopReplica(
+                zonePartitionId,
+                WeakReplicaStopReason.RESTART,
+                () -> stopAndDestroyPartition(zonePartitionId, revision)
+        );
+    }
+
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry 
pendingAssignmentsEntry, long revision, boolean isRecovery) {
         if (pendingAssignmentsEntry.value() == null || 
pendingAssignmentsEntry.empty()) {
             return nullCompletedFuture();
@@ -1720,6 +1736,44 @@ public class PartitionReplicaLifecycleManager extends
         return zoneResourcesManager.removeTableResources(zonePartitionId, 
tableId);
     }
 
+    /**
+     * Restarts the zone's partition including the replica and raft node with 
storage cleanup.
+     *
+     * @param zonePartitionId Zone's partition that needs to be restarted.
+     * @param revision Metastore revision.
+     * @param assignmentsTimestamp Timestamp of the assignments.
+     * @return Operation future.
+     */
+    public CompletableFuture<?> restartPartitionWithCleanUp(ZonePartitionId 
zonePartitionId, long revision, long assignmentsTimestamp) {
+        return inBusyLockAsync(busyLock, () -> 
stopPartitionAndDestroyForRestart(zonePartitionId, 
revision).thenComposeAsync(unused -> {
+            Assignments stableAssignments = 
zoneStableAssignmentsGetLocally(metaStorageMgr, zonePartitionId, revision);
+
+            assert stableAssignments != null : "zonePartitionId=" + 
zonePartitionId + ", revision=" + revision;
+
+            return 
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(unused2 -> 
inBusyLockAsync(busyLock, () -> {
+                Assignment localAssignment = 
localAssignment(stableAssignments);
+
+                if (localAssignment == null) {
+                    // (0) The local node is not in the assignments, nothing to do.
+                    return nullCompletedFuture();
+                }
+
+                CatalogZoneDescriptor zoneDescriptor = 
zoneDescriptorAt(zonePartitionId.zoneId(), assignmentsTimestamp);
+
+                return createZonePartitionReplicationNode(
+                        zonePartitionId,
+                        localAssignment,
+                        stableAssignments,
+                        revision,
+                        zoneDescriptor.partitions(),
+                        isVolatileZone(zoneDescriptor),
+                        false,
+                        false
+                );
+            }));
+        }, ioExecutor));
+    }
+
     /**
      * Restarts the zone's partition including the replica and raft node.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index decbfa23a84..81e144c4d42 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -644,6 +644,46 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         }
     }
 
+    /**
+     * Restart partitions of a zone with cleanup. This method destroys 
partition storage during restart.
+     *
+     * @param nodeNames Names of nodes to restart partitions on. If empty, 
restart on all nodes.
+     * @param zoneName Zone name. Case-sensitive, without quotes.
+     * @param partitionIds IDs of partitions to restart. If empty, restart all 
zone's partitions.
+     * @return Future that completes when partitions are restarted.
+     */
+    public CompletableFuture<Void> restartPartitionsWithCleanup(
+            Set<String> nodeNames,
+            String zoneName,
+            Set<Integer> partitionIds
+    ) {
+        try {
+            // Validates passed node names.
+            getNodes(nodeNames);
+
+            Catalog catalog = catalogLatestVersion();
+
+            CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+            checkPartitionsRange(partitionIds, Set.of(zone));
+
+            return processNewRequest(new ManualGroupRestartRequest(
+                    UUID.randomUUID(),
+                    zone.id(),
+                    // We pass -1 as the table id here because it is not used for 
zone-based partitions.
+                    // We expect that the field will be removed once 
colocation track is finished.
+                    // TODO: https://issues.apache.org/jira/browse/IGNITE-22522
+                    -1,
+                    partitionIds,
+                    nodeNames,
+                    catalog.time(),
+                    true
+            ));
+        } catch (Throwable t) {
+            return failedFuture(t);
+        }
+    }
+
     /**
      * Returns states of partitions in the cluster. Result is a mapping of 
{@link ZonePartitionId} to the mapping between a node name and a
      * partition state.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 92c0447fde4..7356d30733c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -19,12 +19,12 @@ package 
org.apache.ignite.internal.table.distributed.disaster;
 
 import static java.util.Collections.emptySet;
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableStableAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneStableAssignments;
 import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.tableState;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
 import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestType.MULTI_NODE;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequestHandler.getAliveNodesWithData;
-import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.RESTART_WITH_CLEAN_UP_ERR;
 
@@ -33,15 +33,17 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
-import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -123,153 +125,194 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
             return nullCompletedFuture();
         }
 
+        Catalog catalog = 
disasterRecoveryManager.catalogManager.activeCatalog(timestamp.longValue());
+        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
         var restartFutures = new ArrayList<CompletableFuture<?>>();
 
-        if (cleanUp) {
-            restartPartitionWithCleanup(disasterRecoveryManager, revision, 
timestamp, restartFutures);
-        } else {
-            restartPartition(disasterRecoveryManager, revision, 
restartFutures);
-        }
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            if (shouldProcessPartition(replicationGroupId)) {
+                if (cleanUp) {
+                    restartFutures.add(
+                            
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId, 
revision, zoneDescriptor, catalog)
+                    );
+                } else {
+                    
restartFutures.add(createRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision));
+                }
+            }
+        });
 
         return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
     }
 
-    private void restartPartition(
+    private boolean shouldProcessPartition(ReplicationGroupId 
replicationGroupId) {
+        if (replicationGroupId instanceof TablePartitionId) {
+            TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+            return groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId());
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            ZonePartitionId groupId = (ZonePartitionId) replicationGroupId;
+            return groupId.zoneId() == zoneId && 
partitionIds.contains(groupId.partitionId());
+        }
+        return false;
+    }
+
+    private CompletableFuture<?> createRestartFuture(
             DisasterRecoveryManager disasterRecoveryManager,
-            long revision,
-            ArrayList<CompletableFuture<?>> restartFutures
+            ReplicationGroupId replicationGroupId,
+            long revision
     ) {
-        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
-            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
-
-            if (replicationGroupId instanceof TablePartitionId) {
-                TablePartitionId groupId = (TablePartitionId) 
replicationGroupId;
+        if (replicationGroupId instanceof TablePartitionId) {
+            return disasterRecoveryManager.tableManager.restartPartition(
+                    (TablePartitionId) replicationGroupId,
+                    revision,
+                    assignmentsTimestamp
+            );
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            return 
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartition(
+                    (ZonePartitionId) replicationGroupId,
+                    revision,
+                    assignmentsTimestamp
+            );
+        }
+        throw new IllegalStateException("Unexpected replication group id: " + 
replicationGroupId);
+    }
 
-                if (groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId())) {
-                    
restartFutures.add(disasterRecoveryManager.tableManager.restartPartition(groupId,
 revision, assignmentsTimestamp));
-                }
-            } else {
-                if (replicationGroupId instanceof ZonePartitionId) {
-                    ZonePartitionId groupId = (ZonePartitionId) 
replicationGroupId;
-
-                    if (groupId.zoneId() == zoneId && 
partitionIds.contains(groupId.partitionId())) {
-                        restartFutures.add(
-                                
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartition(
-                                        groupId,
-                                        revision,
-                                        assignmentsTimestamp
-                                )
-                        );
-                    }
-                }
-            }
-        });
+    private CompletableFuture<?> createCleanupRestartFuture(
+            DisasterRecoveryManager disasterRecoveryManager,
+            ReplicationGroupId replicationGroupId,
+            long revision
+    ) {
+        if (replicationGroupId instanceof TablePartitionId) {
+            return 
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                    (TablePartitionId) replicationGroupId,
+                    revision,
+                    assignmentsTimestamp
+            );
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            return 
disasterRecoveryManager.partitionReplicaLifecycleManager.restartPartitionWithCleanUp(
+                    (ZonePartitionId) replicationGroupId,
+                    revision,
+                    assignmentsTimestamp
+            );
+        }
+        throw new IllegalStateException("Unexpected replication group id: " + 
replicationGroupId);
     }
 
-    private void restartPartitionWithCleanup(
+    private CompletableFuture<?> createRestartWithCleanupFuture(
             DisasterRecoveryManager disasterRecoveryManager,
+            ReplicationGroupId replicationGroupId,
             long revision,
-            HybridTimestamp timestamp,
-            ArrayList<CompletableFuture<?>> restartFutures
+            CatalogZoneDescriptor zoneDescriptor,
+            Catalog catalog
     ) {
-        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
-            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
-
-            CatalogManager catalogManager = 
disasterRecoveryManager.catalogManager;
-
-            Catalog catalog = 
catalogManager.activeCatalog(timestamp.longValue());
-
-            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
-
-            if (replicationGroupId instanceof TablePartitionId) {
-                TablePartitionId groupId = (TablePartitionId) 
replicationGroupId;
-
-                if (groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId())) {
-                    if (zoneDescriptor.consistencyMode() == 
ConsistencyMode.HIGH_AVAILABILITY) {
-                        if (zoneDescriptor.replicas() >= 2) {
-                            
restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
-                                    groupId,
-                                    revision,
-                                    assignmentsTimestamp
-                            ));
-                        } else {
-                            restartFutures.add(CompletableFuture.failedFuture(
-                                    new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
-                                            + "to perform reset with clean 
up.")
-                            ));
-                        }
-                    } else {
-                        restartFutures.add(
-                                enoughAliveNodesToRestartWithCleanUp(
-                                    disasterRecoveryManager,
-                                    revision,
-                                    replicationGroupId,
-                                    zoneDescriptor,
-                                    catalog
-                                ).thenCompose(
-                                    enoughNodes -> {
-                                        if (enoughNodes) {
-                                            return 
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
-                                                    groupId,
-                                                    revision,
-                                                    assignmentsTimestamp
-                                            );
-                                        } else {
-                                            throw new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
-                                                    + "to perform reset with 
clean up.");
-                                        }
-                                    }
-                                )
-                        );
-                    }
-                }
+        if (zoneDescriptor.consistencyMode() == 
ConsistencyMode.HIGH_AVAILABILITY) {
+            if (zoneDescriptor.replicas() >= 2) {
+                return createCleanupRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision);
             } else {
-                if (replicationGroupId instanceof ZonePartitionId) { // NOPMD
-                    // todo support zone partitions 
https://issues.apache.org/jira/browse/IGNITE-25979
-                }
+                return notEnoughAliveNodes();
             }
-        });
+        } else {
+            if (zoneDescriptor.replicas() <= 2) {
+                return notEnoughAliveNodes();
+            }
+
+            return enoughAliveNodesToRestartWithCleanUp(
+                    disasterRecoveryManager,
+                    revision,
+                    replicationGroupId,
+                    zoneDescriptor,
+                    catalog
+            ).thenCompose(enoughNodes -> {
+                if (enoughNodes) {
+                    return createCleanupRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision);
+                } else {
+                    return notEnoughAliveNodes();
+                }
+            });
+        }
+    }
+
+    private static <U> CompletableFuture<U> notEnoughAliveNodes() {
+        return CompletableFuture.failedFuture(
+                new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not 
enough alive nodes "
+                        + "to perform reset with clean up.")
+        );
     }
 
-    private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp(
+    private static CompletableFuture<Boolean> 
enoughAliveNodesToRestartWithCleanUp(
             DisasterRecoveryManager disasterRecoveryManager,
             long msRevision,
             ReplicationGroupId replicationGroupId,
             CatalogZoneDescriptor zoneDescriptor,
             Catalog catalog
     ) {
-        if (zoneDescriptor.replicas() <= 2) {
-            return falseCompletedFuture();
+        if (replicationGroupId instanceof TablePartitionId) {
+            TablePartitionId tablePartitionId = (TablePartitionId) 
replicationGroupId;
+
+            return checkPartitionAliveNodes(
+                    disasterRecoveryManager,
+                    tablePartitionId,
+                    zoneDescriptor,
+                    catalog,
+                    msRevision,
+                    tableState(),
+                    tableStableAssignments(
+                            disasterRecoveryManager.metaStorageManager,
+                            tablePartitionId.tableId(),
+                            new int[]{tablePartitionId.partitionId()}
+                    )
+            );
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            ZonePartitionId zonePartitionId = (ZonePartitionId) 
replicationGroupId;
+
+            return checkPartitionAliveNodes(
+                    disasterRecoveryManager,
+                    zonePartitionId,
+                    zoneDescriptor,
+                    catalog,
+                    msRevision,
+                    zoneState(),
+                    zoneStableAssignments(
+                            disasterRecoveryManager.metaStorageManager,
+                            zonePartitionId.zoneId(),
+                            new int[]{zonePartitionId.partitionId()}
+                    )
+            );
+        } else {
+            throw new IllegalArgumentException("Unsupported replication group 
type: " + replicationGroupId.getClass());
         }
+    }
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-25979 do proper 
casting for ZonePartitionId
-        TablePartitionId tablePartitionId = (TablePartitionId) 
replicationGroupId;
-
-        MetaStorageManager metaStorageManager = 
disasterRecoveryManager.metaStorageManager;
-
+    private static <T extends PartitionGroupId> CompletableFuture<Boolean> 
checkPartitionAliveNodes(
+            DisasterRecoveryManager disasterRecoveryManager,
+            T partitionGroupId,
+            CatalogZoneDescriptor zoneDescriptor,
+            Catalog catalog,
+            long msRevision,
+            Function<LocalPartitionStateMessage, T> keyExtractor,
+            CompletableFuture<Map<Integer, Assignments>> stableAssignments
+    ) {
         Set<String> aliveNodesConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
                 .stream()
                 .map(NodeWithAttributes::nodeName)
-                .collect(toSet());
+                .collect(Collectors.toSet());
 
-        CompletableFuture<Map<TablePartitionId, 
LocalPartitionStateMessageByNode>> localStatesFuture =
+        CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> 
localStatesFuture =
                 disasterRecoveryManager.localPartitionStatesInternal(
                         Set.of(zoneDescriptor.name()),
                         emptySet(),
-                        Set.of(tablePartitionId.partitionId()),
+                        Set.of(partitionGroupId.partitionId()),
                         catalog,
-                        tableState()
+                        keyExtractor
                 );
 
-        CompletableFuture<Map<Integer, Assignments>> stableAssignments =
-                tableStableAssignments(metaStorageManager, tableId, new 
int[]{tablePartitionId.partitionId()});
-
         return localStatesFuture.thenCombine(stableAssignments, 
(localPartitionStatesMap, currentAssignments) -> {
-            LocalPartitionStateMessageByNode localPartitionStateMessageByNode 
= localPartitionStatesMap.get(tablePartitionId);
+            LocalPartitionStateMessageByNode localPartitionStateMessageByNode 
= localPartitionStatesMap.get(partitionGroupId);
 
             Set<Assignment> partAssignments = 
getAliveNodesWithData(aliveNodesConsistentIds, 
localPartitionStateMessageByNode);
-
-            Set<Assignment> currentStableAssignments = 
currentAssignments.get(tablePartitionId.partitionId()).nodes();
+            Set<Assignment> currentStableAssignments = 
currentAssignments.get(partitionGroupId.partitionId()).nodes();
 
             Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentStableAssignments, partAssignments);
 
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index ac8ae4bdb57..18baa912533 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -263,8 +263,9 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         String tableName = "TABLE_NAME";
 
         node.sql().executeScript(String.format(
-                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
-                tableName
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
         ));
 
         insert(0, 0, tableName);
@@ -318,8 +319,9 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         String tableName = "TABLE_NAME";
 
         node.sql().executeScript(String.format(
-                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
-                tableName
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
         ));
 
         insert(0, 0, tableName);
@@ -342,7 +344,7 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
 
         Transaction tx = nodeToCleanup.transactions().begin();
 
-        nodeToCleanup.sql().execute(tx, "INSERT INTO TABLE_NAME VALUES (2, 
2)");
+        nodeToCleanup.sql().execute(tx, String.format("INSERT INTO %s VALUES 
(2, 2)", tableName));
 
         CompletableFuture<Void> restartPartitionsWithCleanupFuture =
                 
nodeToCleanup.disasterRecoveryManager().restartTablePartitionsWithCleanup(
@@ -385,8 +387,9 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         String tableName = "TABLE_NAME";
 
         node.sql().executeScript(String.format(
-                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
-                tableName
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
         ));
 
         insert(0, 0, tableName);
@@ -471,7 +474,7 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         return unwrapIgniteImpl(nodeToCleanup);
     }
 
-    private String findPrimaryNodeName(IgniteImpl ignite, TablePartitionId 
replicationGroupId) {
+    private static String findPrimaryNodeName(IgniteImpl ignite, 
ReplicationGroupId replicationGroupId) {
         assertThat(awaitPrimaryReplicaForNow(ignite, replicationGroupId), 
willCompleteSuccessfully());
 
         CompletableFuture<ReplicaMeta> primary = 
ignite.placementDriver().getPrimaryReplica(replicationGroupId, 
ignite.clock().now());
@@ -481,7 +484,7 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         return primary.join().getLeaseholder();
     }
 
-    private Ignite findPrimaryIgniteNode(IgniteImpl ignite, TablePartitionId 
replicationGroupId) {
+    private Ignite findPrimaryIgniteNode(IgniteImpl ignite, ReplicationGroupId 
replicationGroupId) {
         return cluster.runningNodes()
                 .filter(node -> node.name().equals(findPrimaryNodeName(ignite, 
replicationGroupId)))
                 .findFirst()
@@ -769,10 +772,39 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         return ((Wrapper) 
node.tables().table(tableName)).unwrap(TableImpl.class).tableId();
     }
 
+    private static int zoneId(CatalogManager catalogManager, String zoneName) {
+        return 
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
+    }
+
     private static int zoneId(IgniteImpl node) {
         return 
node.catalogManager().catalog(node.catalogManager().latestCatalogVersion()).zone(ZONE_NAME).id();
     }
 
+    private IgniteImpl findZoneNodeConformingOptions(String testZone, boolean 
primaryReplica, boolean raftLeader)
+            throws InterruptedException {
+        Ignite nodeToCleanup;
+        IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
+        ZonePartitionId replicationGroupId = new 
ZonePartitionId(zoneId(ignite.catalogManager(), testZone), 0);
+        String primaryNodeName = findPrimaryNodeName(ignite, 
replicationGroupId);
+        String raftLeaderNodeName = 
cluster.leaderServiceFor(replicationGroupId).getServerId().getConsistentId();
+
+        if (primaryReplica) {
+            nodeToCleanup = findPrimaryIgniteNode(ignite, replicationGroupId);
+        } else if (raftLeader) {
+            nodeToCleanup = cluster.runningNodes()
+                    .filter(node -> node.name().equals(raftLeaderNodeName))
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node 
found that is a raft leader for the specified options."));
+        } else {
+            nodeToCleanup = cluster.runningNodes()
+                    .filter(node -> !node.name().equals(raftLeaderNodeName) && 
!node.name().equals(primaryNodeName))
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node 
found that is not a primary replica and not a raft leader."));
+        }
+
+        return unwrapIgniteImpl(nodeToCleanup);
+    }
+
     private static CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaForNow(IgniteImpl node, ReplicationGroupId 
replicationGroupId) {
         return node.placementDriver().awaitPrimaryReplica(replicationGroupId, 
node.clock().now(), 60, SECONDS);
     }
@@ -797,4 +829,300 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
    /**
     * Asserts that the row with the given {@code id} has value {@code val} in table {@code tableName} on every node in
     * {@code nodes}, delegating to {@link DisasterRecoveryTestUtil} with this test's fixed {@code SCHEMA}.
     */
    private static void assertValueOnSpecificNodes(String tableName, Set<IgniteImpl> nodes, int id, int val) throws Exception {
        DisasterRecoveryTestUtil.assertValueOnSpecificNodes(tableName, nodes, id, val, SCHEMA);
    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @Test
+    void testRestartPartitionsWithCleanUpFails() throws Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        insert(0, 0);
+        insert(1, 1);
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        assertValueOnSpecificNodes(TABLE_NAME, runningNodes, 0, 0);
+        assertValueOnSpecificNodes(TABLE_NAME, runningNodes, 1, 1);
+
+        int partitionId = 0;
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+                Set.of(node.name()),
+                ZONE_NAME,
+                Set.of(partitionId)
+        );
+
+        ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> restartPartitionsWithCleanupFuture.get(10_000, 
MILLISECONDS)
+        );
+
+        assertInstanceOf(DisasterRecoveryException.class, 
exception.getCause());
+
+        assertThat(exception.getCause().getMessage(), is("Not enough alive 
nodes to perform reset with clean up."));
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @Test
+    void testRestartHaPartitionsWithCleanUpFails() {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        String testZone = "TEST_ZONE";
+
+        createZone(node.catalogManager(), testZone, 1, 1, null, null, 
ConsistencyMode.HIGH_AVAILABILITY);
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
+        ));
+
+        int partitionId = 0;
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+                Set.of(node.name()),
+                testZone,
+                Set.of(partitionId)
+        );
+
+        ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> restartPartitionsWithCleanupFuture.get(10_000, 
MILLISECONDS)
+        );
+
+        assertInstanceOf(DisasterRecoveryException.class, 
exception.getCause());
+
+        assertThat(exception.getCause().getMessage(), is("Not enough alive 
nodes to perform reset with clean up."));
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}, 
raftLeader={2}")
+    @CsvSource({
+            "STRONG_CONSISTENCY, true, false",
+            "STRONG_CONSISTENCY, false, true",
+            "STRONG_CONSISTENCY, false, false",
+            "HIGH_AVAILABILITY, true, false",
+            "HIGH_AVAILABILITY, false, true",
+            "HIGH_AVAILABILITY, false, false"
+    })
+    void testRestartPartitionsWithCleanUp(
+            ConsistencyMode consistencyMode,
+            boolean primaryReplica,
+            boolean raftLeader
+    ) throws Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+        cluster.startNode(1);
+
+        String testZone = "TEST_ZONE";
+
+        if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+            createZone(node.catalogManager(), testZone, 1, 2, null, null, 
ConsistencyMode.HIGH_AVAILABILITY);
+        } else {
+            cluster.startNode(2);
+
+            createZone(node.catalogManager(), testZone, 1, 3);
+        }
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
+        ));
+
+        insert(0, 0, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl nodeToCleanup = findZoneNodeConformingOptions(testZone, 
primaryReplica, raftLeader);
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+                Set.of(nodeToCleanup.name()),
+                testZone,
+                Set.of(0)
+        );
+
+        assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
+
+        insert(1, 1, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 1, 1);
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}")
+    @CsvSource({
+            "STRONG_CONSISTENCY, true",
+            "STRONG_CONSISTENCY, false",
+            "HIGH_AVAILABILITY, true",
+            "HIGH_AVAILABILITY, false",
+    })
+    void testRestartPartitionsWithCleanUpTxRollback(ConsistencyMode 
consistencyMode, boolean primaryReplica) throws Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        cluster.startNode(1);
+
+        String testZone = "TEST_ZONE";
+
+        if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+            createZone(node.catalogManager(), testZone, 1, 2, null, null, 
ConsistencyMode.HIGH_AVAILABILITY);
+        } else {
+            cluster.startNode(2);
+
+            createZone(node.catalogManager(), testZone, 1, 3);
+        }
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
+        ));
+
+        insert(0, 0, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl primaryNode =
+                unwrapIgniteImpl(findPrimaryIgniteNode(node, new 
ZonePartitionId(zoneId(node.catalogManager(), testZone), 0)));
+
+        IgniteImpl nodeToCleanup;
+
+        if (primaryReplica) {
+            nodeToCleanup = primaryNode;
+        } else {
+            nodeToCleanup = cluster.runningNodes()
+                    .filter(n -> !n.name().equals(primaryNode.name()))
+                    .map(TestWrappers::unwrapIgniteImpl)
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node 
found that is not a primary replica."));
+        }
+
+        Transaction tx = nodeToCleanup.transactions().begin();
+
+        nodeToCleanup.sql().execute(tx, String.format("INSERT INTO %s VALUES 
(2, 2)", tableName));
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture =
+                
nodeToCleanup.disasterRecoveryManager().restartPartitionsWithCleanup(
+                        Set.of(nodeToCleanup.name()),
+                        testZone,
+                        Set.of(0)
+                );
+
+        assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
+
+        if (primaryReplica) {
+            // We expect here that tx will be rolled back because we have 
restarted primary replica. This is ensured by the fact that we
+            // use ReplicaManager.weakStopReplica(RESTART) in 
restartPartitionsWithCleanup, and this mechanism
+            // waits for replica expiration and stops lease prolongation. As a 
result, the transaction will not be able to commit
+            // because the primary replica has expired.
+            assertThrows(TransactionException.class, tx::commit, "Primary 
replica has expired, transaction will be rolled back");
+
+            assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+            for (IgniteImpl igniteImpl : runningNodes) {
+                assertEquals(1L, igniteImpl.sql().execute(null, "SELECT 
count(*) as cnt FROM TABLE_NAME").next().longValue("cnt"));
+            }
+        } else {
+            tx.commit();
+
+            assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+            assertValueOnSpecificNodes(tableName, runningNodes, 2, 2);
+
+            for (IgniteImpl igniteImpl : runningNodes) {
+                assertEquals(2L, igniteImpl.sql().execute(null, "SELECT 
count(*) as cnt FROM TABLE_NAME").next().longValue("cnt"));
+            }
+        }
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26271";)
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
+    @Test
+    void testRestartPartitionsWithCleanUpConcurrentRebalance() throws 
Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        unwrapIgniteImpl(cluster.startNode(1));
+
+        String testZone = "TEST_ZONE";
+
+        createZone(node.catalogManager(), testZone, 1, 2);
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE %s",
+                tableName,
+                testZone
+        ));
+
+        insert(0, 0, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
+
+        int catalogVersion = node.catalogManager().latestCatalogVersion();
+
+        long timestamp = node.catalogManager().catalog(catalogVersion).time();
+
+        Assignments assignmentPending = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        );
+
+        ZonePartitionId replicationGroupId = new 
ZonePartitionId(zoneId(node.catalogManager(), testZone), 0);
+
+        AtomicBoolean blocked = new AtomicBoolean(true);
+
+        AtomicBoolean reached = new AtomicBoolean(false);
+
+        blockMessage(cluster, (nodeName, msg) -> {
+            reached.set(true);
+            return blocked.get() && stableKeySwitchMessage(msg, 
replicationGroupId, assignmentPending);
+        });
+
+        alterZone(node.catalogManager(), testZone, 3);
+
+        waitForCondition(reached::get, 10_000L);
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartPartitionsWithCleanup(
+                Set.of(node2.name()),
+                testZone,
+                Set.of(0)
+        );
+
+        assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
+
+        blocked.set(false);
+
+        waitForCondition(() -> {
+            Set<IgniteImpl> newRunningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+            try {
+                assertValueOnSpecificNodes(tableName, newRunningNodes, 0, 0);
+
+                return true;
+            } catch (AssertionError | Exception e) {
+                return false;
+            }
+        }, 10_000L);
+
+        insert(1, 1, tableName);
+
+        Set<IgniteImpl> finalNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        assertValueOnSpecificNodes(tableName, finalNodes, 1, 1);
+    }
 }


Reply via email to