This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e1a4ded604 IGNITE-18968 Possible race between updating a low watermark 
and processing the last batch for storage in a background GC (#1760)
e1a4ded604 is described below

commit e1a4ded604c735209fa5188c8fe1da9a0559e406
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Mar 7 11:40:22 2023 +0300

    IGNITE-18968 Possible race between updating a low watermark and processing 
the last batch for storage in a background GC (#1760)
---
 .../ignite/internal/table/distributed/gc/MvGc.java | 41 ++++++++++++----
 .../internal/table/distributed/gc/MvGcTest.java    | 54 +++++++++++++++++++++-
 2 files changed, 86 insertions(+), 9 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index 91b7867dec..a29f4e6d36 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.gc;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.thread.NamedThreadFactory.threadPrefix;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.util.concurrent.CompletableFuture;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Garbage collector for multi-versioned storages and their indexes in the 
background.
@@ -98,7 +100,7 @@ public class MvGc implements ManuallyCloseable {
                 30,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(),
-                new NamedThreadFactory(nodeName, LOG)
+                new NamedThreadFactory(threadPrefix(nodeName, "mv-gc"), LOG)
         );
     }
 
@@ -193,17 +195,34 @@ public class MvGc implements ManuallyCloseable {
 
     private void scheduleGcForStorage(TablePartitionId tablePartitionId) {
         executor.submit(() -> inBusyLock(() -> {
-            GcStorageHandler storageHandler = 
storageHandlerByPartitionId.get(tablePartitionId);
+            CompletableFuture<Void> future = new CompletableFuture<>();
+
+            GcStorageHandler storageHandler = 
storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId, 
gcStorageHandler) -> {
+                if (gcStorageHandler == null) {
+                    // Storage has been removed from garbage collection.
+                    return null;
+                }
+
+                CompletableFuture<Void> inProgressFuture = 
gcStorageHandler.gcInProgressFuture.get();
+
+                if (inProgressFuture == null || inProgressFuture.isDone()) {
+                    boolean casResult = 
gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture, future);
+
+                    assert casResult : tablePartId;
+                } else {
+                    inProgressFuture.whenComplete((unused, throwable) -> 
scheduleGcForStorage(tablePartitionId));
+                }
+
+                return gcStorageHandler;
+            });
 
             if (storageHandler == null) {
                 // Storage has been removed from garbage collection.
                 return;
             }
 
-            CompletableFuture<Void> future = new CompletableFuture<>();
-
-            if (!storageHandler.gcInProgressFuture.compareAndSet(null, 
future)) {
-                // In parallel, another task has already begun collecting 
garbage.
+            if (storageHandler.gcInProgressFuture.get() != future) {
+                // Someone in parallel is already collecting garbage, we will 
try once again after completion of gcInProgressFuture.
                 return;
             }
 
@@ -227,8 +246,6 @@ public class MvGc implements ManuallyCloseable {
                 if (!future.isCompletedExceptionally()) {
                     future.complete(null);
                 }
-
-                storageHandler.gcInProgressFuture.set(null);
             }
 
             scheduleGcForStorage(tablePartitionId);
@@ -254,4 +271,12 @@ public class MvGc implements ManuallyCloseable {
             return null;
         });
     }
+
+    /**
+     * Schedule a new garbage collection for all storages.
+     */
+    @TestOnly
+    void scheduleGcForAllStorages() {
+        inBusyLock(this::initNewGcBusy);
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index 070d22f1b8..7df77e99f5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.gc;
 
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
@@ -25,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -193,7 +195,7 @@ public class MvGcTest {
 
         gc.updateLowWatermark(new HybridTimestamp(2, 2));
 
-        latch.await(1, TimeUnit.SECONDS);
+        assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
     }
 
     @Test
@@ -296,6 +298,43 @@ public class MvGcTest {
         assertDoesNotThrow(gc::close);
     }
 
+    @Test
+    void testParallelUpdateLowWatermark(
+            @InjectConfiguration
+            TablesConfiguration tablesConfig
+    ) throws Exception {
+        // By default, in the tests we work in one thread, we don’t have 
enough this, we will add more.
+        
assertThat(tablesConfig.gcThreads().update(Runtime.getRuntime().availableProcessors()),
 willCompleteSuccessfully());
+
+        gc.close();
+
+        gc = new MvGc("test", tablesConfig);
+
+        gc.start();
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        for (int i = 0; i < 100; i++) {
+            CountDownLatch latch = new CountDownLatch(5);
+
+            TablePartitionId tablePartitionId = createTablePartitionId();
+
+            gc.addStorage(tablePartitionId, 
createWithCountDownOnVacuumWithoutNextBatch(latch));
+
+            runRace(
+                    () -> gc.scheduleGcForAllStorages(),
+                    () -> gc.scheduleGcForAllStorages(),
+                    () -> gc.scheduleGcForAllStorages(),
+                    () -> gc.scheduleGcForAllStorages()
+            );
+
+            // We will check that we will call the vacuum on each update of 
the low watermark.
+            assertTrue(latch.await(200, TimeUnit.MILLISECONDS), "remaining=" + 
latch.getCount());
+
+            assertThat(gc.removeStorage(tablePartitionId), 
willCompleteSuccessfully());
+        }
+    }
+
     private TablePartitionId createTablePartitionId() {
         return new TablePartitionId(UUID.randomUUID(), PARTITION_ID);
     }
@@ -361,4 +400,17 @@ public class MvGcTest {
 
         assertEquals(GarbageCollector.CLOSED_ERR, exception.code());
     }
+
+    private StorageUpdateHandler 
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
+        StorageUpdateHandler storageUpdateHandler = 
mock(StorageUpdateHandler.class);
+
+        
when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation 
-> {
+            latch.countDown();
+
+            // So that there is no processing of the next batch.
+            return false;
+        });
+
+        return storageUpdateHandler;
+    }
 }

Reply via email to