This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1bd321a85bf IGNITE-26034 Rebalancing via raft snapshot does not work 
after low watermark update (#6341)
1bd321a85bf is described below

commit 1bd321a85bf4eb3e0a21e6aa04ce16d3dbb2681b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jul 30 17:06:23 2025 +0300

    IGNITE-26034 Rebalancing via raft snapshot does not work after low 
watermark update (#6341)
---
 .../raftsnapshot/ItParallelRaftSnapshotsTest.java  |  2 -
 .../table/distributed/gc/GcStorageHandler.java     |  9 +++++
 .../ignite/internal/table/distributed/gc/MvGc.java | 29 ++++++++++++--
 .../distributed/gc/StorageRemovedException.java    | 22 +++++++++++
 .../internal/table/distributed/gc/MvGcTest.java    | 44 +++++++++++++++++++---
 5 files changed, 95 insertions(+), 11 deletions(-)

diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
index 65c4ab30987..b395cbd0949 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
@@ -49,7 +49,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.table.Table;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
@@ -95,7 +94,6 @@ class ItParallelRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034";)
     void testInstallRaftSnapshotAfterUpdateLowWatermark() {
         updateLowWatermarkUpdateInterval(1_000);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
index f2d8f74822f..adfcecc9c07 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
@@ -39,6 +39,15 @@ class GcStorageHandler {
      */
     final AtomicReference<CompletableFuture<Void>> gcInProgressFuture = new 
AtomicReference<>();
 
+    /**
+     * Reference to the future waiting of reaching a partition safe time for 
an updated low watermark on garbage collection.
+     *
+     * <p>This future will also help to complete garbage collection when 
deleting a multi-version storage, since there may be a situation
+     * where updating the safe time is not possible. For example, due to 
replication via a raft snapshot, which will not update the safe
+     * time until the snapshot is installed, and for this it need to clean up 
the storage and remove it from garbage collection.</p>
+     */
+    final AtomicReference<CompletableFuture<Void>> awaitSafeTimeFuture = new 
AtomicReference<>();
+
     GcStorageHandler(GcUpdateHandler gcUpdateHandler) {
         this.gcUpdateHandler = gcUpdateHandler;
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index f8358b94486..c8bd08da27d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -22,7 +22,7 @@ import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LO
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
@@ -154,6 +154,12 @@ public class MvGc implements ManuallyCloseable {
                 return nullCompletedFuture();
             }
 
+            CompletableFuture<Void> awaitSafeTimeFuture = 
removed.awaitSafeTimeFuture.get();
+
+            if (awaitSafeTimeFuture != null && !awaitSafeTimeFuture.isDone()) {
+                awaitSafeTimeFuture.completeExceptionally(new 
StorageRemovedException());
+            }
+
             CompletableFuture<Void> gcInProgressFuture = 
removed.gcInProgressFuture.get();
 
             return gcInProgressFuture == null ? nullCompletedFuture() : 
gcInProgressFuture;
@@ -183,7 +189,8 @@ public class MvGc implements ManuallyCloseable {
 
     private void scheduleGcForStorage(TablePartitionId tablePartitionId) {
         inBusyLock(() -> {
-            CompletableFuture<Void> currentGcFuture = new 
CompletableFuture<>();
+            var currentGcFuture = new CompletableFuture<Void>();
+            var currentAwaitSafeTimeFuture = new CompletableFuture<Void>();
 
             GcStorageHandler storageHandler = 
storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId, 
gcStorageHandler) -> {
                 if (gcStorageHandler == null) {
@@ -192,11 +199,16 @@ public class MvGc implements ManuallyCloseable {
                 }
 
                 CompletableFuture<Void> inProgressFuture = 
gcStorageHandler.gcInProgressFuture.get();
+                CompletableFuture<Void> awaitProgressFuture = 
gcStorageHandler.awaitSafeTimeFuture.get();
 
                 if (inProgressFuture == null || inProgressFuture.isDone()) {
                     boolean casResult = 
gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture, 
currentGcFuture);
 
                     assert casResult : tablePartId;
+
+                    casResult = 
gcStorageHandler.awaitSafeTimeFuture.compareAndSet(awaitProgressFuture, 
currentAwaitSafeTimeFuture);
+
+                    assert casResult : tablePartId;
                 } else {
                     inProgressFuture.whenComplete((unused, throwable) -> 
scheduleGcForStorage(tablePartitionId));
                 }
@@ -231,11 +243,20 @@ public class MvGc implements ManuallyCloseable {
                 // We can only start garbage collection when the partition 
safe time is reached.
                 gcUpdateHandler.getSafeTimeTracker()
                         .waitFor(lowWatermark)
+                        .whenComplete((unused, throwable) -> {
+                            if (throwable == null) {
+                                currentAwaitSafeTimeFuture.complete(null);
+                            } else {
+                                
currentAwaitSafeTimeFuture.completeExceptionally(throwable);
+                            }
+                        });
+
+                currentAwaitSafeTimeFuture
                         .thenApplyAsync(unused -> 
gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize(), true), 
executor)
                         .whenComplete((isGarbageLeft, throwable) -> {
                             if (throwable != null) {
-                                if (unwrapCause(throwable) instanceof 
TrackerClosedException) {
-                                    LOG.debug("TrackerClosedException caught", 
throwable);
+                                if (hasCause(throwable, 
TrackerClosedException.class, StorageRemovedException.class)) {
+                                    LOG.debug("Caught an expected exception", 
throwable);
 
                                     currentGcFuture.complete(null);
                                 } else {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java
new file mode 100644
index 00000000000..4e1518e8e54
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+/** Special internal exception to notify futures about removal of 
multi-version storage from garbage collection. */
+class StorageRemovedException extends RuntimeException {
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index f81911cfc02..3c1f6995756 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -29,9 +29,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -374,11 +376,27 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         assertThat(invokeVacuumMethodFuture, willSucceedFast());
     }
 
+    @Test
+    void testRemoveStorageWithSafeTimeUpdateStuck() {
+        var startAwaitSafeTimeFuture = new CompletableFuture<Void>();
+
+        GcUpdateHandler gcUpdateHandler = 
createWithSafeTimeUpdateStuck(startAwaitSafeTimeFuture);
+        TablePartitionId tablePartitionId = createTablePartitionId();
+
+        gc.addStorage(tablePartitionId, gcUpdateHandler);
+
+        assertThat(lowWatermark.updateAndNotify(new HybridTimestamp(10, 10)), 
willCompleteSuccessfully());
+        assertThat(startAwaitSafeTimeFuture, willCompleteSuccessfully());
+        assertThat(gc.removeStorage(tablePartitionId), 
willCompleteSuccessfully());
+
+        verify(gcUpdateHandler, never()).vacuumBatch(any(), anyInt(), 
anyBoolean());
+    }
+
     private TablePartitionId createTablePartitionId() {
         return new TablePartitionId(nextTableId.getAndIncrement(), 
PARTITION_ID);
     }
 
-    private GcUpdateHandler 
createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable 
HybridTimestamp exp) {
+    private static GcUpdateHandler 
createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable 
HybridTimestamp exp) {
         GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
 
         completeFutureOnVacuum(gcUpdateHandler, future, exp);
@@ -386,7 +404,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         return gcUpdateHandler;
     }
 
-    private void completeFutureOnVacuum(
+    private static void completeFutureOnVacuum(
             GcUpdateHandler gcUpdateHandler,
             CompletableFuture<Void> future,
             @Nullable HybridTimestamp exp
@@ -408,7 +426,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         });
     }
 
-    private GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) {
+    private static GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch 
latch) {
         GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
 
         when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), 
eq(true))).then(invocation -> {
@@ -420,7 +438,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         return gcUpdateHandler;
     }
 
-    private GcUpdateHandler createWithWaitFinishVacuum(CompletableFuture<Void> 
startFuture, CompletableFuture<Void> finishFuture) {
+    private static GcUpdateHandler 
createWithWaitFinishVacuum(CompletableFuture<Void> startFuture, 
CompletableFuture<Void> finishFuture) {
         GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
 
         when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), 
eq(true))).then(invocation -> {
@@ -440,7 +458,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
         assertEquals(GarbageCollector.CLOSED_ERR, exception.code());
     }
 
-    private GcUpdateHandler 
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
+    private static GcUpdateHandler 
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
         GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
 
         when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), 
eq(true))).then(invocation -> {
@@ -460,4 +478,20 @@ public class MvGcTest extends BaseIgniteAbstractTest {
 
         return gcUpdateHandler;
     }
+
+    private static GcUpdateHandler 
createWithSafeTimeUpdateStuck(CompletableFuture<Void> startAwaitSafeTimeFuture) 
{
+        GcUpdateHandler gcUpdateHandler = mock(GcUpdateHandler.class);
+
+        PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker 
= mock(PendingComparableValuesTracker.class);
+
+        when(safeTimeTracker.waitFor(any())).then(invocation -> {
+            startAwaitSafeTimeFuture.complete(null);
+
+            return new CompletableFuture<Void>();
+        });
+
+        when(gcUpdateHandler.getSafeTimeTracker()).thenReturn(safeTimeTracker);
+
+        return gcUpdateHandler;
+    }
 }

Reply via email to