This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new e1a4ded604 IGNITE-18968 Possible race between updating a low watermark and processing the last batch for storage in a background GC (#1760) e1a4ded604 is described below commit e1a4ded604c735209fa5188c8fe1da9a0559e406 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Mar 7 11:40:22 2023 +0300 IGNITE-18968 Possible race between updating a low watermark and processing the last batch for storage in a background GC (#1760) --- .../ignite/internal/table/distributed/gc/MvGc.java | 41 ++++++++++++---- .../internal/table/distributed/gc/MvGcTest.java | 54 +++++++++++++++++++++- 2 files changed, 86 insertions(+), 9 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java index 91b7867dec..a29f4e6d36 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table.distributed.gc; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.thread.NamedThreadFactory.threadPrefix; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import java.util.concurrent.CompletableFuture; @@ -42,6 +43,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.lang.ErrorGroups.GarbageCollector; import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.TestOnly; /** * Garbage collector for multi-versioned storages and their indexes in the background. @@ -98,7 +100,7 @@ public class MvGc implements ManuallyCloseable { 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), - new NamedThreadFactory(nodeName, LOG) + new NamedThreadFactory(threadPrefix(nodeName, "mv-gc"), LOG) ); } @@ -193,17 +195,34 @@ public class MvGc implements ManuallyCloseable { private void scheduleGcForStorage(TablePartitionId tablePartitionId) { executor.submit(() -> inBusyLock(() -> { - GcStorageHandler storageHandler = storageHandlerByPartitionId.get(tablePartitionId); + CompletableFuture<Void> future = new CompletableFuture<>(); + + GcStorageHandler storageHandler = storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId, gcStorageHandler) -> { + if (gcStorageHandler == null) { + // Storage has been removed from garbage collection. + return null; + } + + CompletableFuture<Void> inProgressFuture = gcStorageHandler.gcInProgressFuture.get(); + + if (inProgressFuture == null || inProgressFuture.isDone()) { + boolean casResult = gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture, future); + + assert casResult : tablePartId; + } else { + inProgressFuture.whenComplete((unused, throwable) -> scheduleGcForStorage(tablePartitionId)); + } + + return gcStorageHandler; + }); if (storageHandler == null) { // Storage has been removed from garbage collection. return; } - CompletableFuture<Void> future = new CompletableFuture<>(); - - if (!storageHandler.gcInProgressFuture.compareAndSet(null, future)) { - // In parallel, another task has already begun collecting garbage. + if (storageHandler.gcInProgressFuture.get() != future) { + // Someone in parallel is already collecting garbage, we will try once again after completion of gcInProgressFuture. return; } @@ -227,8 +246,6 @@ public class MvGc implements ManuallyCloseable { if (!future.isCompletedExceptionally()) { future.complete(null); } - - storageHandler.gcInProgressFuture.set(null); } scheduleGcForStorage(tablePartitionId); @@ -254,4 +271,12 @@ public class MvGc implements ManuallyCloseable { return null; }); } + + /** + * Schedule a new garbage collection for all storages. + */ + @TestOnly + void scheduleGcForAllStorages() { + inBusyLock(this::initNewGcBusy); + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java index 070d22f1b8..7df77e99f5 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.gc; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast; @@ -25,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -193,7 +195,7 @@ public class MvGcTest { gc.updateLowWatermark(new HybridTimestamp(2, 2)); - latch.await(1, TimeUnit.SECONDS); + assertTrue(latch.await(200, TimeUnit.MILLISECONDS)); } @Test @@ -296,6 +298,43 @@ public class MvGcTest { assertDoesNotThrow(gc::close); } + @Test + void testParallelUpdateLowWatermark( + @InjectConfiguration + TablesConfiguration tablesConfig + ) throws Exception { + // By default, in the tests we work in one thread, we don’t have enough this, we will add more. + assertThat(tablesConfig.gcThreads().update(Runtime.getRuntime().availableProcessors()), willCompleteSuccessfully()); + + gc.close(); + + gc = new MvGc("test", tablesConfig); + + gc.start(); + + gc.updateLowWatermark(new HybridTimestamp(1, 1)); + + for (int i = 0; i < 100; i++) { + CountDownLatch latch = new CountDownLatch(5); + + TablePartitionId tablePartitionId = createTablePartitionId(); + + gc.addStorage(tablePartitionId, createWithCountDownOnVacuumWithoutNextBatch(latch)); + + runRace( + () -> gc.scheduleGcForAllStorages(), + () -> gc.scheduleGcForAllStorages(), + () -> gc.scheduleGcForAllStorages(), + () -> gc.scheduleGcForAllStorages() + ); + + // We will check that we will call the vacuum on each update of the low watermark. + assertTrue(latch.await(200, TimeUnit.MILLISECONDS), "remaining=" + latch.getCount()); + + assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully()); + } + } + private TablePartitionId createTablePartitionId() { return new TablePartitionId(UUID.randomUUID(), PARTITION_ID); } @@ -361,4 +400,17 @@ public class MvGcTest { assertEquals(GarbageCollector.CLOSED_ERR, exception.code()); } + + private StorageUpdateHandler createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) { + StorageUpdateHandler storageUpdateHandler = mock(StorageUpdateHandler.class); + + when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation -> { + latch.countDown(); + + // So that there is no processing of the next batch. + return false; + }); + + return storageUpdateHandler; + } }