This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 62e8494fda IGNITE-22050 Fix incorrect partId in reused pages (#3789) 62e8494fda is described below commit 62e8494fda57e2276849691cb10dcf2785bc7ad0 Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Mon May 20 14:35:05 2024 +0300 IGNITE-22050 Fix incorrect partId in reused pages (#3789) --- .../pagememory/datastructure/DataStructure.java | 18 ++++++++ .../internal/pagememory/freelist/PagesList.java | 2 + .../table/distributed/gc/GcStorageHandler.java | 3 +- .../ignite/internal/table/distributed/gc/MvGc.java | 10 ++++- .../gc/AbstractGcUpdateHandlerTest.java | 52 ++++++++++++++++++++++ 5 files changed, 81 insertions(+), 4 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java index a3c8970cc0..5007a62081 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java @@ -166,6 +166,10 @@ public abstract class DataStructure implements ManuallyCloseable { // Recycled. "pollFreePage" result should be reinitialized to move rotatedId to itemId. if (pageId != 0) { + // Replace the partition ID, because the reused page might have come from a different data structure if the reuse + // list is shared between them. + pageId = replacePartitionId(pageId); + pageId = reuseList.initRecycledPage(pageId, defaultPageFlag, null); } } @@ -183,6 +187,20 @@ public abstract class DataStructure implements ManuallyCloseable { return pageId; } + /** + * Replaces the "partition ID" part of the given page ID with the partition ID that this data structure is responsible for. + * + * @param pageId Original page ID. + * @return Page ID with replaced partition ID. 
+ */ + private long replacePartitionId(long pageId) { + long partitionIdZeroMask = ~(PageIdUtils.PART_ID_MASK << PageIdUtils.PAGE_IDX_SIZE); + + long partitionIdMask = ((long) partId) << PageIdUtils.PAGE_IDX_SIZE; + + return pageId & partitionIdZeroMask | partitionIdMask; + } + /** * Allocates a new page. * diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java index 0979df874f..1393ff0f9a 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java @@ -1377,6 +1377,8 @@ public abstract class PagesList extends DataStructure { try { long pageAddr = pageMem.writeLock(grpId, pageId, page); + assert pageAddr != 0; + try { return initReusedPage(pageId, pageAddr, partitionId(pageId), flag, initIo); } finally { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java index 157a3b84d0..f2d8f74822 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.table.distributed.gc; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.internal.hlc.HybridTimestamp; /** * Container for handling storage by the garbage collector. @@ -28,7 +27,7 @@ class GcStorageHandler { /** * Handler of multi-versioned partition storage and its indexes for garbage collection. 
* - * @see GcUpdateHandler#vacuumBatch(HybridTimestamp, int) + * @see GcUpdateHandler#vacuumBatch */ final GcUpdateHandler gcUpdateHandler; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java index 48ab7afefb..a52b8df005 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LO import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import java.util.concurrent.CompletableFuture; @@ -226,10 +227,13 @@ public class MvGc implements ManuallyCloseable { .thenApplyAsync(unused -> gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize(), true), executor) .whenComplete((isGarbageLeft, throwable) -> { if (throwable != null) { - if (throwable instanceof TrackerClosedException - || throwable.getCause() instanceof TrackerClosedException) { + if (unwrapCause(throwable) instanceof TrackerClosedException) { + LOG.debug("TrackerClosedException caught", throwable); + currentGcFuture.complete(null); } else { + LOG.error("Error when running GC", throwable); + currentGcFuture.completeExceptionally(throwable); } @@ -245,6 +249,8 @@ public class MvGc implements ManuallyCloseable { } }); } catch (Throwable t) { + LOG.error("Error when running GC", t); + currentGcFuture.completeExceptionally(t); } }); diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java index 5f795adaaf..a4f8741a3a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.gc; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -32,6 +33,7 @@ import static org.mockito.Mockito.verify; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.IntStream; import org.apache.ignite.distributed.TestPartitionDataStorage; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; @@ -198,6 +200,56 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest { } } + /** + * Tests a particular scenario when some data is inserted into multiple partition storages, then removed by the GC and then inserted + * again. 
+ */ + @Test + void testVacuumThenInsert() { + int numPartitions = 3; + + int numRows = 1000; + + IndexUpdateHandler indexUpdateHandler = createIndexUpdateHandler(); + + List<TestPartitionDataStorage> partitionStorages = IntStream.range(0, numPartitions) + .mapToObj(partId -> new TestPartitionDataStorage(TABLE_ID, partId, getOrCreateMvPartition(tableStorage, partId))) + .collect(toList()); + + List<GcUpdateHandler> gcUpdateHandlers = partitionStorages.stream() + .map(partitionStorage -> createGcUpdateHandler(partitionStorage, indexUpdateHandler)) + .collect(toList()); + + BinaryRow row = binaryRow(new TestKey(0, "key"), new TestValue(0, "value")); + + HybridTimestamp timestamp = clock.now(); + + for (int i = 0; i < numPartitions; i++) { + TestPartitionDataStorage storage = partitionStorages.get(i); + + for (int j = 0; j < numRows; j++) { + var rowId = new RowId(i); + + addWriteCommitted(storage, rowId, row, timestamp); + addWriteCommitted(storage, rowId, null, timestamp); + } + } + + for (GcUpdateHandler gcUpdateHandler : gcUpdateHandlers) { + gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, Integer.MAX_VALUE, true); + } + + for (int i = 0; i < numPartitions; i++) { + TestPartitionDataStorage storage = partitionStorages.get(i); + + for (int j = 0; j < numRows; j++) { + var rowId = new RowId(i); + + addWriteCommitted(storage, rowId, row, timestamp); + } + } + } + private TestPartitionDataStorage createPartitionDataStorage() { return new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, getOrCreateMvPartition(tableStorage, PARTITION_ID)); }