This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
     new 23cb3ea085e IGNITE-25823 Increase test coverage for restarting partitions with cleanup feature (#6414)
23cb3ea085e is described below
commit 23cb3ea085ebae05ada3128dd3a762ec20db3b0a
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Aug 19 18:14:49 2025 +0400
    IGNITE-25823 Increase test coverage for restarting partitions with cleanup feature (#6414)
---
.../internal/replicator/ReplicaStateManager.java | 1 +
.../disaster/ManualGroupRestartRequest.java | 7 +-
.../disaster/DisasterRecoveryTestUtil.java | 88 ++++++
.../disaster/ItDisasterRecoveryManagerTest.java | 316 +++++++++++++++++++--
.../ItDisasterRecoveryReconfigurationTest.java | 75 +----
5 files changed, 405 insertions(+), 82 deletions(-)
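
The core of this change is the new DisasterRecoveryTestUtil, which extracts the message-blocking helpers that previously lived only in ItDisasterRecoveryReconfigurationTest, so ItDisasterRecoveryManagerTest can also hold back the meta-storage write that switches a partition's stable assignments while a restart-with-cleanup is in flight. A rough usage sketch of those two helpers follows (test-body fragment only; the tableId, timestamp, and nodeName values are illustrative and not taken from this commit):

    // Illustrative only: mirrors how ItDisasterRecoveryManagerTest wires the helpers together.
    TablePartitionId groupId = new TablePartitionId(tableId, 0);
    Assignments pending = Assignments.of(timestamp, Assignment.forPeer(nodeName));

    AtomicBoolean blocked = new AtomicBoolean(true);

    // Drop the matching MultiInvokeCommand on every running node until the test releases the block.
    blockMessage(cluster, (sender, msg) ->
            blocked.get() && stableKeySwitchMessage(msg, groupId, pending));

    // ... trigger a rebalance and call restartTablePartitionsWithCleanup here ...

    blocked.set(false); // let the blocked stable switch go through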
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index da446ed54cb..6301912057d 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -280,6 +280,7 @@ class ReplicaStateManager {
if (context.reservedForPrimary) {
// If is primary, turning off the primary first.
context.replicaState = ReplicaState.RESTART_PLANNED;
+
return replicaManager.stopLeaseProlongation(groupId, null)
                        .thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation));
} else {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 11743c51146..92c0447fde4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -192,6 +192,11 @@ class ManualGroupRestartRequest implements DisasterRecoveryRequest {
revision,
assignmentsTimestamp
));
+            } else {
+                restartFutures.add(CompletableFuture.failedFuture(
+                        new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
+                                + "to perform reset with clean up.")
+                ));
}
} else {
restartFutures.add(
@@ -210,7 +215,7 @@ class ManualGroupRestartRequest implements DisasterRecoveryRequest {
assignmentsTimestamp
);
} else {
-                throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive nodes "
                        + "to perform reset with clean up.");
}
}
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
new file mode 100644
index 00000000000..0d78cb38abf
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster;
+
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.stablePartitionAssignmentsKey;
+import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+
+import java.util.List;
+import java.util.function.BiPredicate;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.OperationType;
+import org.apache.ignite.internal.metastorage.dsl.Statement;
+import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+
+/**
+ * Utility class for disaster recovery tests.
+ */
+class DisasterRecoveryTestUtil {
+    static void blockMessage(Cluster cluster, BiPredicate<String, NetworkMessage> predicate) {
+        cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
+            BiPredicate<String, NetworkMessage> oldPredicate = node.dropMessagesPredicate();
+
+            if (oldPredicate == null) {
+                node.dropMessages(predicate);
+            } else {
+                node.dropMessages(oldPredicate.or(predicate));
+            }
+        });
+    }
+
+    static boolean stableKeySwitchMessage(
+            NetworkMessage msg,
+            PartitionGroupId partId,
+            Assignments blockedAssignments
+    ) {
+        if (msg instanceof WriteActionRequest) {
+            var writeActionRequest = (WriteActionRequest) msg;
+            WriteCommand command = writeActionRequest.deserializedCommand();
+
+            if (command instanceof MultiInvokeCommand) {
+                MultiInvokeCommand multiInvokeCommand = (MultiInvokeCommand) command;
+
+                Statement andThen = multiInvokeCommand.iif().andThen();
+
+                if (andThen instanceof UpdateStatement) {
+                    UpdateStatement updateStatement = (UpdateStatement) andThen;
+                    List<Operation> operations = updateStatement.update().operations();
+
+                    ByteArray stablePartAssignmentsKey = stablePartitionAssignmentsKey(partId);
+
+                    for (Operation operation : operations) {
+                        ByteArray opKey = new ByteArray(toByteArray(operation.key()));
+
+                        if (operation.type() == OperationType.PUT && opKey.equals(stablePartAssignmentsKey)) {
+                            return blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
+                        }
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+}
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 18aef6f393a..148a0ed7c19 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -23,6 +23,9 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
+import static org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getDefaultZone;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
@@ -32,6 +35,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -44,14 +48,20 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -76,9 +86,13 @@ import org.apache.ignite.internal.table.distributed.disaster.exceptions.Disaster
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
/** For {@link DisasterRecoveryManager} integration testing. */
// TODO https://issues.apache.org/jira/browse/IGNITE-22332 Add test cases.
@@ -181,48 +195,300 @@ public class ItDisasterRecoveryManagerTest extends ClusterPerTestIntegrationTest
        assertInstanceOf(DisasterRecoveryException.class, exception.getCause());
-        assertThat(exception.getCause().getMessage(), is("Not enough alive node to perform reset with clean up."));
+        assertThat(exception.getCause().getMessage(), is("Not enough alive nodes to perform reset with clean up."));
}
    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "false")
@Test
- void testRestartTablePartitionsWithCleanUp() throws Exception {
+ void testRestartHaTablePartitionsWithCleanUpFails() {
IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
- IgniteImpl node1 = unwrapIgniteImpl(cluster.startNode(1));
- IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
String testZone = "TEST_ZONE";
- createZone(node.catalogManager(), testZone, 1, 3);
+        createZone(node.catalogManager(), testZone, 1, 1, null, null, ConsistencyMode.HIGH_AVAILABILITY);
- String tableName2 = "TABLE_NAME_2";
+ String tableName = "TABLE_NAME";
node.sql().executeScript(String.format(
"CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE
TEST_ZONE",
- tableName2
+ tableName
));
- insert(0, 0, tableName2);
-
-        assertValueOnSpecificNodes(tableName2, Set.of(node, node1, node2), 0, 0);
-
int partitionId = 0;
        CompletableFuture<Void> restartPartitionsWithCleanupFuture = node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
Set.of(node.name()),
testZone,
SqlCommon.DEFAULT_SCHEMA_NAME,
- tableName2,
+ tableName,
Set.of(partitionId)
);
+ ExecutionException exception = assertThrows(
+ ExecutionException.class,
+                () -> restartPartitionsWithCleanupFuture.get(10_000, MILLISECONDS)
+ );
+
+        assertInstanceOf(DisasterRecoveryException.class, exception.getCause());
+
+        assertThat(exception.getCause().getMessage(), is("Not enough alive nodes to perform reset with clean up."));
+ }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "false")
+    @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}, raftLeader={2}")
+ @CsvSource({
+ "STRONG_CONSISTENCY, true, false",
+ "STRONG_CONSISTENCY, false, true",
+ "STRONG_CONSISTENCY, false, false",
+ "HIGH_AVAILABILITY, true, false",
+ "HIGH_AVAILABILITY, false, true",
+ "HIGH_AVAILABILITY, false, false"
+ })
+ void testRestartTablePartitionsWithCleanUp(
+ ConsistencyMode consistencyMode,
+ boolean primaryReplica,
+ boolean raftLeader
+ ) throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+ cluster.startNode(1);
+
+ String testZone = "TEST_ZONE";
+
+ if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+            createZone(node.catalogManager(), testZone, 1, 2, null, null, ConsistencyMode.HIGH_AVAILABILITY);
+ } else {
+ cluster.startNode(2);
+
+ createZone(node.catalogManager(), testZone, 1, 3);
+ }
+
+        Set<IgniteImpl> runningNodes = cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE TEST_ZONE",
+ tableName
+ ));
+
+ insert(0, 0, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl nodeToCleanup = findNodeConformingOptions(tableName, primaryReplica, raftLeader);
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
+ Set.of(nodeToCleanup.name()),
+ testZone,
+ SqlCommon.DEFAULT_SCHEMA_NAME,
+ tableName,
+ Set.of(0)
+ );
+
        assertThat(restartPartitionsWithCleanupFuture, willCompleteSuccessfully());
-        assertThat(awaitPrimaryReplicaForNow(node, new TablePartitionId(tableId(node), partitionId)), willCompleteSuccessfully());
- insert(1, 1, tableName2);
+ insert(1, 1, tableName);
-        assertValueOnSpecificNodes(tableName2, Set.of(node, node1, node2), 0, 0);
-        assertValueOnSpecificNodes(tableName2, Set.of(node, node1, node2), 1, 1);
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 1, 1);
+ }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "false")
+ @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}")
+ @CsvSource({
+ "STRONG_CONSISTENCY, true",
+ "STRONG_CONSISTENCY, false",
+ "HIGH_AVAILABILITY, true",
+ "HIGH_AVAILABILITY, false",
+ })
+    void testRestartTablePartitionsWithCleanUpTxRollback(ConsistencyMode consistencyMode, boolean primaryReplica) throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ cluster.startNode(1);
+
+ String testZone = "TEST_ZONE";
+
+ if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
+            createZone(node.catalogManager(), testZone, 1, 2, null, null, ConsistencyMode.HIGH_AVAILABILITY);
+ } else {
+ cluster.startNode(2);
+
+ createZone(node.catalogManager(), testZone, 1, 3);
+ }
+
+        Set<IgniteImpl> runningNodes = cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE TEST_ZONE",
+ tableName
+ ));
+
+ insert(0, 0, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+        IgniteImpl primaryNode = unwrapIgniteImpl(findPrimaryIgniteNode(node, new TablePartitionId(tableId(node, tableName), 0)));
+
+ IgniteImpl nodeToCleanup;
+
+ if (primaryReplica) {
+ nodeToCleanup = primaryNode;
+ } else {
+ nodeToCleanup = cluster.runningNodes()
+ .filter(n -> !n.name().equals(primaryNode.name()))
+ .map(TestWrappers::unwrapIgniteImpl)
+ .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node found that is not a primary replica."));
+ }
+
+ Transaction tx = nodeToCleanup.transactions().begin();
+
+        nodeToCleanup.sql().execute(tx, "INSERT INTO TABLE_NAME VALUES (2, 2)");
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture =
+                nodeToCleanup.disasterRecoveryManager().restartTablePartitionsWithCleanup(
+ Set.of(nodeToCleanup.name()),
+ testZone,
+ SqlCommon.DEFAULT_SCHEMA_NAME,
+ tableName,
+ Set.of(0)
+ );
+
+        assertThat(restartPartitionsWithCleanupFuture, willCompleteSuccessfully());
+
+ if (primaryReplica) {
+            // We expect here that tx will be rolled back because we have restarted primary replica. This is ensured by the fact that we
+            // use ReplicaManager.weakStopReplica(RESTART) in restartTablePartitionsWithCleanup, and this mechanism
+            // waits for replica expiration and stops lease prolongation. As a result, the transaction will not be able to commit
+            // because the primary replica has expired.
+            assertThrows(TransactionException.class, tx::commit, "Primary replica has expired, transaction will be rolled back");
+ } else {
+ tx.commit();
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 2, 2);
+ }
+ }
+
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "false")
+ @Test
+    void testRestartTablePartitionsWithCleanUpConcurrentRebalance() throws Exception {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+ unwrapIgniteImpl(cluster.startNode(1));
+
+ String testZone = "TEST_ZONE";
+
+ createZone(node.catalogManager(), testZone, 1, 2);
+
+        Set<IgniteImpl> runningNodes = cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+ String tableName = "TABLE_NAME";
+
+ node.sql().executeScript(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, valInt INT) ZONE TEST_ZONE",
+ tableName
+ ));
+
+ insert(0, 0, tableName);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ IgniteImpl node2 = unwrapIgniteImpl(cluster.startNode(2));
+
+ int catalogVersion = node.catalogManager().latestCatalogVersion();
+
+ long timestamp = node.catalogManager().catalog(catalogVersion).time();
+
+ Assignments assignmentPending = Assignments.of(timestamp,
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name())
+ );
+
+        TablePartitionId replicationGroupId = new TablePartitionId(tableId(node, tableName), 0);
+
+ AtomicBoolean blocked = new AtomicBoolean(true);
+
+ AtomicBoolean reached = new AtomicBoolean(false);
+
+ blockMessage(cluster, (nodeName, msg) -> {
+ reached.set(true);
+            return blocked.get() && stableKeySwitchMessage(msg, replicationGroupId, assignmentPending);
+ });
+
+ alterZone(node.catalogManager(), testZone, 3);
+
+ waitForCondition(reached::get, 10_000L);
+
+        CompletableFuture<Void> restartPartitionsWithCleanupFuture = node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
+ Set.of(node2.name()),
+ testZone,
+ SqlCommon.DEFAULT_SCHEMA_NAME,
+ tableName,
+ Set.of(0)
+ );
+
+        assertThat(restartPartitionsWithCleanupFuture, willCompleteSuccessfully());
+
+ insert(1, 1, tableName);
+
+ blocked.set(false);
+
+        runningNodes = cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).collect(Collectors.toSet());
+
+        assertEquals(3, runningNodes.size(), "Expected 3 running nodes after zone alteration");
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);
+
+ assertValueOnSpecificNodes(tableName, runningNodes, 1, 1);
+ }
+
+    private IgniteImpl findNodeConformingOptions(String tableName, boolean primaryReplica, boolean raftLeader) throws InterruptedException {
+ Ignite nodeToCleanup;
+
+ IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
+
+        TablePartitionId replicationGroupId = new TablePartitionId(tableId(ignite, tableName), 0);
+
+        String primaryNodeName = findPrimaryNodeName(ignite, replicationGroupId);
+
+        String raftLeaderNodeName = cluster.leaderServiceFor(replicationGroupId).getServerId().getConsistentId();
+
+ if (primaryReplica) {
+ nodeToCleanup = findPrimaryIgniteNode(ignite, replicationGroupId);
+ } else if (raftLeader) {
+ nodeToCleanup = cluster.runningNodes()
+ .filter(node -> node.name().equals(raftLeaderNodeName))
+ .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("No node found that is a raft leader for the specified options."));
+ } else {
+ nodeToCleanup = cluster.runningNodes()
+                    .filter(node -> !node.name().equals(raftLeaderNodeName) && !node.name().equals(primaryNodeName))
+ .findFirst()
+ .orElse(cluster.aliveNode());
+ }
+
+ return unwrapIgniteImpl(nodeToCleanup);
+ }
+
+    private String findPrimaryNodeName(IgniteImpl ignite, TablePartitionId replicationGroupId) {
+        assertThat(awaitPrimaryReplicaForNow(ignite, replicationGroupId), willCompleteSuccessfully());
+
+        CompletableFuture<ReplicaMeta> primary = ignite.placementDriver().getPrimaryReplica(replicationGroupId, ignite.clock().now());
+
+ assertThat(primary, willCompleteSuccessfully());
+
+ return primary.join().getLeaseholder();
+ }
+
+    private Ignite findPrimaryIgniteNode(IgniteImpl ignite, TablePartitionId replicationGroupId) {
+ return cluster.runningNodes()
+                .filter(node -> node.name().equals(findPrimaryNodeName(ignite, replicationGroupId)))
+ .findFirst()
+                .orElseThrow(() -> new IllegalStateException("No node found that is a primary replica for the specified options."));
}
    private static void assertValueOnSpecificNodes(String tableName, Set<IgniteImpl> nodes, int id, int val) throws Exception {
@@ -237,11 +503,15 @@ public class ItDisasterRecoveryManagerTest extends ClusterPerTestIntegrationTest
Row keyValueRow0 = createKeyValueRow(id, val);
Row keyRow0 = createKeyRow(id);
-        CompletableFuture<BinaryRow> getFut = internalTable.get(keyRow0, node.clock().now(), node.node());
-
- assertThat(getFut, willCompleteSuccessfully());
+ assertTrue(waitForCondition(() -> {
+ try {
+                CompletableFuture<BinaryRow> getFut = internalTable.get(keyRow0, node.clock().now(), node.node());
- assertTrue(compareRows(getFut.get(), keyValueRow0));
+ return compareRows(getFut.get(), keyValueRow0);
+ } catch (Exception e) {
+ return false;
+ }
+ }, 10_000), "Row comparison failed within the timeout.");
}
@Test
@@ -518,7 +788,11 @@ public class ItDisasterRecoveryManagerTest extends ClusterPerTestIntegrationTest
}
private static int tableId(IgniteImpl node) {
-        return ((Wrapper) node.tables().table(TABLE_NAME)).unwrap(TableImpl.class).tableId();
+ return tableId(node, TABLE_NAME);
+ }
+
+ private static int tableId(IgniteImpl node, String tableName) {
+        return ((Wrapper) node.tables().table(tableName)).unwrap(TableImpl.class).tableId();
}
private static int zoneId(IgniteImpl node) {
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index f140896f991..2e27670b739 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -27,6 +27,8 @@ import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
+import static org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.pendingPartitionAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.plannedPartitionAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.stablePartitionAssignmentsKey;
@@ -39,7 +41,6 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
-import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
@@ -69,13 +70,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterConfiguration.Builder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
-import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
@@ -84,14 +83,8 @@ import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.metastorage.dsl.OperationType;
-import org.apache.ignite.internal.metastorage.dsl.Statement;
-import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
@@ -106,7 +99,6 @@ import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -133,7 +125,6 @@ import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
-import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -455,7 +446,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
// Blocking stable switch to the first phase or reset,
// so that we'll have force pending assignments unexecuted.
-        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignment0));
+        blockMessage(cluster, (nodeName, msg) -> stableKeySwitchMessage(msg, partitionGroupId(partId), assignment0));
// Init reset:
// pending = [0, force]
@@ -536,7 +527,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
// Blocking stable switch to the first phase or reset,
// so that we'll have force pending assignments unexecuted.
-        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignmentPending));
+        blockMessage(cluster, (nodeName, msg) -> stableKeySwitchMessage(msg, partitionGroupId(partId), assignmentPending));
// Stop 3. Nodes 0 and 1 survived.
stopNode(3);
@@ -1044,7 +1035,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
        blockedNodes.add(followerNodes.remove(node0IndexInFollowers == -1 ? 0 : node0IndexInFollowers));
logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
-        blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg, partitionGroupId, blockedNodes));
+        blockMessage(cluster, (nodeName, msg) -> dataReplicateMessage(nodeName, msg, partitionGroupId, blockedNodes));
// Write data(2) to 6 nodes.
errors = insertValues(table, partId, 10);
@@ -1191,7 +1182,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
blockedNodes.add(blockedNode);
logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
-        blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg, partitionGroupId, blockedNodes));
+        blockMessage(cluster, (nodeName, msg) -> dataReplicateMessage(nodeName, msg, partitionGroupId, blockedNodes));
// Write data(2) to 6 nodes.
errors = insertValues(table, partId, 10);
@@ -1380,7 +1371,10 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
AtomicBoolean blockedLink = new AtomicBoolean(true);
        // Block stable switch to check that we initially add reset phase 1 assignments to the chain.
-        blockMessage((nodeName, msg) -> blockedLink.get() && stableKeySwitchMessage(msg, partId, resetAssignments));
+        blockMessage(
+                cluster,
+                (nodeName, msg) -> blockedLink.get() && stableKeySwitchMessage(msg, partitionGroupId(partId), resetAssignments)
+        );
stopNodesInParallel(1, 2);
@@ -1496,7 +1490,10 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
AtomicBoolean blockedLink2 = new AtomicBoolean(true);
        // Block stable switch to check that we initially add reset phase 1 assignments to the chain.
-        blockMessage((nodeName, msg) -> blockedLink2.get() && stableKeySwitchMessage(msg, partId, link2Assignments));
+        blockMessage(
+                cluster,
+                (nodeName, msg) -> blockedLink2.get() && stableKeySwitchMessage(msg, partitionGroupId(partId), link2Assignments)
+        );
        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 4, 5, 6}));
@@ -1776,18 +1773,6 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
return candidates.get(0);
}
-    private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
-        cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
-            BiPredicate<String, NetworkMessage> oldPredicate = node.dropMessagesPredicate();
-
-            if (oldPredicate == null) {
-                node.dropMessages(predicate);
-            } else {
-                node.dropMessages(oldPredicate.or(predicate));
-            }
-        });
-    }
-
private static boolean dataReplicateMessage(
String nodeName,
NetworkMessage msg,
@@ -1807,37 +1792,7 @@ public class ItDisasterRecoveryReconfigurationTest extends ClusterPerTestIntegra
* Block rebalance, so stable won't be switched to specified pending.
*/
    private void blockRebalanceStableSwitch(int partId, Assignments assignment) {
-        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignment));
-    }
-
-    private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, Assignments blockedAssignments) {
-        if (msg instanceof WriteActionRequest) {
-            var writeActionRequest = (WriteActionRequest) msg;
-            WriteCommand command = writeActionRequest.deserializedCommand();
-
-            if (command instanceof MultiInvokeCommand) {
-                MultiInvokeCommand multiInvokeCommand = (MultiInvokeCommand) command;
-
-                Statement andThen = multiInvokeCommand.iif().andThen();
-
-                if (andThen instanceof UpdateStatement) {
-                    UpdateStatement updateStatement = (UpdateStatement) andThen;
-                    List<Operation> operations = updateStatement.update().operations();
-
-                    ByteArray stablePartAssignmentsKey = stablePartitionAssignmentsKey(partitionGroupId(partId));
-
-                    for (Operation operation : operations) {
-                        ByteArray opKey = new ByteArray(toByteArray(operation.key()));
-
-                        if (operation.type() == OperationType.PUT && opKey.equals(stablePartAssignmentsKey)) {
-                            return blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
-                        }
-                    }
-                }
-            }
-        }
-
-        return false;
+        blockMessage(cluster, (nodeName, msg) -> stableKeySwitchMessage(msg, partitionGroupId(partId), assignment));
}
    private void waitForPartitionState(IgniteImpl node0, int partId, GlobalPartitionStateEnum expectedState) throws InterruptedException {