This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26035
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit aeac92411701f371cd4d942bffb07a4939cfe439
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jul 30 08:51:53 2025 +0300

    IGNITE-26035 wip
---
 .../internal/lowwatermark/LowWatermarkImpl.java    |  91 +++++-----------
 modules/raft/build.gradle                          |   3 +
 .../raftsnapshot/ItParallelRaftSnapshotsTest.java  | 121 ++++++++++++++++-----
 3 files changed, 122 insertions(+), 93 deletions(-)

diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index 4119871b2d9..de8c264d6f9 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.lowwatermark;
 
-import static 
org.apache.ignite.configuration.notifications.ConfigurationListener.fromConsumer;
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
@@ -37,9 +36,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.event.AbstractEventProducer;
@@ -51,7 +48,6 @@ import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import 
org.apache.ignite.internal.lowwatermark.ScheduledUpdateLowWatermarkTask.State;
 import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEventParameters;
@@ -128,11 +124,6 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     private final Map<UUID, LowWatermarkLock> locks = new 
ConcurrentHashMap<>();
 
-    /** Guarded by {@link #scheduleUpdateLowWatermarkTaskLock}. */
-    private @Nullable ScheduledUpdateLowWatermarkTask 
lastScheduledUpdateLowWatermarkTask;
-
-    private final Lock scheduleUpdateLowWatermarkTaskLock = new 
ReentrantLock();
-
     /**
      * Constructor.
      *
@@ -168,7 +159,12 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
             messagingService.addMessageHandler(LowWatermarkMessageGroup.class, 
this::onReceiveNetworkMessage);
 
-            lowWatermarkConfig.updateIntervalMillis().listen(fromConsumer(ctx 
-> scheduleUpdates()));
+            // TODO: IGNITE-26035 Remove this.
+            lowWatermarkConfig.updateIntervalMillis().listen(ctx -> {
+                scheduleUpdates();
+
+                return nullCompletedFuture();
+            });
 
             return nullCompletedFuture();
         });
@@ -223,57 +219,24 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
     }
 
     private void scheduleUpdateLowWatermarkBusy() {
-        scheduleUpdateLowWatermarkTaskLock.lock();
-
-        try {
-            ScheduledUpdateLowWatermarkTask lastTask = 
lastScheduledUpdateLowWatermarkTask;
-            ScheduledUpdateLowWatermarkTask newTask = new 
ScheduledUpdateLowWatermarkTask(this, State.NEW);
-
-            State lastTaskState = lastTask == null ? State.COMPLETED : 
lastTask.state();
+        // TODO: IGNITE-26035 Remove the synchronization and other code
+        synchronized (lastScheduledTaskFuture) {
+            ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
 
-            switch (lastTaskState) {
-                case NEW:
-                    if (lastTask.tryCancel()) {
-                        lastScheduledUpdateLowWatermarkTask = newTask;
-
-                        scheduleUpdateLowWatermarkTaskBusy(newTask);
-                    }
-
-                    break;
-                case IN_PROGRESS:
-                    // In this case we don't need to schedule a new task 
because the current task that is in progress will schedule a new
-                    // task when it finishes.
-                    break;
-                case COMPLETED:
-                    lastScheduledUpdateLowWatermarkTask = newTask;
-
-                    scheduleUpdateLowWatermarkTaskBusy(newTask);
-
-                    break;
-                default:
-                    throw new AssertionError("Unknown state: " + 
lastTaskState);
+            if (previousScheduledFuture != null && 
!previousScheduledFuture.isDone()) {
+                previousScheduledFuture.cancel(true);
             }
-        } finally {
-            scheduleUpdateLowWatermarkTaskLock.unlock();
-        }
-    }
-
-    private void 
scheduleUpdateLowWatermarkTaskBusy(ScheduledUpdateLowWatermarkTask task) {
-        ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
-
-        if (previousScheduledFuture != null && 
!previousScheduledFuture.isDone()) {
-            previousScheduledFuture.cancel(true);
-        }
 
-        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
-                task,
-                lowWatermarkConfig.updateIntervalMillis().value(),
-                TimeUnit.MILLISECONDS
-        );
+            ScheduledFuture<?> newScheduledFuture = 
scheduledThreadPool.schedule(
+                    () -> updateLowWatermark(createNewLowWatermarkCandidate()),
+                    lowWatermarkConfig.updateIntervalMillis().value(),
+                    TimeUnit.MILLISECONDS
+            );
 
-        boolean casResult = 
lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, 
newScheduledFuture);
+            boolean casResult = 
lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, 
newScheduledFuture);
 
-        assert casResult : "only one scheduled task is expected";
+            assert casResult : "only one scheduled task is expected";
+        }
     }
 
     HybridTimestamp createNewLowWatermarkCandidate() {
@@ -319,11 +282,7 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     @Override
     public void updateLowWatermark(HybridTimestamp newLowWatermark) {
-        updateLowWatermarkAsync(newLowWatermark);
-    }
-
-    CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp 
newLowWatermark) {
-        return inBusyLockAsync(busyLock, () -> {
+        inBusyLock(busyLock, () -> {
             LowWatermarkCandidate newLowWatermarkCandidate = new 
LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>());
             LowWatermarkCandidate oldLowWatermarkCandidate;
 
@@ -332,12 +291,12 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
                 // If another candidate contains a higher low watermark, then 
there is no need to update.
                 if 
(oldLowWatermarkCandidate.lowWatermark().compareTo(newLowWatermark) >= 0) {
-                    return nullCompletedFuture();
+                    return;
                 }
             } while 
(!lowWatermarkCandidate.compareAndSet(oldLowWatermarkCandidate, 
newLowWatermarkCandidate));
 
             // We will start the update as soon as the previous one finishes.
-            return oldLowWatermarkCandidate.updateFuture()
+            oldLowWatermarkCandidate.updateFuture()
                     .thenComposeAsync(unused -> 
updateAndNotify(newLowWatermark), scheduledThreadPool)
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
@@ -392,12 +351,16 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
                             .whenCompleteAsync((unused, throwable) -> {
                                 if (throwable != null) {
                                     if (!(hasCause(throwable, 
NodeStoppingException.class))) {
-                                        LOG.error("Failed to update low 
watermark: {}", throwable, newLowWatermark);
+                                        LOG.error("Failed to update low 
watermark, will schedule again: {}", throwable, newLowWatermark);
 
                                         failureManager.process(new 
FailureContext(CRITICAL_ERROR, throwable));
+
+                                        inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
                                     }
                                 } else {
                                     LOG.info("Successful low watermark update: 
{}", newLowWatermark);
+
+                                    inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
                                 }
                             }, scheduledThreadPool);
                 }
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 66346fcfe1c..fae0fc6fe22 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -106,6 +106,9 @@ dependencies {
     integrationTestImplementation project(':ignite-partition-replicator')
     integrationTestImplementation project(':ignite-replicator')
     integrationTestImplementation project(':ignite-sql-engine')
+    integrationTestImplementation project(':ignite-low-watermark')
+    integrationTestImplementation project(':ignite-schema')
+    integrationTestImplementation project(':ignite-configuration-root')
     integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.dropwizard.metrics
     integrationTestImplementation libs.disruptor
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
index aac8fa586b6..65c4ab30987 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
@@ -22,6 +22,8 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.event.EventListener.fromConsumer;
+import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
@@ -29,23 +31,31 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.IntPredicate;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.Member;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
-    private static final String TEST_ZONE_NAME = "TEST_ZONE";
+    private static final String ZONE_NAME = "TEST_ZONE";
 
-    private static final String TEST_TABLE_NAME = "TEST_TABLE";
+    private static final String TABLE_NAME = "TEST_TABLE";
 
     @Override
     protected int initialNodes() {
@@ -58,44 +68,40 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
      */
     @Test
     void testSnapshotStreamingToMultipleNodes() {
-        String zoneSql = String.format(
-                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=1, 
REPLICAS=5",
-                TEST_ZONE_NAME, DEFAULT_STORAGE_PROFILE
-        );
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, initialNodes());
 
