This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26034 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit cdfea14aabc38ee3461fe6689308957d9d4eda6a Author: Kirill Tkalenko <[email protected]> AuthorDate: Wed Jul 30 14:18:32 2025 +0300 IGNITE-26034 wip --- .../raftsnapshot/ItParallelRaftSnapshotsTest.java | 2 - .../table/distributed/gc/GcStorageHandler.java | 9 +++++ .../ignite/internal/table/distributed/gc/MvGc.java | 28 ++++++++++++-- .../distributed/gc/StorageRemovedException.java | 22 +++++++++++ .../internal/table/distributed/gc/MvGcTest.java | 44 +++++++++++++++++++--- 5 files changed, 95 insertions(+), 10 deletions(-) diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java index 65c4ab30987..b395cbd0949 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java @@ -49,7 +49,6 @@ import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.table.Table; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest { @@ -95,7 +94,6 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034") void testInstallRaftSnapshotAfterUpdateLowWatermark() { updateLowWatermarkUpdateInterval(1_000); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java index f2d8f74822f..adfcecc9c07 100644 --- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java @@ -39,6 +39,15 @@ class GcStorageHandler { */ final AtomicReference<CompletableFuture<Void>> gcInProgressFuture = new AtomicReference<>(); + /** + * Reference to the future that waits for the partition safe time to reach an updated low watermark during garbage collection. + * + * <p>This future also makes it possible to complete garbage collection when a multi-version storage is being removed, since there may + * be a situation where updating the safe time is not possible. For example, during replication via a raft snapshot the safe time is + * not updated until the snapshot is installed, and installing it requires cleaning up the storage and removing it from garbage collection.</p> + */ + final AtomicReference<CompletableFuture<Void>> awaitSafeTimeFuture = new AtomicReference<>(); + GcStorageHandler(GcUpdateHandler gcUpdateHandler) { this.gcUpdateHandler = gcUpdateHandler; } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java index f8358b94486..b4631a83d14 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LO import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.hasCause; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; @@ -154,6 +155,12 @@ public class MvGc implements ManuallyCloseable { return nullCompletedFuture(); } + CompletableFuture<Void> awaitSafeTimeFuture = removed.awaitSafeTimeFuture.get(); + + if (awaitSafeTimeFuture != null && !awaitSafeTimeFuture.isDone()) { + awaitSafeTimeFuture.completeExceptionally(new StorageRemovedException()); + } + CompletableFuture<Void> gcInProgressFuture = removed.gcInProgressFuture.get(); return gcInProgressFuture == null ? nullCompletedFuture() : gcInProgressFuture; @@ -183,7 +190,8 @@ public class MvGc implements ManuallyCloseable { private void scheduleGcForStorage(TablePartitionId tablePartitionId) { inBusyLock(() -> { - CompletableFuture<Void> currentGcFuture = new CompletableFuture<>(); + var currentGcFuture = new CompletableFuture<Void>(); + var currentAwaitSafeTimeFuture = new CompletableFuture<Void>(); GcStorageHandler storageHandler = storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId, gcStorageHandler) -> { if (gcStorageHandler == null) { @@ -192,11 +200,16 @@ public class MvGc implements ManuallyCloseable { } CompletableFuture<Void> inProgressFuture = gcStorageHandler.gcInProgressFuture.get(); + CompletableFuture<Void> awaitProgressFuture = gcStorageHandler.awaitSafeTimeFuture.get(); if (inProgressFuture == null || inProgressFuture.isDone()) { boolean casResult = gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture, currentGcFuture); assert casResult : tablePartId; + + casResult = gcStorageHandler.awaitSafeTimeFuture.compareAndSet(awaitProgressFuture, currentAwaitSafeTimeFuture); + + assert casResult : tablePartId; } else { inProgressFuture.whenComplete((unused, throwable) -> scheduleGcForStorage(tablePartitionId)); } @@ -231,11 +244,20 @@ public class MvGc implements ManuallyCloseable { // We can only start garbage collection when the partition safe 
time is reached. gcUpdateHandler.getSafeTimeTracker() .waitFor(lowWatermark) + .whenComplete((unused, throwable) -> { + if (throwable == null) { + currentAwaitSafeTimeFuture.complete(null); + } else { + currentAwaitSafeTimeFuture.completeExceptionally(throwable); + } + }); + + currentAwaitSafeTimeFuture .thenApplyAsync(unused -> gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize(), true), executor) .whenComplete((isGarbageLeft, throwable) -> { if (throwable != null) { - if (unwrapCause(throwable) instanceof TrackerClosedException) { - LOG.debug("TrackerClosedException caught", throwable); + if (hasCause(throwable, TrackerClosedException.class, StorageRemovedException.class)) { + LOG.debug("Caught an expected exception", throwable); currentGcFuture.complete(null); } else { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java new file mode 100644 index 00000000000..4e1518e8e54 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/StorageRemovedException.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.gc; + +/** Special internal exception to notify futures about removal of multi-version storage from garbage collection. */ +class StorageRemovedException extends RuntimeException { +} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java index f81911cfc02..3c1f6995756 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java @@ -29,9 +29,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -374,11 +376,27 @@ public class MvGcTest extends BaseIgniteAbstractTest { assertThat(invokeVacuumMethodFuture, willSucceedFast()); } + @Test + void testRemoveStorageWithSafeTimeUpdateStuck() { + var startAwaitSafeTimeFuture = new CompletableFuture<Void>(); + + GcUpdateHandler gcUpdateHandler = createWithSafeTimeUpdateStuck(startAwaitSafeTimeFuture); + TablePartitionId tablePartitionId = createTablePartitionId(); + + gc.addStorage(tablePartitionId, gcUpdateHandler); + + assertThat(lowWatermark.updateAndNotify(new HybridTimestamp(10, 10)), willCompleteSuccessfully()); + assertThat(startAwaitSafeTimeFuture, 
willCompleteSuccessfully()); + assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully()); + + verify(gcUpdateHandler, never()).vacuumBatch(any(), anyInt(), anyBoolean()); + } + private TablePartitionId createTablePartitionId() { return new TablePartitionId(nextTableId.getAndIncrement(), PARTITION_ID); } - private GcUpdateHandler createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable HybridTimestamp exp) { + private static GcUpdateHandler createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable HybridTimestamp exp) { GcUpdateHandler gcUpdateHandler = createGcUpdateHandler(); completeFutureOnVacuum(gcUpdateHandler, future, exp); @@ -386,7 +404,7 @@ public class MvGcTest extends BaseIgniteAbstractTest { return gcUpdateHandler; } - private void completeFutureOnVacuum( + private static void completeFutureOnVacuum( GcUpdateHandler gcUpdateHandler, CompletableFuture<Void> future, @Nullable HybridTimestamp exp @@ -408,7 +426,7 @@ public class MvGcTest extends BaseIgniteAbstractTest { }); } - private GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) { + private static GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) { GcUpdateHandler gcUpdateHandler = createGcUpdateHandler(); when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> { @@ -420,7 +438,7 @@ public class MvGcTest extends BaseIgniteAbstractTest { return gcUpdateHandler; } - private GcUpdateHandler createWithWaitFinishVacuum(CompletableFuture<Void> startFuture, CompletableFuture<Void> finishFuture) { + private static GcUpdateHandler createWithWaitFinishVacuum(CompletableFuture<Void> startFuture, CompletableFuture<Void> finishFuture) { GcUpdateHandler gcUpdateHandler = createGcUpdateHandler(); when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> { @@ -440,7 +458,7 @@ public class MvGcTest extends BaseIgniteAbstractTest { 
assertEquals(GarbageCollector.CLOSED_ERR, exception.code()); } - private GcUpdateHandler createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) { + private static GcUpdateHandler createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) { GcUpdateHandler gcUpdateHandler = createGcUpdateHandler(); when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(), eq(true))).then(invocation -> { @@ -460,4 +478,20 @@ public class MvGcTest extends BaseIgniteAbstractTest { return gcUpdateHandler; } + + private static GcUpdateHandler createWithSafeTimeUpdateStuck(CompletableFuture<Void> startAwaitSafeTimeFuture) { + GcUpdateHandler gcUpdateHandler = mock(GcUpdateHandler.class); + + PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker = mock(PendingComparableValuesTracker.class); + + when(safeTimeTracker.waitFor(any())).then(invocation -> { + startAwaitSafeTimeFuture.complete(null); + + return new CompletableFuture<Void>(); + }); + + when(gcUpdateHandler.getSafeTimeTracker()).thenReturn(safeTimeTracker); + + return gcUpdateHandler; + } }
