This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b74522ac677 IGNITE-26315 Fix race between increasing partition
generation and performing write operations to it (#6502)
b74522ac677 is described below
commit b74522ac6778fa3e74e964327ad8a015124bc152
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Aug 29 14:52:07 2025 +0300
IGNITE-26315 Fix race between increasing partition generation and
performing write operations to it (#6502)
---
.../ItBplusTreePersistentPageMemoryTest.java | 4 +-
...BplusTreeReuseListPersistentPageMemoryTest.java | 4 +-
.../PartitionDestructionLockManager.java | 44 +++++++++
.../persistence/PartitionMetaManager.java | 2 +-
.../persistence/PartitionProcessingCounterMap.java | 84 -----------------
.../persistence/PersistentPageMemory.java | 12 ++-
.../persistence/checkpoint/CheckpointManager.java | 30 +++---
.../persistence/checkpoint/CheckpointPages.java | 50 ----------
.../checkpoint/CheckpointPagesWriter.java | 35 +++++--
.../checkpoint/CheckpointPagesWriterFactory.java | 11 ++-
.../checkpoint/CheckpointProgressImpl.java | 71 --------------
.../persistence/checkpoint/Checkpointer.java | 54 ++++-------
.../persistence/compaction/Compactor.java | 35 +++----
.../replacement/DelayedDirtyPageWrite.java | 15 ++-
.../replacement/DelayedPageReplacementTracker.java | 10 +-
.../PartitionProcessingCounterMapTest.java | 79 ----------------
.../checkpoint/CheckpointPagesWriterTest.java | 10 +-
.../checkpoint/CheckpointProgressImplTest.java | 48 ----------
.../persistence/checkpoint/CheckpointerTest.java | 102 ++++++++-------------
.../persistence/compaction/CompactorTest.java | 66 +++++--------
.../replacement/AbstractPageReplacementTest.java | 3 +-
.../throttling/PageMemoryThrottlingTest.java | 3 +-
.../pagememory/PersistentPageMemoryDataRegion.java | 3 +-
.../PersistentPageMemoryTableStorage.java | 20 +++-
.../pagememory/PersistentPageMemoryNoLoadTest.java | 6 +-
25 files changed, 256 insertions(+), 545 deletions(-)
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
index 7bb5761eb51..01f6be69a70 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PageHeader;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
import
org.apache.ignite.internal.pagememory.persistence.TestPageReadWriteManager;
@@ -75,7 +76,8 @@ public class ItBplusTreePersistentPageMemoryTest extends
AbstractBplusTreePageMe
(fullPageId, buf, tag) -> {
},
mockCheckpointTimeoutLock(true),
- wrapLock(offheapReadWriteLock)
+ wrapLock(offheapReadWriteLock),
+ new PartitionDestructionLockManager()
);
}
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
index cc3f96239b6..f27fe2e0059 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PageHeader;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
import
org.apache.ignite.internal.pagememory.persistence.TestPageReadWriteManager;
@@ -61,7 +62,8 @@ public class ItBplusTreeReuseListPersistentPageMemoryTest
extends AbstractBplusT
(fullPageId, buf, tag) -> {
},
mockCheckpointTimeoutLock(true),
- wrapLock(new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL))
+ wrapLock(new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)),
+ new PartitionDestructionLockManager()
);
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionDestructionLockManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionDestructionLockManager.java
new file mode 100644
index 00000000000..79fc2858bd5
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionDestructionLockManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Partition Destruction Lock Manager. */
+// TODO: IGNITE-26339 Make partition generation increase non-blocking
+public class PartitionDestructionLockManager {
+ private final Map<GroupPartitionId, ReentrantReadWriteLock>
lockByPartitionId = new ConcurrentHashMap<>();
+
+ /**
+ * Returns a lock to synchronize between partition destruction and write operations to it.
+ *
+ * <p>For partition write operations you need to use {@link ReadWriteLock#readLock()} and for partition destruction
+ * {@link ReadWriteLock#writeLock()}.</p>
+ */
+ public ReadWriteLock destructionLock(GroupPartitionId groupPartitionId) {
+ return lockByPartitionId.computeIfAbsent(groupPartitionId, unused ->
new ReentrantReadWriteLock());
+ }
+
+ /** Removes all locks for a group. */
+ public void removeLockForGroup(int groupId) {
+ lockByPartitionId.entrySet().removeIf(e -> e.getKey().getGroupId() ==
groupId);
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
index 3db2224454a..5cdb733af75 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManager.java
@@ -65,7 +65,7 @@ public class PartitionMetaManager {
}
/**
- * Returns the partition's meta information.
+ * Returns the partition's meta information, {@code null} if not yet added or was removed when the partition was destroyed.
*
* @param groupPartitionId Partition of the group.
*/
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
deleted file mode 100644
index d0369054794..00000000000
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Helper class for thread-safe work with {@link PartitionProcessingCounter}
for any partition of any group.
- */
-public class PartitionProcessingCounterMap {
- private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter>
processedPartitions = new ConcurrentHashMap<>();
-
- /**
- * Atomically increments the partition processing counter.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- public void incrementPartitionProcessingCounter(GroupPartitionId
groupPartitionId) {
- processedPartitions.compute(groupPartitionId, (id,
partitionProcessingCounter) -> {
- if (partitionProcessingCounter == null) {
- PartitionProcessingCounter counter = new
PartitionProcessingCounter();
-
- counter.incrementPartitionProcessingCounter();
-
- return counter;
- }
-
- partitionProcessingCounter.incrementPartitionProcessingCounter();
-
- return partitionProcessingCounter;
- });
- }
-
- /**
- * Atomically decrements the partition processing counter.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- public void decrementPartitionProcessingCounter(GroupPartitionId
groupPartitionId) {
- processedPartitions.compute(groupPartitionId, (id,
partitionProcessingCounter) -> {
- assert partitionProcessingCounter != null : id;
- assert !partitionProcessingCounter.future().isDone() : id;
-
- partitionProcessingCounter.decrementPartitionProcessingCounter();
-
- return partitionProcessingCounter.future().isDone() ? null :
partitionProcessingCounter;
- });
- }
-
- /**
- * Returns the future if the partition according to the given parameters
is currently being processed, for example, dirty pages are
- * being written or fsync is being done, {@code null} if the partition is
not currently being processed.
- *
- * <p>Future will be added on {@link
#incrementPartitionProcessingCounter(GroupPartitionId)} call and completed on
- * {@link #incrementPartitionProcessingCounter(GroupPartitionId)} call
(equal to the number of
- * {@link #decrementPartitionProcessingCounter(GroupPartitionId)} calls).
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- @Nullable
- public CompletableFuture<Void>
getProcessedPartitionFuture(GroupPartitionId groupPartitionId) {
- PartitionProcessingCounter partitionProcessingCounter =
processedPartitions.get(groupPartitionId);
-
- return partitionProcessingCounter == null ? null :
partitionProcessingCounter.future();
- }
-}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index fe0a6b8b1da..80741dc7d0f 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -225,6 +225,7 @@ public class PersistentPageMemory implements PageMemory {
* @param flushDirtyPageForReplacement Write callback invoked when a dirty
page is removed for replacement.
* @param checkpointTimeoutLock Checkpoint timeout lock.
* @param rwLock Read-write lock for pages.
+ * @param partitionDestructionLockManager Partition Destruction Lock Manager.
*/
public PersistentPageMemory(
PersistentDataRegionConfiguration dataRegionConfiguration,
@@ -235,7 +236,8 @@ public class PersistentPageMemory implements PageMemory {
PageReadWriteManager pageStoreManager,
WriteDirtyPage flushDirtyPageForReplacement,
CheckpointTimeoutLock checkpointTimeoutLock,
- OffheapReadWriteLock rwLock
+ OffheapReadWriteLock rwLock,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
this.dataRegionConfiguration = dataRegionConfiguration;
this.metricSource = metricSource;
@@ -272,7 +274,13 @@ public class PersistentPageMemory implements PageMemory {
throw new IgniteInternalException("Unexpected page replacement
mode: " + replacementMode);
}
- delayedPageReplacementTracker = new
DelayedPageReplacementTracker(pageSize, flushDirtyPageForReplacement, LOG,
sizes.length - 1);
+ delayedPageReplacementTracker = new DelayedPageReplacementTracker(
+ pageSize,
+ flushDirtyPageForReplacement,
+ LOG,
+ sizes.length - 1,
+ partitionDestructionLockManager
+ );
this.writeThrottle = null;
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index fe973622617..acbe84a2f13 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
@@ -84,6 +85,8 @@ public class CheckpointManager {
/** Delta file compactor. */
private final Compactor compactor;
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/**
* Constructor.
*
@@ -133,11 +136,14 @@ public class CheckpointManager {
checkpointConfig.checkpointThreads()
);
+ partitionDestructionLockManager = new
PartitionDestructionLockManager();
+
checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
this::writePageToFilePageStore,
ioRegistry,
partitionMetaManager,
- pageSize
+ pageSize,
+ partitionDestructionLockManager
);
compactor = new Compactor(
@@ -146,7 +152,8 @@ public class CheckpointManager {
checkpointConfig.compactionThreads(),
filePageStoreManager,
pageSize,
- failureManager
+ failureManager,
+ partitionDestructionLockManager
);
checkpointer = new Checkpointer(
@@ -160,7 +167,8 @@ public class CheckpointManager {
compactor,
pageSize,
checkpointConfig,
- logSyncer
+ logSyncer,
+ partitionDestructionLockManager
);
checkpointTimeoutLock = new CheckpointTimeoutLock(
@@ -395,18 +403,8 @@ public class CheckpointManager {
compactor.triggerCompaction();
}
- /**
- * Callback on destruction of the partition of the corresponding group.
- *
- * <p>Prepares the checkpointer and compactor for partition destruction.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @return Future that will complete when the callback completes.
- */
- public CompletableFuture<Void> onPartitionDestruction(GroupPartitionId
groupPartitionId) {
- return CompletableFuture.allOf(
- checkpointer.prepareToDestroyPartition(groupPartitionId),
- compactor.prepareToDestroyPartition(groupPartitionId)
- );
+ /** Partition Destruction Lock Manager. */
+ public PartitionDestructionLockManager partitionDestructionLockManager() {
+ return partitionDestructionLockManager;
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
index 01c2e69ad9e..60be59913dc 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -25,10 +25,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
-import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.jetbrains.annotations.Nullable;
/**
@@ -148,51 +145,4 @@ public class CheckpointPages {
public void unblockFsyncOnPageReplacement(DirtyFullPageId pageId,
@Nullable Throwable error) {
checkpointProgress.unblockFsyncOnPageReplacement(pageId, error);
}
-
- /**
- * Blocks physical destruction of partition.
- *
- * <p>When the intention to destroy partition appears, {@link
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
- * {@link PersistentPageMemory#invalidate(int, int)} invoked at the
beginning. And if there is a block, it waits for unblocking.
- * Then it destroys the partition, {@link
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
- *
- * <p>It is recommended to use where physical destruction of the partition
may have an impact, for example when writing dirty pages and
- * executing a fsync.</p>
- *
- * <p>To make sure that we can physically do something with the partition
during a block, we will need to use approximately the
- * following code:</p>
- * <pre><code>
- * checkpointProgress.blockPartitionDestruction(partitionId);
- *
- * try {
- * FilePageStore pageStore =
FilePageStoreManager#getStore(partitionId);
- *
- * if (pageStore == null || pageStore.isMarkedToDestroy()) {
- * return;
- * }
- *
- * someAction(pageStore);
- * } finally {
- * checkpointProgress.unblockPartitionDestruction(partitionId);
- * }
- * </code></pre>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #unblockPartitionDestruction(GroupPartitionId)
- */
- public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
- checkpointProgress.blockPartitionDestruction(groupPartitionId);
- }
-
- /**
- * Unblocks physical destruction of partition.
- *
- * <p>As soon as the last thread makes an unlock, the physical destruction
of the partition can immediately begin.</p>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #blockPartitionDestruction(GroupPartitionId)
- */
- public void unblockPartitionDestruction(GroupPartitionId groupPartitionId)
{
- checkpointProgress.unblockPartitionDestruction(groupPartitionId);
- }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index e0746d46c18..ebfdeb867f5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -44,6 +45,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
@@ -118,6 +120,8 @@ public class CheckpointPagesWriter implements Runnable {
/** Shutdown now. */
private final BooleanSupplier shutdownNow;
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/**
* Creates task for write pages.
*
@@ -133,6 +137,7 @@ public class CheckpointPagesWriter implements Runnable {
* @param ioRegistry Page IO registry.
* @param partitionMetaManager Partition meta information manager.
* @param shutdownNow Shutdown supplier.
+ * @param partitionDestructionLockManager Partition Destruction Lock Manager.
*/
CheckpointPagesWriter(
CheckpointMetricsTracker tracker,
@@ -146,7 +151,8 @@ public class CheckpointPagesWriter implements Runnable {
WriteDirtyPage pageWriter,
PageIoRegistry ioRegistry,
PartitionMetaManager partitionMetaManager,
- BooleanSupplier shutdownNow
+ BooleanSupplier shutdownNow,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
this.tracker = tracker;
this.dirtyPartitionQueue = dirtyPartitionQueue;
@@ -160,6 +166,7 @@ public class CheckpointPagesWriter implements Runnable {
this.ioRegistry = ioRegistry;
this.partitionMetaManager = partitionMetaManager;
this.shutdownNow = shutdownNow;
+ this.partitionDestructionLockManager = partitionDestructionLockManager;
}
@Override
@@ -203,7 +210,9 @@ public class CheckpointPagesWriter implements Runnable {
) throws IgniteInternalCheckedException {
CheckpointDirtyPagesView checkpointDirtyPagesView =
checkpointDirtyPagesView(pageMemory, partitionId);
- checkpointProgress.blockPartitionDestruction(partitionId);
+ Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(partitionId).readLock();
+
+ partitionDestructionLock.lock();
try {
addUpdatePartitionCounterIfAbsent(partitionId);
@@ -224,7 +233,7 @@ public class CheckpointPagesWriter implements Runnable {
writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter, true);
}
} finally {
- checkpointProgress.unblockPartitionDestruction(partitionId);
+ partitionDestructionLock.unlock();
}
}
@@ -267,6 +276,8 @@ public class CheckpointPagesWriter implements Runnable {
GroupPartitionId partitionId = null;
+ Lock partitionDestructionLock = null;
+
try {
for (DirtyFullPageId pageId : entry.getValue()) {
if (shutdownNow.getAsBoolean()) {
@@ -276,20 +287,22 @@ public class CheckpointPagesWriter implements Runnable {
updateHeartbeat.run();
if (partitionIdChanged(partitionId, pageId)) {
- if (partitionId != null) {
-
checkpointProgress.unblockPartitionDestruction(partitionId);
+ if (partitionDestructionLock != null) {
+ partitionDestructionLock.unlock();
}
partitionId = GroupPartitionId.convert(pageId);
-
checkpointProgress.blockPartitionDestruction(partitionId);
+ partitionDestructionLock =
partitionDestructionLockManager.destructionLock(partitionId).readLock();
+
+ partitionDestructionLock.lock();
}
writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter, useTryWriteLockOnPage);
}
} finally {
- if (partitionId != null) {
-
checkpointProgress.unblockPartitionDestruction(partitionId);
+ if (partitionDestructionLock != null) {
+ partitionDestructionLock.unlock();
}
}
}
@@ -333,14 +346,16 @@ public class CheckpointPagesWriter implements Runnable {
GroupPartitionId partitionId =
GroupPartitionId.convert(cpPageId);
- checkpointProgress.blockPartitionDestruction(partitionId);
+ Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(partitionId).readLock();
+
+ partitionDestructionLock.lock();
try {
addUpdatePartitionCounterIfAbsent(partitionId);
pageMemory.checkpointWritePage(cpPageId,
tmpWriteBuf.rewind(), pageStoreWriter, tracker, true);
} finally {
-
checkpointProgress.unblockPartitionDestruction(partitionId);
+ partitionDestructionLock.unlock();
}
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index c0c05e5ca16..d003ab8ff87 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
@@ -49,6 +50,8 @@ public class CheckpointPagesWriterFactory {
/** Partition meta information manager. */
private final PartitionMetaManager partitionMetaManager;
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/**
* Constructor.
*
@@ -56,17 +59,20 @@ public class CheckpointPagesWriterFactory {
* @param ioRegistry Page IO registry.
* @param partitionMetaManager Partition meta information manager.
* @param pageSize Page size in bytes.
+ * @param partitionDestructionLockManager Partition Destruction Lock Manager.
*/
CheckpointPagesWriterFactory(
WriteDirtyPage dirtyPageWriter,
PageIoRegistry ioRegistry,
PartitionMetaManager partitionMetaManager,
// TODO: IGNITE-17017 Move to common config
- int pageSize
+ int pageSize,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
this.dirtyPageWriter = dirtyPageWriter;
this.ioRegistry = ioRegistry;
this.partitionMetaManager = partitionMetaManager;
+ this.partitionDestructionLockManager = partitionDestructionLockManager;
threadBuf = ThreadLocal.withInitial(() -> {
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize);
@@ -112,7 +118,8 @@ public class CheckpointPagesWriterFactory {
dirtyPageWriter,
ioRegistry,
partitionMetaManager,
- shutdownNow
+ shutdownNow,
+ partitionDestructionLockManager
);
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index ea77ba217fe..fdd160b2b17 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -29,11 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
-import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
-import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.jetbrains.annotations.Nullable;
/**
@@ -75,9 +70,6 @@ public class CheckpointProgressImpl implements
CheckpointProgress {
/** Sorted dirty pages to be written on the checkpoint. */
private volatile @Nullable CheckpointDirtyPages pageToWrite;
- /** Partitions currently being processed, for example, writing dirty pages
or doing fsync. */
- private final PartitionProcessingCounterMap processedPartitionMap = new
PartitionProcessingCounterMap();
-
/** Assistant for synchronizing page replacement and fsync phase. */
private final CheckpointPageReplacement checkpointPageReplacement = new
CheckpointPageReplacement();
@@ -305,69 +297,6 @@ public class CheckpointProgressImpl implements
CheckpointProgress {
this.pageToWrite = pageToWrite;
}
- /**
- * Blocks physical destruction of partition.
- *
- * <p>When the intention to destroy partition appears, {@link
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
- * {@link PersistentPageMemory#invalidate(int, int)} invoked at the
beginning. And if there is a block, it waits for unblocking.
- * Then it destroys the partition, {@link
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
- *
- * <p>It is recommended to use where physical destruction of the partition
may have an impact, for example when writing dirty pages and
- * executing a fsync.</p>
- *
- * <p>To make sure that we can physically do something with the partition
during a block, we will need to use approximately the
- * following code:</p>
- * <pre><code>
- * checkpointProgress.blockPartitionDestruction(partitionId);
- *
- * try {
- * FilePageStore pageStore =
FilePageStoreManager#getStore(partitionId);
- *
- * if (pageStore == null || pageStore.isMarkedToDestroy()) {
- * return;
- * }
- *
- * someAction(pageStore);
- * } finally {
- * checkpointProgress.unblockPartitionDestruction(partitionId);
- * }
- * </code></pre>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #unblockPartitionDestruction(GroupPartitionId)
- * @see #getUnblockPartitionDestructionFuture(GroupPartitionId)
- */
- public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
-
processedPartitionMap.incrementPartitionProcessingCounter(groupPartitionId);
- }
-
- /**
- * Unblocks physical destruction of partition.
- *
- * <p>As soon as the last thread makes an unlock, the physical destruction
of the partition can immediately begin.</p>
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @see #blockPartitionDestruction(GroupPartitionId)
- * @see #getUnblockPartitionDestructionFuture(GroupPartitionId)
- */
- public void unblockPartitionDestruction(GroupPartitionId groupPartitionId)
{
-
processedPartitionMap.decrementPartitionProcessingCounter(groupPartitionId);
- }
-
- /**
- * Returns the future if the partition according to the given parameters
is currently being blocked, for example, dirty pages are
- * being written or fsync is being done, {@code null} if the partition is
not currently being blocked.
- *
- * <p>Future will be added on {@link
#blockPartitionDestruction(GroupPartitionId)} call and completed on
- * {@link #unblockPartitionDestruction(GroupPartitionId)} call (equal to
the number of
- * {@link #unblockPartitionDestruction(GroupPartitionId)} calls).
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- */
- public @Nullable CompletableFuture<Void>
getUnblockPartitionDestructionFuture(GroupPartitionId groupPartitionId) {
- return
processedPartitionMap.getProcessedPartitionFuture(groupPartitionId);
- }
-
/**
* Block the start of the fsync phase at a checkpoint before replacing the
page.
*
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 463866fd26d..df0d9062170 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -28,7 +28,6 @@ import static
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMI
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SNAPSHOT_TAKEN;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -46,6 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
import
org.apache.ignite.internal.pagememory.configuration.CheckpointConfiguration;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -183,6 +184,8 @@ public class Checkpointer extends IgniteWorker {
private final LogSyncer logSyncer;
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/**
* Constructor.
*
@@ -196,6 +199,7 @@ public class Checkpointer extends IgniteWorker {
* @param pageSize Page size.
* @param checkpointConfig Checkpoint configuration.
* @param logSyncer Write-ahead log synchronizer.
+ * @param partitionDestructionLockManager Partition Destruction Lock
Manager.
*/
Checkpointer(
String igniteInstanceName,
@@ -208,7 +212,8 @@ public class Checkpointer extends IgniteWorker {
Compactor compactor,
int pageSize,
CheckpointConfiguration checkpointConfig,
- LogSyncer logSyncer
+ LogSyncer logSyncer,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
super(LOG, igniteInstanceName, "checkpoint-thread");
@@ -222,6 +227,7 @@ public class Checkpointer extends IgniteWorker {
this.failureManager = failureManager;
this.logSyncer = logSyncer;
this.partitionMetaManager = partitionMetaManager;
+ this.partitionDestructionLockManager = partitionDestructionLockManager;
scheduledCheckpointProgress = new
CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
@@ -612,27 +618,29 @@ public class Checkpointer extends IgniteWorker {
return;
}
- currentCheckpointProgress.blockPartitionDestruction(partitionId);
-
- try {
- fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore);
-
- fsyncFilePageStoreOnCheckpointThread(filePageStore);
+ Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(partitionId).readLock();
- renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
+ partitionDestructionLock.lock();
- // TODO: IGNITE-26315 Deal with partition deletion blocking on
checkpoint
+ try {
PartitionMeta meta = partitionMetaManager.getMeta(partitionId);
+ // If this happens, then the partition is destroyed.
if (meta == null) {
return;
}
+ fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore);
+
+ fsyncFilePageStoreOnCheckpointThread(filePageStore);
+
+ renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
+
filePageStore.checkpointedPageCount(meta.metaSnapshot(currentCheckpointProgress.id()).pageCount());
currentCheckpointProgress.syncedPagesCounter().addAndGet(pagesWritten.intValue());
} finally {
- currentCheckpointProgress.unblockPartitionDestruction(partitionId);
+ partitionDestructionLock.unlock();
}
}
@@ -929,30 +937,6 @@ public class Checkpointer extends IgniteWorker {
afterReleaseWriteLockCheckpointProgress = currentCheckpointProgress;
}
- /**
- * Prepares the checkpointer to destroy a partition.
- *
- * <p>If the checkpoint is in progress, then wait until it finishes
processing the partition that we are going to destroy, in order to
- * prevent the situation when we want to destroy the partition file along
with its delta files, and at this time the checkpoint performs
- * I/O operations on them.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @return Future that will end when the checkpoint is ready to destroy
the partition.
- */
- CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId
groupPartitionId) {
- CheckpointProgressImpl currentCheckpointProgress =
this.currentCheckpointProgress;
-
- // If the checkpoint starts after this line, then the data region will
already know that we want to destroy the partition, and when
- // reading the page for writing to the delta file, we will receive an
"outdated" page that we will not write to disk.
- if (currentCheckpointProgress == null ||
!currentCheckpointProgress.inProgress()) {
- return nullCompletedFuture();
- }
-
- CompletableFuture<Void> processedPartitionFuture =
currentCheckpointProgress.getUnblockPartitionDestructionFuture(groupPartitionId);
-
- return processedPartitionFuture == null ? nullCompletedFuture() :
processedPartitionFuture;
- }
-
private void replicatorLogSync(CheckpointMetricsTracker tracker) throws
IgniteInternalCheckedException {
try {
tracker.onReplicatorLogSyncStart();
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index e708f55bad6..773d7901d08 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -22,7 +22,6 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toCollection;
import static
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -34,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -41,7 +41,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
-import
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.WriteSpeedFormatter;
import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
@@ -84,15 +84,14 @@ public class Compactor extends IgniteWorker {
/** Thread local with buffers for the compaction threads. */
private static final ThreadLocal<ByteBuffer> THREAD_BUF = new
ThreadLocal<>();
- /** Partitions for which delta files are currently compacted. */
- private final PartitionProcessingCounterMap
partitionCompactionInProgressMap = new PartitionProcessingCounterMap();
-
/** Page size in bytes. */
private final int pageSize;
/** Failure processor. */
private final FailureManager failureManager;
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/**
* Creates new ignite worker with given parameters.
*
@@ -102,6 +101,7 @@ public class Compactor extends IgniteWorker {
* @param filePageStoreManager File page store manager.
* @param pageSize Page size in bytes.
* @param failureManager Failure processor that is used to handle critical
errors.
+ * @param partitionDestructionLockManager Partition Destruction Lock
Manager.
*/
public Compactor(
IgniteLogger log,
@@ -109,12 +109,14 @@ public class Compactor extends IgniteWorker {
int threads,
FilePageStoreManager filePageStoreManager,
int pageSize,
- FailureManager failureManager
+ FailureManager failureManager,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
super(log, igniteInstanceName, "compaction-thread");
this.filePageStoreManager = filePageStoreManager;
this.failureManager = failureManager;
+ this.partitionDestructionLockManager = partitionDestructionLockManager;
if (threads > 1) {
threadPoolExecutor = new ThreadPoolExecutor(
@@ -250,7 +252,9 @@ public class Compactor extends IgniteWorker {
GroupPartitionId groupPartitionId =
toMerge.groupPartitionFilePageStore.groupPartitionId();
-
partitionCompactionInProgressMap.incrementPartitionProcessingCounter(groupPartitionId);
+ Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(groupPartitionId).readLock();
+
+ partitionDestructionLock.lock();
try {
mergeDeltaFileToMainFile(
@@ -259,7 +263,7 @@ public class Compactor extends IgniteWorker {
tracker
);
} finally {
-
partitionCompactionInProgressMap.decrementPartitionProcessingCounter(groupPartitionId);
+ partitionDestructionLock.unlock();
}
}
} catch (Throwable ex) {
@@ -451,21 +455,6 @@ public class Compactor extends IgniteWorker {
assert removed : filePageStore.filePath();
}
- /**
- * Prepares the compactor to destroy a partition.
- *
- * <p>If the partition compaction is in progress, then we will wait until
it is completed so that there are no errors when we want to
- * destroy the partition file and its delta file, and at this time its
compaction occurs.
- *
- * @param groupPartitionId Pair of group ID with partition ID.
- * @return Future at the complete of which we can delete the partition
file and its delta files.
- */
- public CompletableFuture<Void> prepareToDestroyPartition(GroupPartitionId
groupPartitionId) {
- CompletableFuture<Void> partitionProcessingFuture =
partitionCompactionInProgressMap.getProcessedPartitionFuture(groupPartitionId);
-
- return partitionProcessingFuture == null ? nullCompletedFuture() :
partitionProcessingFuture;
- }
-
private static ByteBuffer getThreadLocalBuffer(int pageSize) {
ByteBuffer buffer = THREAD_BUF.get();
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
index 243ace83dbf..6ea44830d2c 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
@@ -21,9 +21,11 @@ import static
org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
@@ -55,6 +57,8 @@ public class DelayedDirtyPageWrite {
/** Replacing pages tracker, used to register & unregister pages being
written. */
private final DelayedPageReplacementTracker tracker;
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/** Full page id to be written on {@link #flushCopiedPageIfExists}, {@code
null} if nothing to write. */
private @Nullable DirtyFullPageId fullPageId;
@@ -73,18 +77,21 @@ public class DelayedDirtyPageWrite {
* @param byteBufThreadLoc Thread local buffers to use for pages copying.
* @param pageSize Page size in bytes.
* @param tracker Tracker to lock/unlock page reads.
+ * @param partitionDestructionLockManager Partition Destruction Lock
Manager.
*/
DelayedDirtyPageWrite(
WriteDirtyPage flushDirtyPage,
ThreadLocal<ByteBuffer> byteBufThreadLoc,
// TODO: IGNITE-17017 Move to common config
int pageSize,
- DelayedPageReplacementTracker tracker
+ DelayedPageReplacementTracker tracker,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
this.flushDirtyPage = flushDirtyPage;
this.pageSize = pageSize;
this.byteBufThreadLoc = byteBufThreadLoc;
this.tracker = tracker;
+ this.partitionDestructionLockManager = partitionDestructionLockManager;
}
/**
@@ -134,7 +141,9 @@ public class DelayedDirtyPageWrite {
Throwable errorOnWrite = null;
-
checkpointPages.blockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+ Lock partitionDestructionLock =
partitionDestructionLockManager.destructionLock(GroupPartitionId.convert(fullPageId)).readLock();
+
+ partitionDestructionLock.lock();
try {
flushDirtyPage.write(pageMemory, fullPageId,
byteBufThreadLoc.get());
@@ -143,7 +152,7 @@ public class DelayedDirtyPageWrite {
throw t;
} finally {
-
checkpointPages.unblockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+ partitionDestructionLock.unlock();
checkpointPages.unblockFsyncOnPageReplacement(fullPageId,
errorOnWrite);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
index d5a3c14d392..63e1dec3560 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.pagememory.FullPageId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
/**
@@ -55,6 +56,8 @@ public class DelayedPageReplacementTracker {
*/
private final Map<Long, DelayedDirtyPageWrite>
delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
+ private final PartitionDestructionLockManager
partitionDestructionLockManager;
+
/**
* Constructor.
*
@@ -62,17 +65,20 @@ public class DelayedPageReplacementTracker {
* @param flushDirtyPage Flush dirty page.
* @param log Logger.
* @param segmentCnt Segments count.
+ * @param partitionDestructionLockManager Partition Destruction Lock
Manager.
*/
public DelayedPageReplacementTracker(
// TODO: IGNITE-17017 Move to common config
int pageSize,
WriteDirtyPage flushDirtyPage,
IgniteLogger log,
- int segmentCnt
+ int segmentCnt,
+ PartitionDestructionLockManager partitionDestructionLockManager
) {
this.pageSize = pageSize;
this.flushDirtyPage = flushDirtyPage;
this.log = log;
+ this.partitionDestructionLockManager = partitionDestructionLockManager;
stripes = new Stripe[segmentCnt];
@@ -94,7 +100,7 @@ public class DelayedPageReplacementTracker {
*/
public DelayedDirtyPageWrite delayedPageWrite() {
return
delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
- id -> new DelayedDirtyPageWrite(flushDirtyPage,
byteBufThreadLoc, pageSize, this));
+ id -> new DelayedDirtyPageWrite(flushDirtyPage,
byteBufThreadLoc, pageSize, this, partitionDestructionLockManager));
}
/**
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
deleted file mode 100644
index af5bf41158a..00000000000
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMapTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.CompletableFuture;
-import org.junit.jupiter.api.Test;
-
-/**
- * For {@link PartitionProcessingCounterMap} testing.
- */
-public class PartitionProcessingCounterMapTest {
- @Test
- void test() {
- PartitionProcessingCounterMap processingCounterMap = new
PartitionProcessingCounterMap();
-
- GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
-
-
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture0 =
processingCounterMap.getProcessedPartitionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture0);
- assertFalse(processedPartitionFuture0.isDone());
-
-
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
-
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
-
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture0.isDone());
-
- // Let's check the reprocessing of the partition.
-
-
processingCounterMap.incrementPartitionProcessingCounter(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture1 =
processingCounterMap.getProcessedPartitionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture1);
- assertFalse(processedPartitionFuture1.isDone());
- assertNotSame(processedPartitionFuture0, processedPartitionFuture1);
-
-
processingCounterMap.decrementPartitionProcessingCounter(groupPartitionId);
-
-
assertNull(processingCounterMap.getProcessedPartitionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture1.isDone());
- }
-}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index b187a1d8950..ec4639c6641 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -63,6 +63,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
@@ -152,7 +153,8 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
pageWriter,
ioRegistry,
createPartitionMetaManager(Map.of(groupPartId0,
partitionMeta0, groupPartId1, partitionMeta1)),
- () -> false
+ () -> false,
+ new PartitionDestructionLockManager()
);
pagesWriter.run();
@@ -231,7 +233,8 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
createDirtyPageWriter(null),
ioRegistry,
createPartitionMetaManager(Map.of(groupPartId,
mock(PartitionMeta.class))),
- () -> false
+ () -> false,
+ new PartitionDestructionLockManager()
);
pagesWriter.run();
@@ -292,7 +295,8 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
createDirtyPageWriter(null),
ioRegistry,
createPartitionMetaManager(Map.of(groupPartId, partitionMeta)),
- () -> checkpointWritePageCount.get() > 0
+ () -> checkpointWritePageCount.get() > 0,
+ new PartitionDestructionLockManager()
);
pagesWriter.run();
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
index c1c96cd8d3c..8454c71e511 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
@@ -30,7 +30,6 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -39,7 +38,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.junit.jupiter.api.Test;
/**
@@ -367,50 +365,4 @@ public class CheckpointProgressImplTest {
assertNull(progressImpl.pagesToWrite());
}
-
- @Test
- void testProcessedPartition() {
- CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
-
- GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
-
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
-
- progressImpl.blockPartitionDestruction(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture0 =
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture0);
- assertFalse(processedPartitionFuture0.isDone());
-
- progressImpl.blockPartitionDestruction(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
- progressImpl.unblockPartitionDestruction(groupPartitionId);
-
- assertSame(processedPartitionFuture0,
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertFalse(processedPartitionFuture0.isDone());
-
- progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture0.isDone());
-
- // Let's check the reprocessing of the partition.
-
- progressImpl.blockPartitionDestruction(groupPartitionId);
-
- CompletableFuture<Void> processedPartitionFuture1 =
progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId);
-
- assertNotNull(processedPartitionFuture1);
- assertFalse(processedPartitionFuture1.isDone());
- assertNotSame(processedPartitionFuture0, processedPartitionFuture1);
-
- progressImpl.unblockPartitionDestruction(groupPartitionId);
-
-
assertNull(progressImpl.getUnblockPartitionDestructionFuture(groupPartitionId));
- assertTrue(processedPartitionFuture1.isDone());
- }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index f8c66d94c68..789ac8c17e4 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -20,11 +20,9 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static java.lang.System.nanoTime;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FACTORY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
@@ -73,6 +71,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
@@ -121,18 +120,21 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
void testStartAndStop() throws Exception {
PartitionMetaManager mockParititonMetaManager =
mock(PartitionMetaManager.class);
- Checkpointer checkpointer = new Checkpointer(
+ var partitionDestructionLockManager = new
PartitionDestructionLockManager();
+
+ var checkpointer = new Checkpointer(
"test",
null,
mock(FailureManager.class),
createCheckpointWorkflow(EMPTY),
- createCheckpointPagesWriterFactory(mockParititonMetaManager),
+ createCheckpointPagesWriterFactory(mockParititonMetaManager,
partitionDestructionLockManager),
mock(FilePageStoreManager.class),
mockParititonMetaManager,
mock(Compactor.class),
PAGE_SIZE,
checkpointConfig,
- mock(LogSyncer.class)
+ mock(LogSyncer.class),
+ partitionDestructionLockManager
);
assertNull(checkpointer.runner());
@@ -167,7 +169,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(Compactor.class),
PAGE_SIZE,
checkpointConfig,
- mock(LogSyncer.class)
+ mock(LogSyncer.class),
+ new PartitionDestructionLockManager()
));
assertNull(checkpointer.lastCheckpointProgress());
@@ -273,7 +276,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(Compactor.class),
PAGE_SIZE,
checkpointConfig,
- mock(LogSyncer.class)
+ mock(LogSyncer.class),
+ new PartitionDestructionLockManager()
);
CompletableFuture<?> waitCheckpointEventFuture =
runAsync(checkpointer::waitCheckpointEvent);
@@ -293,18 +297,21 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
void testCheckpointBody() throws Exception {
intervalMillis.set(100L);
- Checkpointer checkpointer = spy(new Checkpointer(
+ var partitionDestructionLockManager = new
PartitionDestructionLockManager();
+
+ var checkpointer = spy(new Checkpointer(
"test",
null,
mock(FailureManager.class),
createCheckpointWorkflow(EMPTY),
-
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
+
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class),
partitionDestructionLockManager),
mock(FilePageStoreManager.class),
mock(PartitionMetaManager.class),
mock(Compactor.class),
PAGE_SIZE,
checkpointConfig,
- mock(LogSyncer.class)
+ mock(LogSyncer.class),
+ partitionDestructionLockManager
));
checkpointer.scheduledProgress()
@@ -392,18 +399,21 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
when(mock.getMeta(any())).thenReturn(meta);
- Checkpointer checkpointer = spy(new Checkpointer(
+ var partitionDestructionLockManager = new
PartitionDestructionLockManager();
+
+ var checkpointer = spy(new Checkpointer(
"test",
null,
mock(FailureManager.class),
createCheckpointWorkflow(dirtyPages),
- createCheckpointPagesWriterFactory(partitionMetaManager),
+ createCheckpointPagesWriterFactory(partitionMetaManager,
partitionDestructionLockManager),
createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0),
filePageStore)),
mock,
compactor,
PAGE_SIZE,
checkpointConfig,
- mockLogSyncer
+ mockLogSyncer,
+ partitionDestructionLockManager
));
assertDoesNotThrow(checkpointer::doCheckpoint);
@@ -424,18 +434,24 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
Compactor compactor = mock(Compactor.class);
- Checkpointer checkpointer = spy(new Checkpointer(
+ var partitionDestructionLockManager = new
PartitionDestructionLockManager();
+
+ var checkpointer = spy(new Checkpointer(
"test",
null,
mock(FailureManager.class),
createCheckpointWorkflow(dirtyPages),
- createCheckpointPagesWriterFactory(new
PartitionMetaManager(ioRegistry, PAGE_SIZE, FACTORY)),
+ createCheckpointPagesWriterFactory(
+ new PartitionMetaManager(ioRegistry, PAGE_SIZE,
FACTORY),
+ partitionDestructionLockManager
+ ),
createFilePageStoreManager(Map.of()),
mock(PartitionMetaManager.class),
compactor,
PAGE_SIZE,
checkpointConfig,
- mock(LogSyncer.class)
+ mock(LogSyncer.class),
+ partitionDestructionLockManager
));
assertDoesNotThrow(checkpointer::doCheckpoint);
@@ -462,7 +478,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(Compactor.class),
PAGE_SIZE,
checkpointConfig,
- mock(LogSyncer.class)
+ mock(LogSyncer.class),
+ new PartitionDestructionLockManager()
);
// Checks case 0 deviation.
@@ -491,49 +508,6 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
);
}
- @Test
- void testPrepareToDestroyPartition() throws Exception {
- Checkpointer checkpointer = new Checkpointer(
- "test",
- null,
- mock(FailureManager.class),
- mock(CheckpointWorkflow.class),
- mock(CheckpointPagesWriterFactory.class),
- mock(FilePageStoreManager.class),
- mock(PartitionMetaManager.class),
- mock(Compactor.class),
- PAGE_SIZE,
- checkpointConfig,
- mock(LogSyncer.class)
- );
-
- GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
-
- // Everything should be fine as there is no current running checkpoint.
- checkpointer.prepareToDestroyPartition(groupPartitionId).get(1,
SECONDS);
-
- CheckpointProgressImpl checkpointProgress = (CheckpointProgressImpl)
checkpointer.scheduledProgress();
-
- checkpointer.startCheckpointProgress();
-
- checkpointer.prepareToDestroyPartition(groupPartitionId).get(1,
SECONDS);
-
- checkpointProgress.transitTo(LOCK_RELEASED);
- assertTrue(checkpointProgress.inProgress());
-
- // Everything should be fine so on a "working" checkpoint we don't
process the partition anyhow.
- checkpointer.prepareToDestroyPartition(groupPartitionId).get(1,
SECONDS);
-
- // Let's emulate that we are processing a partition and check that
everything will be fine after processing is completed.
- checkpointProgress.blockPartitionDestruction(groupPartitionId);
-
- CompletableFuture<?> onPartitionDestructionFuture =
checkpointer.prepareToDestroyPartition(groupPartitionId);
-
- checkpointProgress.unblockPartitionDestruction(groupPartitionId);
-
- onPartitionDestructionFuture.get(1, SECONDS);
- }
-
private static CheckpointDirtyPages dirtyPages(PersistentPageMemory
pageMemory, DirtyFullPageId... pageIds) {
return new
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory,
pageIds)));
}
@@ -573,12 +547,16 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
return mock;
}
- private CheckpointPagesWriterFactory
createCheckpointPagesWriterFactory(PartitionMetaManager partitionMetaManager) {
+ private CheckpointPagesWriterFactory createCheckpointPagesWriterFactory(
+ PartitionMetaManager partitionMetaManager,
+ PartitionDestructionLockManager partitionDestructionLockManager
+ ) {
return new CheckpointPagesWriterFactory(
mock(WriteDirtyPage.class),
ioRegistry,
partitionMetaManager,
- PAGE_SIZE
+ PAGE_SIZE,
+ partitionDestructionLockManager
);
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
index fd2917c41bf..1af89b6e7c8 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -61,14 +62,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
@Test
void testStartAndStop() throws Exception {
- var compactor = new Compactor(
- log,
- "test",
- 1,
- mock(FilePageStoreManager.class),
- PAGE_SIZE,
- mock(FailureManager.class)
- );
+ Compactor compactor = newCompactor();
assertNull(compactor.runner());
@@ -95,13 +89,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
@Test
void testMergeDeltaFileToMainFile() throws Throwable {
- Compactor compactor = new Compactor(
- log,
- "test",
- 1,
- mock(FilePageStoreManager.class),
- PAGE_SIZE,
- mock(FailureManager.class));
+ Compactor compactor = newCompactor();
FilePageStore filePageStore = mock(FilePageStore.class);
DeltaFilePageStoreIo deltaFilePageStoreIo =
mock(DeltaFilePageStoreIo.class);
@@ -147,13 +135,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
when(filePageStoreManager.allPageStores()).then(answer ->
groupPageStoresMap.getAll());
- Compactor compactor = spy(new Compactor(
- log,
- "test",
- 1,
- filePageStoreManager,
- PAGE_SIZE,
- mock(FailureManager.class)));
+ Compactor compactor = spy(newCompactor(filePageStoreManager));
doAnswer(answer -> {
assertSame(filePageStore, answer.getArgument(0));
@@ -179,13 +161,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
@Test
void testBody() throws Exception {
- Compactor compactor = spy(new Compactor(
- log,
- "test",
- 1,
- mock(FilePageStoreManager.class),
- PAGE_SIZE,
- mock(FailureManager.class)));
+ Compactor compactor = spy(newCompactor());
doNothing().when(compactor).waitDeltaFiles();
@@ -204,13 +180,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
@Test
void testWaitDeltaFiles() throws Exception {
- Compactor compactor = spy(new Compactor(
- log,
- "test",
- 1,
- mock(FilePageStoreManager.class),
- PAGE_SIZE,
- mock(FailureManager.class)));
+ Compactor compactor = spy(newCompactor());
CompletableFuture<?> waitDeltaFilesFuture =
runAsync(compactor::waitDeltaFiles);
@@ -223,13 +193,7 @@ public class CompactorTest extends BaseIgniteAbstractTest {
@Test
void testCancel() throws Exception {
- Compactor compactor = spy(new Compactor(
- log,
- "test",
- 1,
- mock(FilePageStoreManager.class),
- PAGE_SIZE,
- mock(FailureManager.class)));
+ Compactor compactor = spy(newCompactor());
assertFalse(compactor.isCancelled());
@@ -244,4 +208,20 @@ public class CompactorTest extends BaseIgniteAbstractTest {
waitDeltaFilesFuture.get(100, MILLISECONDS);
}
+
+ private Compactor newCompactor() {
+ return newCompactor(mock(FilePageStoreManager.class));
+ }
+
+ private Compactor newCompactor(FilePageStoreManager filePageStoreManager) {
+ return new Compactor(
+ log,
+ "test",
+ 1,
+ filePageStoreManager,
+ PAGE_SIZE,
+ mock(FailureManager.class),
+ new PartitionDestructionLockManager()
+ );
+ }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
index ba2a750a27f..13f8a2f0787 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
@@ -154,7 +154,8 @@ public abstract class AbstractPageReplacementTest extends
IgniteAbstractTest {
filePageStoreManager,
checkpointManager::writePageToFilePageStore,
checkpointManager.checkpointTimeoutLock(),
- new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)
+ new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+ checkpointManager.partitionDestructionLockManager()
);
dataRegionList.add(() -> pageMemory);
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
index 40fe9ad3ae9..ddd1b902e63 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
@@ -173,7 +173,8 @@ public class PageMemoryThrottlingTest extends
IgniteAbstractTest {
checkpointProgress.evictedPagesCounter().incrementAndGet();
},
checkpointManager.checkpointTimeoutLock(),
- new OffheapReadWriteLock(2)
+ new OffheapReadWriteLock(2),
+ checkpointManager.partitionDestructionLockManager()
);
pageStoreManager.start();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index fed1a143b38..45b54e3dc78 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -173,7 +173,8 @@ public class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPage
filePageStoreManager,
this::flushDirtyPageOnReplacement,
checkpointManager.checkpointTimeoutLock(),
- new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)
+ new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+ checkpointManager.partitionDestructionLockManager()
);
initThrottling(pageMemory);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 12d08279c3e..e9c50688e0b 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -108,6 +109,8 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
protected void finishDestruction() {
dataRegion.pageMemory().onGroupDestroyed(getTableId());
+
dataRegion.checkpointManager().partitionDestructionLockManager().removeLockForGroup(getTableId());
+
try {
dataRegion.filePageStoreManager().destroyGroupIfExists(getTableId());
} catch (IOException e) {
@@ -364,11 +367,20 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
private CompletableFuture<Void>
destroyPartitionPhysically(GroupPartitionId groupPartitionId) {
dataRegion.filePageStoreManager().getStore(groupPartitionId).markToDestroy();
- dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(),
groupPartitionId.getPartitionId());
+ Lock partitionDestructionLock =
dataRegion.checkpointManager().partitionDestructionLockManager().destructionLock(groupPartitionId)
+ .writeLock();
+
+ partitionDestructionLock.lock();
+
+ try {
+ dataRegion.pageMemory().invalidate(groupPartitionId.getGroupId(),
groupPartitionId.getPartitionId());
- return
dataRegion.checkpointManager().onPartitionDestruction(groupPartitionId)
- .thenAccept(unused ->
dataRegion.partitionMetaManager().removeMeta(groupPartitionId))
- .thenCompose(unused ->
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId));
+ dataRegion.partitionMetaManager().removeMeta(groupPartitionId);
+
+ return
dataRegion.filePageStoreManager().destroyPartition(groupPartitionId);
+ } finally {
+ partitionDestructionLock.unlock();
+ }
}
private GroupPartitionId createGroupPartitionId(int partitionId) {
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
index 1520688a02e..e90e71da7fb 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
@@ -70,6 +70,7 @@ import
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionC
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -448,7 +449,7 @@ public class PersistentPageMemoryNoLoadTest extends
AbstractPageMemoryNoLoadSelf
}
}
- protected PersistentPageMemory createPageMemory(
+ private PersistentPageMemory createPageMemory(
long[] segmentSizes,
long checkpointBufferSize,
@Nullable FilePageStoreManager filePageStoreManager,
@@ -464,7 +465,8 @@ public class PersistentPageMemoryNoLoadTest extends
AbstractPageMemoryNoLoadSelf
filePageStoreManager == null ? new TestPageReadWriteManager()
: filePageStoreManager,
flushDirtyPageForReplacement,
checkpointManager == null ? mockCheckpointTimeoutLock(true) :
checkpointManager.checkpointTimeoutLock(),
- new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)
+ new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL),
+ checkpointManager == null ? new
PartitionDestructionLockManager() :
checkpointManager.partitionDestructionLockManager()
);
}