This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 561254108b3 IGNITE-26035 Add rebalancing test via raft snapshot and low watermark change (#6338)
561254108b3 is described below
commit 561254108b3bb5d10d327c9d0975339ff182c815
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jul 30 11:36:46 2025 +0300
IGNITE-26035 Add rebalancing test via raft snapshot and low watermark change (#6338)
---
modules/raft/build.gradle | 3 +
.../raftsnapshot/ItParallelRaftSnapshotsTest.java | 121 ++++++++++++++++-----
2 files changed, 95 insertions(+), 29 deletions(-)
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 66346fcfe1c..fae0fc6fe22 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -106,6 +106,9 @@ dependencies {
integrationTestImplementation project(':ignite-partition-replicator')
integrationTestImplementation project(':ignite-replicator')
integrationTestImplementation project(':ignite-sql-engine')
+ integrationTestImplementation project(':ignite-low-watermark')
+ integrationTestImplementation project(':ignite-schema')
+ integrationTestImplementation project(':ignite-configuration-root')
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.dropwizard.metrics
integrationTestImplementation libs.disruptor
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
index aac8fa586b6..65c4ab30987 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
@@ -22,6 +22,8 @@ import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.event.EventListener.fromConsumer;
+import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.awaitility.Awaitility.await;
@@ -29,23 +31,31 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.function.IntPredicate;
+import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.Member;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
- private static final String TEST_ZONE_NAME = "TEST_ZONE";
+ private static final String ZONE_NAME = "TEST_ZONE";
- private static final String TEST_TABLE_NAME = "TEST_TABLE";
+ private static final String TABLE_NAME = "TEST_TABLE";
@Override
protected int initialNodes() {
@@ -58,44 +68,40 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
*/
@Test
void testSnapshotStreamingToMultipleNodes() {
- String zoneSql = String.format(
-                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=1, REPLICAS=5",
- TEST_ZONE_NAME, DEFAULT_STORAGE_PROFILE
- );
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, initialNodes());
- executeSql(zoneSql);
-
- String tableSql = String.format(
-                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(20)) ZONE %s ",
- TEST_TABLE_NAME, TEST_ZONE_NAME
- );
-
- executeSql(tableSql);
-
-        ReplicationGroupId groupId = cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME);
+        ReplicationGroupId groupId = cluster.solePartitionId(ZONE_NAME, TABLE_NAME);
int primaryReplicaIndex = primaryReplicaIndex(groupId);
        // Stop two nodes that are neither primary replicas for the test table nor for the metastorage.
- List<Integer> nodesToKill = cluster.aliveNodesWithIndices().stream()
- .map(IgniteBiTuple::get1)
- .filter(index -> index != primaryReplicaIndex && index != 0)
- .limit(2)
- .collect(toList());
+        int[] nodesToKill = nodeToKillIndexes(nodeIndex -> nodeIndex != primaryReplicaIndex && nodeIndex != 0, 2);
- nodesToKill.parallelStream().forEach(cluster::stopNode);
+ Arrays.stream(nodesToKill).parallel().forEach(cluster::stopNode);
+
+ int tableSize = 100;
        // After the nodes have been stopped, insert some data and truncate the Raft log on primary replica.
-        executeSql(primaryReplicaIndex, String.format("INSERT INTO %s VALUES (1, 'one')", TEST_TABLE_NAME));
+ insertIntoTable(primaryReplicaIndex, TABLE_NAME, tableSize);
- validateTableData(primaryReplicaIndex);
+ validateTableData(primaryReplicaIndex, tableSize);
truncateLog(primaryReplicaIndex, groupId);
        // Start the nodes in parallel, Raft leader is expected to start streaming snapshots on them.
- nodesToKill.parallelStream().forEach(cluster::startNode);
+ Arrays.stream(nodesToKill).parallel().forEach(cluster::startNode);
- nodesToKill.forEach(this::validateTableData);
+        Arrays.stream(nodesToKill).forEach(nodeIndex -> validateTableData(nodeIndex, tableSize));
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034")
+ void testInstallRaftSnapshotAfterUpdateLowWatermark() {
+ updateLowWatermarkUpdateInterval(1_000);
+
+        assertThat(awaitUpdateLowWatermarkAsync(cluster.nodes()), willCompleteSuccessfully());
+
+ testSnapshotStreamingToMultipleNodes();
}
private int primaryReplicaIndex(ReplicationGroupId groupId) {
@@ -124,8 +130,8 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
assertThat(truncateFuture, willCompleteSuccessfully());
}
- private void validateTableData(int nodeIndex) {
- Table table = cluster.node(nodeIndex).tables().table(TEST_TABLE_NAME);
+ private void validateTableData(int nodeIndex, long expTableSize) {
+ Table table = cluster.node(nodeIndex).tables().table(TABLE_NAME);
InternalTable internalTable = unwrapTableImpl(table).internalTable();
@@ -133,6 +139,63 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
assertThat(mvPartition, is(notNullValue()));
- await().until(mvPartition::estimatedSize, is(1L));
+ await().until(mvPartition::estimatedSize, is(expTableSize));
+ }
+
+    private void createZoneAndTable(String zoneName, String tableName, int partitions, int replicas) {
+ executeSql(String.format(
+                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=%s, REPLICAS=%s",
+ zoneName, DEFAULT_STORAGE_PROFILE, partitions, replicas
+ ));
+
+ executeSql(String.format(
+                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(256)) ZONE %s ",
+ tableName, zoneName
+ ));
+ }
+
+ private int[] nodeToKillIndexes(IntPredicate indexFilter, int limit) {
+ return cluster.aliveNodesWithIndices().stream()
+ .mapToInt(IgniteBiTuple::get1)
+ .filter(Objects::nonNull)
+ .filter(indexFilter)
+ .limit(limit)
+ .toArray();
+ }
+
+ private void insertIntoTable(int nodeIndex, String tableName, int count) {
+        String insertDml = String.format("INSERT INTO %s (key, val) VALUES (?, ?)", tableName);
+
+ for (int i = 0; i < count; i++) {
+ executeSql(nodeIndex, insertDml, i, "n_" + count);
+ }
+ }
+
+ private void updateLowWatermarkUpdateInterval(long intervalMillis) {
+ IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        CompletableFuture<Void> updateConfigFuture = node.clusterConfiguration().getConfiguration(GcExtensionConfiguration.KEY)
+ .gc()
+ .lowWatermark()
+ .updateIntervalMillis()
+ .update(intervalMillis);
+
+ assertThat(updateConfigFuture, willCompleteSuccessfully());
+ }
+
+    private static CompletableFuture<Void> awaitUpdateLowWatermarkAsync(List<Ignite> nodes) {
+ List<CompletableFuture<Void>> futures = nodes.stream()
+ .map(TestWrappers::unwrapIgniteImpl)
+ .map(IgniteImpl::lowWatermark)
+ .map(lowWatermark -> {
+ var future = new CompletableFuture<Void>();
+
+                    lowWatermark.listen(LOW_WATERMARK_CHANGED, fromConsumer(p -> future.complete(null)));
+
+ return future;
+ })
+ .collect(toList());
+
+ return CompletableFutures.allOf(futures);
}
}