This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 65a02ca IGNITE-13912 Fixed calculation of the number of segments to be deleted - Fixes #8650. 65a02ca is described below commit 65a02ca6750e891178495e1cc4d36877dd49ccdc Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Jan 19 15:44:29 2021 +0300 IGNITE-13912 Fixed calculation of the number of segments to be deleted - Fixes #8650. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../IgniteCacheDatabaseSharedManager.java | 2 +- .../persistence/checkpoint/CheckpointHistory.java | 4 +- .../persistence/wal/FileWriteAheadLogManager.java | 21 +++---- .../db/wal/WalDeletionArchiveAbstractTest.java | 69 ++++++++++++++++++++++ 4 files changed, 80 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 821d3a3..06df66a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -1006,7 +1006,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * @return Last checkpoint mark WAL pointer. */ - public WALPointer lastCheckpointMarkWalPointer() { + @Nullable public WALPointer lastCheckpointMarkWalPointer() { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java index 87be2eb..2720072 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java @@ -152,9 +152,9 @@ public class CheckpointHistory { } /** - * @return Last checkpoint entry if exists. Otherwise {@code null}. + * @return Last checkpoint entry if exists. */ - public CheckpointEntry lastCheckpoint() { + @Nullable public CheckpointEntry lastCheckpoint() { Map.Entry<Long, CheckpointEntry> entry = histMap.lastEntry(); return entry != null ? entry.getValue() : null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 4a1d968..a7eee3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -216,14 +216,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, DFLT_WAL_MMAP); /** - * Percentage of archive size for checkpoint trigger. Need for calculate max size of WAL after last checkpoint. - * Checkpoint should be triggered when max size of WAL after last checkpoint more than maxWallArchiveSize * thisValue - */ - private static final double CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE = - IgniteSystemProperties.getDouble(IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, - DFLT_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE); - - /** * Percentage of WAL archive size to calculate threshold since which removing of old archive should be started. */ private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = @@ -422,9 +414,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fileHandleManagerFactory = new FileHandleManagerFactory(dsCfg); - maxSegCountWithoutCheckpoint = - (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE) - / dsCfg.getWalSegmentSize()); + double cpTriggerArchiveSizePercentage = IgniteSystemProperties.getDouble( + IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, DFLT_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE); + + maxSegCountWithoutCheckpoint = (long)((U.adjustedWalHistorySize(dsCfg, log) * cpTriggerArchiveSizePercentage) + / dsCfg.getWalSegmentSize()); switchSegmentRecordOffset = isArchiverEnabled() ? new AtomicLongArray(dsCfg.getWalSegments()) : null; } @@ -3182,6 +3176,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileDescriptor high = null; long size = 0; + long totalSize = totalSize(walArchiveFiles); for (FileDescriptor fileDesc : walArchiveFiles) { if (fileDesc.idx >= lastCheckpointPtr.index() || segmentAware.reserved(fileDesc.idx)) @@ -3190,7 +3185,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl high = fileDesc; // Ensure that there will be exactly removed at least one segment. - if ((size += fileDesc.file.length()) > allowedThresholdWalArchiveSize) + if (totalSize - (size += fileDesc.file.length()) < allowedThresholdWalArchiveSize) break; } } @@ -3200,7 +3195,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isInfoEnabled()) { log.info("Starting to clean WAL archive [highIdx=" + highPtr.index() - + ", currSize=" + U.humanReadableByteCount(totalSize(walArchiveFiles)) + + ", currSize=" + U.humanReadableByteCount(totalSize) + ", maxSize=" + U.humanReadableByteCount(dsCfg.getMaxWalArchiveSize()) + ']'); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java index e66abe3..479021c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.ignite.Ignite; @@ -37,11 +40,16 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; +import static org.apache.ignite.internal.util.IgniteUtils.GB; +import static org.apache.ignite.internal.util.IgniteUtils.KB; +import static org.apache.ignite.internal.util.IgniteUtils.MB; import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -276,6 +284,57 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT } /** + * Checks that the deletion of WAL segments occurs with the maximum number of segments. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, value = "1000") + public void testSingleCleanWalArchive() throws Exception { + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0)) + .setCacheConfiguration(cacheConfiguration()) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setCheckpointFrequency(Long.MAX_VALUE) + .setMaxWalArchiveSize(5 * MB) + .setWalSegmentSize((int)MB) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(GB) + .setCheckpointPageBufferSize(GB) + ) + ); + + ListeningTestLogger listeningLog = new ListeningTestLogger(cfg.getGridLogger()); + cfg.setGridLogger(listeningLog); + + IgniteEx n = startGrid(cfg); + + n.cluster().state(ClusterState.ACTIVE); + awaitPartitionMapExchange(); + + for (int i = 0; walArchiveSize(n) < 20L * cfg.getDataStorageConfiguration().getWalSegmentSize(); ) + n.cache(DEFAULT_CACHE_NAME).put(i++, new byte[(int)(512 * KB)]); + + assertEquals(-1, wal(n).lastTruncatedSegment()); + assertEquals(0, gridDatabase(n).lastCheckpointMarkWalPointer().index()); + + Collection<String> logStrs = new ConcurrentLinkedQueue<>(); + listeningLog.registerListener(logStr -> { + if (logStr.contains("Finish clean WAL archive")) + logStrs.add(logStr); + }); + + forceCheckpoint(); + + long maxWalArchiveSize = cfg.getDataStorageConfiguration().getMaxWalArchiveSize(); + assertTrue(waitForCondition(() -> walArchiveSize(n) < maxWalArchiveSize, getTestTimeout())); + + assertEquals(logStrs.toString(), 1, logStrs.size()); + } + + /** * Extract GridCacheDatabaseSharedManager. */ private GridCacheDatabaseSharedManager gridDatabase(Ignite ignite) { @@ -288,4 +347,14 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT private FileWriteAheadLogManager wal(Ignite ignite) { return (FileWriteAheadLogManager)((IgniteEx)ignite).context().cache().context().wal(); } + + /** + * Calculate current WAL archive size. + * + * @param n Node. + * @return Total WAL archive size. + */ + private long walArchiveSize(Ignite n) { + return Arrays.stream(wal(n).walArchiveFiles()).mapToLong(fd -> fd.file().length()).sum(); + } }