This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 23cb3ea085e IGNITE-25823 Increase test coverage for restarting partitions with cleanup feature (#6414)
23cb3ea085e is described below

commit 23cb3ea085ebae05ada3128dd3a762ec20db3b0a
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Aug 19 18:14:49 2025 +0400

    IGNITE-25823 Increase test coverage for restarting partitions with cleanup feature (#6414)
---
 .../internal/replicator/ReplicaStateManager.java   |   1 +
 .../disaster/ManualGroupRestartRequest.java        |   7 +-
 .../disaster/DisasterRecoveryTestUtil.java         |  88 ++++++
 .../disaster/ItDisasterRecoveryManagerTest.java    | 316 +++++++++++++++++++--
 .../ItDisasterRecoveryReconfigurationTest.java     |  75 +----
 5 files changed, 405 insertions(+), 82 deletions(-)

diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index da446ed54cb..6301912057d 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -280,6 +280,7 @@ class ReplicaStateManager {
                 if (context.reservedForPrimary) {
                     // If is primary, turning off the primary first.
                     context.replicaState = ReplicaState.RESTART_PLANNED;
+
                     return replicaManager.stopLeaseProlongation(groupId, null)
                             .thenCompose(unused -> 
planDeferredReplicaStop(groupId, context, stopOperation));
                 } else {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 11743c51146..92c0447fde4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -192,6 +192,11 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
                                     revision,
                                     assignmentsTimestamp
                             ));
+                        } else {
+                            restartFutures.add(CompletableFuture.failedFuture(
+                                    new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
+                                            + "to perform reset with clean 
up.")
+                            ));
                         }
                     } else {
                         restartFutures.add(
@@ -210,7 +215,7 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
                                                     assignmentsTimestamp
                                             );
                                         } else {
-                                            throw new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                            throw new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
                                                     + "to perform reset with 
clean up.");
                                         }
                                     }
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
new file mode 100644
index 00000000000..0d78cb38abf
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster;
+
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.stablePartitionAssignmentsKey;
+import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+
+import java.util.List;
+import java.util.function.BiPredicate;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.OperationType;
+import org.apache.ignite.internal.metastorage.dsl.Statement;
+import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+
+/**
+ * Utility class for disaster recovery tests.
+ */
+class DisasterRecoveryTestUtil {
+    static void blockMessage(Cluster cluster, BiPredicate<String, 
NetworkMessage> predicate) {
+        
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
+            BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
+
+            if (oldPredicate == null) {
+                node.dropMessages(predicate);
+            } else {
+                node.dropMessages(oldPredicate.or(predicate));
+            }
+        });
+    }
+
+    static boolean stableKeySwitchMessage(
+            NetworkMessage msg,
+            PartitionGroupId partId,
+            Assignments blockedAssignments
+    ) {
+        if (msg instanceof WriteActionRequest) {
+            var writeActionRequest = (WriteActionRequest) msg;
+            WriteCommand command = writeActionRequest.deserializedCommand();
+
+            if (command instanceof MultiInvokeCommand) {
+                MultiInvokeCommand multiInvokeCommand = (MultiInvokeCommand) 
command;
+
+                Statement andThen = multiInvokeCommand.iif().andThen();
+
+                if (andThen instanceof UpdateStatement) {
+                    UpdateStatement updateStatement = (UpdateStatement) 
andThen;
+                    List<Operation> operations = 
updateStatement.update().operations();
+
+                    ByteArray stablePartAssignmentsKey = 
stablePartitionAssignmentsKey(partId);
+
+                    for (Operation operation : operations) {
+                        ByteArray opKey = new 
ByteArray(toByteArray(operation.key()));
+
+                        if (operation.type() == OperationType.PUT && 
opKey.equals(stablePartAssignmentsKey)) {
+                            return 
blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
+                        }
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+}
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 18aef6f393a..148a0ed7c19 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -23,6 +23,9 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
+import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
@@ -32,6 +35,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -44,14 +48,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -76,9 +86,13 @@ import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.Disaster
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 /** For {@link DisasterRecoveryManager} integration testing. */
 // TODO https://issues.apache.org/jira/browse/IGNITE-22332 Add test cases.
@@ -181,48 +195,300 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
 
         assertInstanceOf(DisasterRecoveryException.class, 
exception.getCause());
 
-        assertThat(exception.getCause().getMessage(), is("Not enough alive 
node to perform reset with clean up."));
+        assertThat(exception.getCause().getMessage(), is("Not enough alive 
nodes to perform reset with clean up."));
     }
 
     @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "false")
     @Test
-    void testRestartTablePartitionsWithCleanUp() throws Exception {
+    void testRestartHaTablePartitionsWithCleanUpFails() {
         IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
-        IgniteImpl node1 = unwrapIgniteImpl(cluster.startNode(1));
-        IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
 
         String testZone = "TEST_ZONE";
 
-        createZone(node.catalogManager(), testZone, 1, 3);
+        createZone(node.catalogManager(), testZone, 1, 1, null, null, 
ConsistencyMode.HIGH_AVAILABILITY);
 
-        String tableName2 = "TABLE_NAME_2";
+        String tableName = "TABLE_NAME";
 
         node.sql().executeScript(String.format(
                 "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
-                tableName2
+                tableName
         ));
 
-        insert(0, 0, tableName2);
-
-        assertValueOnSpecificNodes(tableName2, Set.of(node, node1, node2), 0, 
0);
-
         int partitionId = 0;
 
         CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
                 Set.of(node.name()),
                 testZone,
                 SqlCommon.DEFAULT_SCHEMA_NAME,
-                tableName2,
+                tableName,
                 Set.of(partitionId)
         );
 
+        ExecutionException exception = assertThrows(
+                ExecutionException.class,
+                () -> restartPartitionsWithCleanupFuture.get(10_000, 
MILLISECONDS)
+        );
+
+        assertInstanceOf(DisasterRecoveryException.class, 
exception.getCause());
+
+        assertThat(exception.getCause().getMessage(), is("Not enough alive 
nodes to perform reset with clean up."));
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "false")
+    @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}, 
raftLeader={2}")
+    @CsvSource({
+            "STRONG_CONSISTENCY, true, false",
+            "STRONG_CONSISTENCY, false, true",
+            "STRONG_CONSISTENCY, false, false",
+            "HIGH_AVAILABILITY, true, false",
+            "HIGH_AVAILABILITY, false, true",
+            "HIGH_AVAILABILITY, false, false"
+    })
+    void testRestartTablePartitionsWithCleanUp(
+            ConsistencyMode consistencyMode,
+            boolean primaryReplica,
+            boolean raftLeader
+    ) throws Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+        cluster.startNode(1);
+
+        String testZone = "TEST_ZONE";
+
+        if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+            createZone(node.catalogManager(), testZone, 1, 2, null, null, 
ConsistencyMode.HIGH_AVAILABILITY);
+        } else {
+            cluster.startNode(2);
+
+            createZone(node.catalogManager(), testZone, 1, 3);
+        }
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
+                tableName
+        ));
+
+        insert(0, 0, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl nodeToCleanup = findNodeConformingOptions(tableName, 
primaryReplica, raftLeader);
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
+                Set.of(nodeToCleanup.name()),
+                testZone,
+                SqlCommon.DEFAULT_SCHEMA_NAME,
+                tableName,
+                Set.of(0)
+        );
+
         assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
-        assertThat(awaitPrimaryReplicaForNow(node, new 
TablePartitionId(tableId(node), partitionId)), willCompleteSuccessfully());
 
-        insert(1, 1, tableName2);
+        insert(1, 1, tableName);
 
-        assertValueOnSpecificNodes(tableName2, Set.of(node, node1, node2), 0, 
0);
-        assertValueOnSpecificNodes(tableName2, Set.of(node, node1, node2), 1, 
1);
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 1, 1);
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "false")
+    @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}")
+    @CsvSource({
+            "STRONG_CONSISTENCY, true",
+            "STRONG_CONSISTENCY, false",
+            "HIGH_AVAILABILITY, true",
+            "HIGH_AVAILABILITY, false",
+    })
+    void testRestartTablePartitionsWithCleanUpTxRollback(ConsistencyMode 
consistencyMode, boolean primaryReplica) throws Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        cluster.startNode(1);
+
+        String testZone = "TEST_ZONE";
+
+        if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+            createZone(node.catalogManager(), testZone, 1, 2, null, null, 
ConsistencyMode.HIGH_AVAILABILITY);
+        } else {
+            cluster.startNode(2);
+
+            createZone(node.catalogManager(), testZone, 1, 3);
+        }
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
+                tableName
+        ));
+
+        insert(0, 0, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl primaryNode = unwrapIgniteImpl(findPrimaryIgniteNode(node, 
new TablePartitionId(tableId(node, tableName), 0)));
+
+        IgniteImpl nodeToCleanup;
+
+        if (primaryReplica) {
+            nodeToCleanup = primaryNode;
+        } else {
+            nodeToCleanup = cluster.runningNodes()
+                    .filter(n -> !n.name().equals(primaryNode.name()))
+                    .map(TestWrappers::unwrapIgniteImpl)
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node 
found that is not a primary replica."));
+        }
+
+        Transaction tx = nodeToCleanup.transactions().begin();
+
+        nodeToCleanup.sql().execute(tx, "INSERT INTO TABLE_NAME VALUES (2, 
2)");
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture =
+                
nodeToCleanup.disasterRecoveryManager().restartTablePartitionsWithCleanup(
+                        Set.of(nodeToCleanup.name()),
+                        testZone,
+                        SqlCommon.DEFAULT_SCHEMA_NAME,
+                        tableName,
+                        Set.of(0)
+                );
+
+        assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
+
+        if (primaryReplica) {
+            // We expect here that tx will be rolled back because we have 
restarted primary replica. This is ensured by the fact that we
+            // use ReplicaManager.weakStopReplica(RESTART) in 
restartTablePartitionsWithCleanup, and this mechanism
+            // waits for replica expiration and stops lease prolongation. As a 
result, the transaction will not be able to commit
+            // because the primary replica has expired.
+            assertThrows(TransactionException.class, tx::commit, "Primary 
replica has expired, transaction will be rolled back");
+        } else {
+            tx.commit();
+
+            assertValueOnSpecificNodes(tableName, runningNodes, 2, 2);
+        }
+    }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "false")
+    @Test
+    void testRestartTablePartitionsWithCleanUpConcurrentRebalance() throws 
Exception {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        unwrapIgniteImpl(cluster.startNode(1));
+
+        String testZone = "TEST_ZONE";
+
+        createZone(node.catalogManager(), testZone, 1, 2);
+
+        Set<IgniteImpl> runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        String tableName = "TABLE_NAME";
+
+        node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE 
TEST_ZONE",
+                tableName
+        ));
+
+        insert(0, 0, tableName);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
+
+        int catalogVersion = node.catalogManager().latestCatalogVersion();
+
+        long timestamp = node.catalogManager().catalog(catalogVersion).time();
+
+        Assignments assignmentPending = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        );
+
+        TablePartitionId replicationGroupId = new 
TablePartitionId(tableId(node, tableName), 0);
+
+        AtomicBoolean blocked = new AtomicBoolean(true);
+
+        AtomicBoolean reached = new AtomicBoolean(false);
+
+        blockMessage(cluster, (nodeName, msg) -> {
+            reached.set(true);
+            return blocked.get() && stableKeySwitchMessage(msg, 
replicationGroupId, assignmentPending);
+        });
+
+        alterZone(node.catalogManager(), testZone, 3);
+
+        waitForCondition(reached::get, 10_000L);
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
+                Set.of(node2.name()),
+                testZone,
+                SqlCommon.DEFAULT_SCHEMA_NAME,
+                tableName,
+                Set.of(0)
+        );
+
+        assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
+
+        insert(1, 1, tableName);
+
+        blocked.set(false);
+
+        runningNodes = 
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        assertEquals(3, runningNodes.size(), "Expected 3 running nodes after 
zone alteration");
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        assertValueOnSpecificNodes(tableName, runningNodes, 1, 1);
+    }
+
+    private IgniteImpl findNodeConformingOptions(String tableName, boolean 
primaryReplica, boolean raftLeader) throws InterruptedException {
+        Ignite nodeToCleanup;
+
+        IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
+
+        TablePartitionId replicationGroupId = new 
TablePartitionId(tableId(ignite, tableName), 0);
+
+        String primaryNodeName = findPrimaryNodeName(ignite, 
replicationGroupId);
+
+        String raftLeaderNodeName = 
cluster.leaderServiceFor(replicationGroupId).getServerId().getConsistentId();
+
+        if (primaryReplica) {
+            nodeToCleanup = findPrimaryIgniteNode(ignite, replicationGroupId);
+        } else if (raftLeader) {
+            nodeToCleanup = cluster.runningNodes()
+                    .filter(node -> node.name().equals(raftLeaderNodeName))
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node 
found that is a raft leader for the specified options."));
+        } else {
+            nodeToCleanup = cluster.runningNodes()
+                    .filter(node -> !node.name().equals(raftLeaderNodeName) && 
!node.name().equals(primaryNodeName))
+                    .findFirst()
+                    .orElse(cluster.aliveNode());
+        }
+
+        return unwrapIgniteImpl(nodeToCleanup);
+    }
+
+    private String findPrimaryNodeName(IgniteImpl ignite, TablePartitionId 
replicationGroupId) {
+        assertThat(awaitPrimaryReplicaForNow(ignite, replicationGroupId), 
willCompleteSuccessfully());
+
+        CompletableFuture<ReplicaMeta> primary = 
ignite.placementDriver().getPrimaryReplica(replicationGroupId, 
ignite.clock().now());
+
+        assertThat(primary, willCompleteSuccessfully());
+
+        return primary.join().getLeaseholder();
+    }
+
+    private Ignite findPrimaryIgniteNode(IgniteImpl ignite, TablePartitionId 
replicationGroupId) {
+        return cluster.runningNodes()
+                .filter(node -> node.name().equals(findPrimaryNodeName(ignite, 
replicationGroupId)))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("No node found 
that is a primary replica for the specified options."));
     }
 
     private static void assertValueOnSpecificNodes(String tableName, 
Set<IgniteImpl> nodes, int id, int val) throws Exception {
@@ -237,11 +503,15 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         Row keyValueRow0 = createKeyValueRow(id, val);
         Row keyRow0 = createKeyRow(id);
 
-        CompletableFuture<BinaryRow> getFut = internalTable.get(keyRow0, 
node.clock().now(), node.node());
-
-        assertThat(getFut, willCompleteSuccessfully());
+        assertTrue(waitForCondition(() -> {
+            try {
+                CompletableFuture<BinaryRow> getFut = 
internalTable.get(keyRow0, node.clock().now(), node.node());
 
-        assertTrue(compareRows(getFut.get(), keyValueRow0));
+                return compareRows(getFut.get(), keyValueRow0);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 10_000), "Row comparison failed within the timeout.");
     }
 
     @Test
@@ -518,7 +788,11 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
     }
 
     private static int tableId(IgniteImpl node) {
-        return ((Wrapper) 
node.tables().table(TABLE_NAME)).unwrap(TableImpl.class).tableId();
+        return tableId(node, TABLE_NAME);
+    }
+
+    private static int tableId(IgniteImpl node, String tableName) {
+        return ((Wrapper) 
node.tables().table(tableName)).unwrap(TableImpl.class).tableId();
     }
 
     private static int zoneId(IgniteImpl node) {
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index f140896f991..2e27670b739 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -27,6 +27,8 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
+import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.pendingPartitionAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.plannedPartitionAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.stablePartitionAssignmentsKey;
@@ -39,7 +41,6 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
-import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anEmptyMap;
@@ -69,13 +70,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterConfiguration.Builder;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
-import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
@@ -84,14 +83,8 @@ import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.metastorage.dsl.OperationType;
-import org.apache.ignite.internal.metastorage.dsl.Statement;
-import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
 import org.apache.ignite.internal.network.NetworkMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
@@ -106,7 +99,6 @@ import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -133,7 +125,6 @@ import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.entity.LogId;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
-import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -455,7 +446,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         // Blocking stable switch to the first phase or reset,
         // so that we'll have force pending assignments unexecuted.
-        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, 
assignment0));
+        blockMessage(cluster, (nodeName, msg) -> stableKeySwitchMessage(msg, 
partitionGroupId(partId), assignment0));
 
         // Init reset:
         // pending = [0, force]
@@ -536,7 +527,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         // Blocking stable switch to the first phase or reset,
         // so that we'll have force pending assignments unexecuted.
-        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, 
assignmentPending));
+        blockMessage(cluster, (nodeName, msg) -> stableKeySwitchMessage(msg, 
partitionGroupId(partId), assignmentPending));
 
         // Stop 3. Nodes 0 and 1 survived.
         stopNode(3);