-        executeSql(zoneSql);
-
-        String tableSql = String.format(
-                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(20)) ZONE 
%s ",
-                TEST_TABLE_NAME, TEST_ZONE_NAME
-        );
-
-        executeSql(tableSql);
-
-        ReplicationGroupId groupId = cluster.solePartitionId(TEST_ZONE_NAME, 
TEST_TABLE_NAME);
+        ReplicationGroupId groupId = cluster.solePartitionId(ZONE_NAME, 
TABLE_NAME);
 
         int primaryReplicaIndex = primaryReplicaIndex(groupId);
 
         // Stop two nodes that are neither primary replicas for the test table 
nor for the metastorage.
-        List<Integer> nodesToKill = cluster.aliveNodesWithIndices().stream()
-                .map(IgniteBiTuple::get1)
-                .filter(index -> index != primaryReplicaIndex && index != 0)
-                .limit(2)
-                .collect(toList());
+        int[] nodesToKill = nodeToKillIndexes(nodeIndex -> nodeIndex != 
primaryReplicaIndex && nodeIndex != 0, 2);
 
-        nodesToKill.parallelStream().forEach(cluster::stopNode);
+        Arrays.stream(nodesToKill).parallel().forEach(cluster::stopNode);
+
+        int tableSize = 100;
 
         // After the nodes have been stopped, insert some data and truncate 
