This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1bd321a85bf IGNITE-26034 Rebalancing via raft snapshot does not work after low watermark update (#6341)
1bd321a85bf is described below
commit 1bd321a85bf4eb3e0a21e6aa04ce16d3dbb2681b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jul 30 17:06:23 2025 +0300
IGNITE-26034 Rebalancing via raft snapshot does not work after low watermark update (#6341)
---
.../raftsnapshot/ItParallelRaftSnapshotsTest.java | 2 -
.../table/distributed/gc/GcStorageHandler.java | 9 +++++
.../ignite/internal/table/distributed/gc/MvGc.java | 29 ++++++++++++--
.../distributed/gc/StorageRemovedException.java | 22 +++++++++++
.../internal/table/distributed/gc/MvGcTest.java | 44 +++++++++++++++++++---
5 files changed, 95 insertions(+), 11 deletions(-)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
index 65c4ab30987..b395cbd0949 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
@@ -49,7 +49,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.table.Table;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
@@ -95,7 +94,6 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034")
void testInstallRaftSnapshotAfterUpdateLowWatermark() {
updateLowWatermarkUpdateInterval(1_000);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
index f2d8f74822f..adfcecc9c07 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
@@ -39,6 +39,15 @@ class GcStorageHandler {
*/
final AtomicReference<CompletableFuture<Void>> gcInProgressFuture = new
AtomicReference<>();
+ /**
+ * Reference to the future that waits for the partition safe time to reach the updated low watermark before garbage collection.
+ *
+ * <p>It also allows garbage collection to complete when a multi-version storage is removed, since the safe time may never be updated
+ * in that case; removal completes this future exceptionally with {@link StorageRemovedException}. For example, with raft snapshot
+ * replication the safe time is not updated until the snapshot is installed, which requires cleaning up the storage and removing it from GC.</p>
+ */
+ final AtomicReference<CompletableFuture<Void>> awaitSafeTimeFuture = new
AtomicReference<>();
+
GcStorageHandler(GcUpdateHandler gcUpdateHandler) {
this.gcUpdateHandler = gcUpdateHandler;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index f8358b94486..c8bd08da27d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -22,7 +22,7 @@ import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LO
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -154,6 +154,12 @@ public class MvGc implements ManuallyCloseable {
return nullCompletedFuture();
}
+ CompletableFuture<Void> awaitSafeTimeFuture =
removed.awaitSafeTimeFuture.get();
+
+ if (awaitSafeTimeFuture != null && !awaitSafeTimeFuture.isDone()) {
+ awaitSafeTimeFuture.completeExceptionally(new
StorageRemovedException());
+ }
+
CompletableFuture<Void> gcInProgressFuture =
removed.gcInProgressFuture.get();
return gcInProgressFuture == null ? nullCompletedFuture() :
gcInProgressFuture;
@@ -183,7 +189,8 @@ public class MvGc implements ManuallyCloseable {
private void scheduleGcForStorage(TablePartitionId tablePartitionId) {
inBusyLock(() -> {
- CompletableFuture<Void> currentGcFuture = new
CompletableFuture<>();
+ var currentGcFuture = new CompletableFuture<Void>();
+ var currentAwaitSafeTimeFuture = new CompletableFuture<Void>();
GcStorageHandler storageHandler =
storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId,
gcStorageHandler) -> {
if (gcStorageHandler == null) {
@@ -192,11 +199,16 @@ public class MvGc implements ManuallyCloseable {
}
CompletableFuture<Void> inProgressFuture =
gcStorageHandler.gcInProgressFuture.get();
+ CompletableFuture<Void> awaitProgressFuture =
gcStorageHandler.awaitSafeTimeFuture.get();
if (inProgressFuture == null || inProgressFuture.isDone()) {
boolean casResult =
gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture,
currentGcFuture);
assert casResult : tablePartId;
+
+ casResult =
gcStorageHandler.awaitSafeTimeFuture.compareAndSet(awaitProgressFuture,
currentAwaitSafeTimeFuture);
+
+ assert casResult : tablePartId;
} else {
inProgressFuture.whenComplete((unused, throwable) ->
scheduleGcForStorage(tablePartitionId));
}
@@ -231,11 +243,20 @@ public class MvGc implements ManuallyCloseable {
// We can only start garbage collection when the partition
safe time is reached.
gcUpdateHandler.getSafeTimeTracker()
.waitFor(lowWatermark)
+ .whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ currentAwaitSafeTimeFuture.complete(null);
+ } else {
+
currentAwaitSafeTimeFuture.completeExceptionally(throwable);
+ }
+ });
+
+ currentAwaitSafeTimeFuture
.thenApplyAsync(unused ->
gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize(), true),
executor)
.whenComplete((isGarbageLeft, throwable) -> {
if (throwable != null) {
- if (unwrapCause(throwable) instanceof
TrackerClosedException) {
- LOG.debug("TrackerClosedException caught",
throwable);
+ if (hasCause(throwable,
TrackerClosedException.class, StorageRemovedException.class)) {
+ LOG.debug("Caught an expected exception",
throwable);
currentGcFuture.complete(null);
} else {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java
new file mode 100644
index 00000000000..4e1518e8e54
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+/** Internal exception used to fail pending garbage collection futures once a multi-version storage is removed from garbage collection. */
+class StorageRemovedException extends RuntimeException {
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index f81911cfc02..3c1f6995756 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -29,9 +29,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -374,11 +376,27 @@ public class MvGcTest extends BaseIgniteAbstractTest {
assertThat(invokeVacuumMethodFuture, willSucceedFast());
}
+ @Test
+ void testRemoveStorageWithSafeTimeUpdateStuck() {
+ var startAwaitSafeTimeFuture = new CompletableFuture<Void>();
+ // The handler's safe time tracker never completes waitFor(), so garbage collection for this storage stays stuck before vacuuming.
+ GcUpdateHandler gcUpdateHandler =
createWithSafeTimeUpdateStuck(startAwaitSafeTimeFuture);
+ TablePartitionId tablePartitionId = createTablePartitionId();
+
+ gc.addStorage(tablePartitionId, gcUpdateHandler);
+ // Updating the low watermark makes garbage collection call waitFor() on the stuck tracker, which completes startAwaitSafeTimeFuture.
+ assertThat(lowWatermark.updateAndNotify(new HybridTimestamp(10, 10)),
willCompleteSuccessfully());
+ assertThat(startAwaitSafeTimeFuture, willCompleteSuccessfully());
+ assertThat(gc.removeStorage(tablePartitionId),
willCompleteSuccessfully());
+ // Removal must not hang on the stuck safe time wait, and no vacuum batch may have been started.
+ verify(gcUpdateHandler, never()).vacuumBatch(any(), anyInt(),
anyBoolean());
+ }
+
private TablePartitionId createTablePartitionId() {
return new TablePartitionId(nextTableId.getAndIncrement(),
PARTITION_ID);
}
- private GcUpdateHandler
createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable
HybridTimestamp exp) {
+ private static GcUpdateHandler
createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable
HybridTimestamp exp) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
completeFutureOnVacuum(gcUpdateHandler, future, exp);
@@ -386,7 +404,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
return gcUpdateHandler;
}
- private void completeFutureOnVacuum(
+ private static void completeFutureOnVacuum(
GcUpdateHandler gcUpdateHandler,
CompletableFuture<Void> future,
@Nullable HybridTimestamp exp
@@ -408,7 +426,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
});
}
- private GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) {
+ private static GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch
latch) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
@@ -420,7 +438,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
return gcUpdateHandler;
}
- private GcUpdateHandler createWithWaitFinishVacuum(CompletableFuture<Void>
startFuture, CompletableFuture<Void> finishFuture) {
+ private static GcUpdateHandler
createWithWaitFinishVacuum(CompletableFuture<Void> startFuture,
CompletableFuture<Void> finishFuture) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
@@ -440,7 +458,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
assertEquals(GarbageCollector.CLOSED_ERR, exception.code());
}
- private GcUpdateHandler
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
+ private static GcUpdateHandler
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
@@ -460,4 +478,20 @@ public class MvGcTest extends BaseIgniteAbstractTest {
return gcUpdateHandler;
}
+
+ private static GcUpdateHandler
createWithSafeTimeUpdateStuck(CompletableFuture<Void> startAwaitSafeTimeFuture)
{
+ GcUpdateHandler gcUpdateHandler = mock(GcUpdateHandler.class);
+
+ PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker
= mock(PendingComparableValuesTracker.class);
+ // waitFor() reports that it was called and returns a future that is never completed, so the safe time never appears reached.
+ when(safeTimeTracker.waitFor(any())).then(invocation -> {
+ startAwaitSafeTimeFuture.complete(null);
+
+ return new CompletableFuture<Void>();
+ });
+ // Only getSafeTimeTracker() is stubbed on the handler mock; vacuumBatch() is left unstubbed.
+ when(gcUpdateHandler.getSafeTimeTracker()).thenReturn(safeTimeTracker);
+
+ return gcUpdateHandler;
+ }
}