@@ -1044,7 +1035,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         blockedNodes.add(followerNodes.remove(node0IndexInFollowers == -1 ? 0 
: node0IndexInFollowers));
         logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
 
-        blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg, 
partitionGroupId, blockedNodes));
+        blockMessage(cluster, (nodeName, msg) -> 
dataReplicateMessage(nodeName, msg, partitionGroupId, blockedNodes));
 
         // Write data(2) to 6 nodes.
         errors = insertValues(table, partId, 10);
@@ -1191,7 +1182,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         blockedNodes.add(blockedNode);
         logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
 
-        blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg, 
partitionGroupId, blockedNodes));
+        blockMessage(cluster, (nodeName, msg) -> 
dataReplicateMessage(nodeName, msg, partitionGroupId, blockedNodes));
 
         // Write data(2) to 6 nodes.
         errors = insertValues(table, partId, 10);
@@ -1380,7 +1371,10 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         AtomicBoolean blockedLink = new AtomicBoolean(true);
 
         // Block stable switch to check that we initially add reset phase 1 
assignments to the chain.
-        blockMessage((nodeName, msg) -> blockedLink.get() && 
stableKeySwitchMessage(msg, partId, resetAssignments));
+        blockMessage(
+                cluster,
+                (nodeName, msg) -> blockedLink.get() && 
stableKeySwitchMessage(msg, partitionGroupId(partId), resetAssignments)
+        );
 
         stopNodesInParallel(1, 2);
 