the Raft log on primary replica.
-        executeSql(primaryReplicaIndex, String.format("INSERT INTO %s VALUES 
(1, 'one')", TEST_TABLE_NAME));
+        insertIntoTable(primaryReplicaIndex, TABLE_NAME, tableSize);
 
-        validateTableData(primaryReplicaIndex);
+        validateTableData(primaryReplicaIndex, tableSize);
 
         truncateLog(primaryReplicaIndex, groupId);
 
         // Start the nodes in parallel, Raft leader is expected to start 
streaming snapshots on them.
-        nodesToKill.parallelStream().forEach(cluster::startNode);
+        Arrays.stream(nodesToKill).parallel().forEach(cluster::startNode);
 
-        nodesToKill.forEach(this::validateTableData);
+        Arrays.stream(nodesToKill).forEach(nodeIndex -> 
validateTableData(nodeIndex, tableSize));
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034")
+    void testInstallRaftSnapshotAfterUpdateLowWatermark() {
+        updateLowWatermarkUpdateInterval(1_000);
+
+        assertThat(awaitUpdateLowWatermarkAsync(cluster.nodes()), 
willCompleteSuccessfully());
+
+        testSnapshotStreamingToMultipleNodes();
     }
 
     private int primaryReplicaIndex(ReplicationGroupId groupId) {
@@ -124,8 +130,8 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
         assertThat(truncateFuture, willCompleteSuccessfully());
     }
 
-    private void validateTableData(int nodeIndex) {
-        Table table = cluster.node(nodeIndex).tables().table(TEST_TABLE_NAME);
+    private void validateTableData(int nodeIndex, long expTableSize) {
+        Table table = cluster.node(nodeIndex).tables().table(TABLE_NAME);
 
         InternalTable internalTable = unwrapTableImpl(table).internalTable();
 
@@ -133,6 +139,63 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
 
         assertThat(mvPartition, is(notNullValue()));
 
-        await().until(mvPartition::estimatedSize, is(1L));
+        await().until(mvPartition::estimatedSize, is(expTableSize));
+    }
+
+    private void createZoneAndTable(String zoneName, String tableName, int 
partitions, int replicas) {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=%s, 
REPLICAS=%s",
+                zoneName, DEFAULT_STORAGE_PROFILE, partitions, replicas
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(256)) ZONE 
%s ",
+                tableName, zoneName
+        ));
+    }
+
+    private int[] nodeToKillIndexes(IntPredicate indexFilter, int limit) {
+        return cluster.aliveNodesWithIndices().stream()
+                .mapToInt(IgniteBiTuple::get1)
+                .filter(Objects::nonNull)
+                .filter(indexFilter)
+                .limit(limit)
+                .toArray();
+    }
+
+    private void insertIntoTable(int nodeIndex, String tableName, int count) {
+        String insertDml = String.format("INSERT INTO %s (key, val) VALUES (?, 
?)", tableName);
+
+        for (int i = 0; i < count; i++) {
+            executeSql(nodeIndex, insertDml, i, "n_" + count);
+        }
+    }
+
+    private void updateLowWatermarkUpdateInterval(long intervalMillis) {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        CompletableFuture<Void> updateConfigFuture = 
node.clusterConfiguration().getConfiguration(GcExtensionConfiguration.KEY)
+                .gc()
+                .lowWatermark()
+                .updateIntervalMillis()
+                .update(intervalMillis);
+
+        assertThat(updateConfigFuture, willCompleteSuccessfully());
+    }
+
+    private static CompletableFuture<Void> 
awaitUpdateLowWatermarkAsync(List<Ignite> nodes) {
+        List<CompletableFuture<Void>> futures = nodes.stream()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .map(IgniteImpl::lowWatermark)
+                .map(lowWatermark -> {
+                    var future = new CompletableFuture<Void>();
+
+                    lowWatermark.listen(LOW_WATERMARK_CHANGED, fromConsumer(p 
-> future.complete(null)));
+
+                    return future;
+                })
+                .collect(toList());
+
+        return CompletableFutures.allOf(futures);
     }
 }

Reply via email to