This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 561254108b3 IGNITE-26035 Add rebalancing test via raft snapshot and 
low watermark change (#6338)
561254108b3 is described below

commit 561254108b3bb5d10d327c9d0975339ff182c815
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jul 30 11:36:46 2025 +0300

    IGNITE-26035 Add rebalancing test via raft snapshot and low watermark 
change (#6338)
---
 modules/raft/build.gradle                          |   3 +
 .../raftsnapshot/ItParallelRaftSnapshotsTest.java  | 121 ++++++++++++++++-----
 2 files changed, 95 insertions(+), 29 deletions(-)

diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 66346fcfe1c..fae0fc6fe22 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -106,6 +106,9 @@ dependencies {
     integrationTestImplementation project(':ignite-partition-replicator')
     integrationTestImplementation project(':ignite-replicator')
     integrationTestImplementation project(':ignite-sql-engine')
+    integrationTestImplementation project(':ignite-low-watermark')
+    integrationTestImplementation project(':ignite-schema')
+    integrationTestImplementation project(':ignite-configuration-root')
     integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.dropwizard.metrics
     integrationTestImplementation libs.disruptor
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
index aac8fa586b6..65c4ab30987 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
@@ -22,6 +22,8 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.event.EventListener.fromConsumer;
+import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
@@ -29,23 +31,31 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.IntPredicate;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.Member;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
-    private static final String TEST_ZONE_NAME = "TEST_ZONE";
+    private static final String ZONE_NAME = "TEST_ZONE";
 
-    private static final String TEST_TABLE_NAME = "TEST_TABLE";
+    private static final String TABLE_NAME = "TEST_TABLE";
 
     @Override
     protected int initialNodes() {
@@ -58,44 +68,40 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
      */
     @Test
     void testSnapshotStreamingToMultipleNodes() {
-        String zoneSql = String.format(
-                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=1, 
REPLICAS=5",
-                TEST_ZONE_NAME, DEFAULT_STORAGE_PROFILE
-        );
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, initialNodes());
 
-        executeSql(zoneSql);
-
-        String tableSql = String.format(
-                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(20)) ZONE 
%s ",
-                TEST_TABLE_NAME, TEST_ZONE_NAME
-        );
-
-        executeSql(tableSql);
-
-        ReplicationGroupId groupId = cluster.solePartitionId(TEST_ZONE_NAME, 
TEST_TABLE_NAME);
+        ReplicationGroupId groupId = cluster.solePartitionId(ZONE_NAME, 
TABLE_NAME);
 
         int primaryReplicaIndex = primaryReplicaIndex(groupId);
 
         // Stop two nodes that are neither primary replicas for the test table 
nor for the metastorage.
-        List<Integer> nodesToKill = cluster.aliveNodesWithIndices().stream()
-                .map(IgniteBiTuple::get1)
-                .filter(index -> index != primaryReplicaIndex && index != 0)
-                .limit(2)
-                .collect(toList());
+        int[] nodesToKill = nodeToKillIndexes(nodeIndex -> nodeIndex != 
primaryReplicaIndex && nodeIndex != 0, 2);
 
-        nodesToKill.parallelStream().forEach(cluster::stopNode);
+        Arrays.stream(nodesToKill).parallel().forEach(cluster::stopNode);
+
+        int tableSize = 100;
 
         // After the nodes have been stopped, insert some data and truncate 
the Raft log on primary replica.
-        executeSql(primaryReplicaIndex, String.format("INSERT INTO %s VALUES 
(1, 'one')", TEST_TABLE_NAME));
+        insertIntoTable(primaryReplicaIndex, TABLE_NAME, tableSize);
 
-        validateTableData(primaryReplicaIndex);
+        validateTableData(primaryReplicaIndex, tableSize);
 
         truncateLog(primaryReplicaIndex, groupId);
 
         // Start the nodes in parallel, Raft leader is expected to start 
streaming snapshots on them.
-        nodesToKill.parallelStream().forEach(cluster::startNode);
+        Arrays.stream(nodesToKill).parallel().forEach(cluster::startNode);
 
-        nodesToKill.forEach(this::validateTableData);
+        Arrays.stream(nodesToKill).forEach(nodeIndex -> 
validateTableData(nodeIndex, tableSize));
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034";)
+    void testInstallRaftSnapshotAfterUpdateLowWatermark() {
+        updateLowWatermarkUpdateInterval(1_000);
+
+        assertThat(awaitUpdateLowWatermarkAsync(cluster.nodes()), 
willCompleteSuccessfully());
+
+        testSnapshotStreamingToMultipleNodes();
     }
 
     private int primaryReplicaIndex(ReplicationGroupId groupId) {
@@ -124,8 +130,8 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
         assertThat(truncateFuture, willCompleteSuccessfully());
     }
 
-    private void validateTableData(int nodeIndex) {
-        Table table = cluster.node(nodeIndex).tables().table(TEST_TABLE_NAME);
+    private void validateTableData(int nodeIndex, long expTableSize) {
+        Table table = cluster.node(nodeIndex).tables().table(TABLE_NAME);
 
         InternalTable internalTable = unwrapTableImpl(table).internalTable();
 
@@ -133,6 +139,63 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
 
         assertThat(mvPartition, is(notNullValue()));
 
-        await().until(mvPartition::estimatedSize, is(1L));
+        await().until(mvPartition::estimatedSize, is(expTableSize));
+    }
+
+    private void createZoneAndTable(String zoneName, String tableName, int 
partitions, int replicas) {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=%s, 
REPLICAS=%s",
+                zoneName, DEFAULT_STORAGE_PROFILE, partitions, replicas
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(256)) ZONE 
%s ",
+                tableName, zoneName
+        ));
+    }
+
+    private int[] nodeToKillIndexes(IntPredicate indexFilter, int limit) {
+        return cluster.aliveNodesWithIndices().stream()
+                .mapToInt(IgniteBiTuple::get1)
+                .filter(Objects::nonNull)
+                .filter(indexFilter)
+                .limit(limit)
+                .toArray();
+    }
+
+    private void insertIntoTable(int nodeIndex, String tableName, int count) {
+        String insertDml = String.format("INSERT INTO %s (key, val) VALUES (?, 
?)", tableName);
+
+        for (int i = 0; i < count; i++) {
+            executeSql(nodeIndex, insertDml, i, "n_" + count);
+        }
+    }
+
+    private void updateLowWatermarkUpdateInterval(long intervalMillis) {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        CompletableFuture<Void> updateConfigFuture = 
node.clusterConfiguration().getConfiguration(GcExtensionConfiguration.KEY)
+                .gc()
+                .lowWatermark()
+                .updateIntervalMillis()
+                .update(intervalMillis);
+
+        assertThat(updateConfigFuture, willCompleteSuccessfully());
+    }
+
+    private static CompletableFuture<Void> 
awaitUpdateLowWatermarkAsync(List<Ignite> nodes) {
+        List<CompletableFuture<Void>> futures = nodes.stream()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .map(IgniteImpl::lowWatermark)
+                .map(lowWatermark -> {
+                    var future = new CompletableFuture<Void>();
+
+                    lowWatermark.listen(LOW_WATERMARK_CHANGED, fromConsumer(p 
-> future.complete(null)));
+
+                    return future;
+                })
+                .collect(toList());
+
+        return CompletableFutures.allOf(futures);
     }
 }

Reply via email to