@@ -1496,7 +1490,10 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         AtomicBoolean blockedLink2 = new AtomicBoolean(true);
 
         // Block stable switch to check that we initially add reset phase 1 
assignments to the chain.
-        blockMessage((nodeName, msg) -> blockedLink2.get() && 
stableKeySwitchMessage(msg, partId, link2Assignments));
+        blockMessage(
+                cluster,
+                (nodeName, msg) -> blockedLink2.get() && 
stableKeySwitchMessage(msg, partitionGroupId(partId), link2Assignments)
+        );
 
         logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 
4, 5, 6}));
 
@@ -1776,18 +1773,6 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         return candidates.get(0);
     }
 
-    private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
-        
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
-            BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
-
-            if (oldPredicate == null) {
-                node.dropMessages(predicate);
-            } else {
-                node.dropMessages(oldPredicate.or(predicate));
-            }
-        });
-    }
-
     private static boolean dataReplicateMessage(
             String nodeName,
             NetworkMessage msg,
@@ -1807,37 +1792,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
      * Block rebalance, so stable won't be switched to specified pending.
      */
     private void blockRebalanceStableSwitch(int partId, Assignments 
assignment) {
-        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, 
assignment));
-    }
-
-    private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, 
Assignments blockedAssignments) {
-        if (msg instanceof WriteActionRequest) {
-            var writeActionRequest = (WriteActionRequest) msg;
-            WriteCommand command = writeActionRequest.deserializedCommand();
-
-            if (command instanceof MultiInvokeCommand) {
-                MultiInvokeCommand multiInvokeCommand = (MultiInvokeCommand) 
command;
-
-                Statement andThen = multiInvokeCommand.iif().andThen();
-
-                if (andThen instanceof UpdateStatement) {
-                    UpdateStatement updateStatement = (UpdateStatement) 
andThen;
-                    List<Operation> operations = 
updateStatement.update().operations();
-
-                    ByteArray stablePartAssignmentsKey = 
stablePartitionAssignmentsKey(partitionGroupId(partId));
-
-                    for (Operation operation : operations) {
-                        ByteArray opKey = new 
ByteArray(toByteArray(operation.key()));
-
-                        if (operation.type() == OperationType.PUT && 
opKey.equals(stablePartAssignmentsKey)) {
-                            return 
blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
-                        }
-                    }
-                }
-            }
-        }
-
-        return false;
+        blockMessage(cluster, (nodeName, msg) -> stableKeySwitchMessage(msg, 
partitionGroupId(partId), assignment));
     }
 
     private void waitForPartitionState(IgniteImpl node0, int partId, 
GlobalPartitionStateEnum expectedState) throws InterruptedException {


Reply via email to