This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new d8ffc2b IGNITE-13681 Lightweight checkpoint implementation, per data region checkpoint listeners. - Fixes #8433. d8ffc2b is described below commit d8ffc2b2991e262b65dfb9ee32fd818852cd5aaf Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Wed Nov 11 15:12:20 2020 +0300 IGNITE-13681 Lightweight checkpoint implementation, per data region checkpoint listeners. - Fixes #8433. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../processors/cache/mvcc/txlog/TxLog.java | 9 +- .../GridCacheDatabaseSharedManager.java | 32 ++- .../cache/persistence/GridCacheOffheapManager.java | 2 +- .../IgniteCacheDatabaseSharedManager.java | 4 +- .../persistence/checkpoint/CheckpointManager.java | 7 +- .../checkpoint/CheckpointPagesWriter.java | 21 +- .../checkpoint/CheckpointPagesWriterFactory.java | 26 ++- .../persistence/checkpoint/CheckpointWorkflow.java | 92 +++++--- .../cache/persistence/checkpoint/Checkpointer.java | 14 +- ...ager.java => LightweightCheckpointManager.java} | 134 +++--------- .../cache/persistence/metastorage/MetaStorage.java | 4 +- .../CheckpointListenerForRegionTest.java | 191 +++++++++++++++++ .../db/checkpoint/CheckpointStartLoggingTest.java | 2 +- .../db/checkpoint/LightweightCheckpointTest.java | 231 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 5 +- 15 files changed, 599 insertions(+), 175 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java index 8adb6b3..9b8c73b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import 
org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; @@ -102,6 +103,8 @@ public class TxLog implements CheckpointListener { PageLockListener txLogLockLsnr = diagnosticMgr.pageLockTracker().createPageLockTracker(txLogName); + DataRegion txLogDataRegion = mgr.dataRegion(TX_LOG_CACHE_NAME); + if (CU.isPersistenceEnabled(ctx.config())) { String txLogReuseListName = TX_LOG_CACHE_NAME + "##ReuseList"; PageLockListener txLogReuseListLockLsnr = diagnosticMgr.pageLockTracker().createPageLockTracker(txLogReuseListName); @@ -110,7 +113,7 @@ public class TxLog implements CheckpointListener { try { IgniteWriteAheadLogManager wal = ctx.cache().context().wal(); - PageMemoryEx pageMemory = (PageMemoryEx)mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory(); + PageMemoryEx pageMemory = (PageMemoryEx)txLogDataRegion.pageMemory(); long metaId = pageMemory.metaPageId(TX_LOG_CACHE_ID); long metaPage = pageMemory.acquirePage(TX_LOG_CACHE_ID, metaId); @@ -195,14 +198,14 @@ public class TxLog implements CheckpointListener { txLogLockLsnr ); - ((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this); + ((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this, txLogDataRegion); } finally { mgr.checkpointReadUnlock(); } } else { - PageMemory pageMemory = mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory(); + PageMemory pageMemory = txLogDataRegion.pageMemory(); ReuseList reuseList1 = mgr.reuseList(TX_LOG_CACHE_NAME); long treeRoot; diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 37bd464..6a0ecb2 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -52,7 +52,6 @@ import java.util.function.Predicate; import java.util.function.ToLongFunction; import java.util.regex.Pattern; import java.util.stream.Collectors; - import org.apache.ignite.DataRegionMetricsProvider; import org.apache.ignite.DataStorageMetrics; import org.apache.ignite.IgniteCheckedException; @@ -136,6 +135,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDat import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridCountDownCallback; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.StripedExecutor; @@ -317,6 +317,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Lock for releasing history for preloading. */ private ReentrantLock releaseHistForPreloadingLock = new ReentrantLock(); + /** Data regions which should be checkpointed. */ + protected final Set<DataRegion> checkpointedDataRegions = new GridConcurrentHashSet<>(); + /** * @param ctx Kernal context. 
*/ @@ -471,7 +474,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan persistenceCfg, storeMgr, this::isCheckpointInapplicableForWalRebalance, - this::dataRegions, + this::checkpointedDataRegions, this::cacheGroupContexts, this::getPageMemoryForCacheGroup, resolveThrottlingPolicy(), @@ -494,6 +497,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** */ + public Collection<DataRegion> checkpointedDataRegions() { + return checkpointedDataRegions; + } + + /** */ private Collection<CacheGroupContext> cacheGroupContexts() { return cctx.cache().cacheGroups(); } @@ -594,6 +602,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan fileLockHolder.close(); } + /** {@inheritDoc} */ + @Override public DataRegion addDataRegion(DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg, + boolean trackable) throws IgniteCheckedException { + DataRegion region = super.addDataRegion(dataStorageCfg, dataRegionCfg, trackable); + + checkpointedDataRegions.add(region); + + return region; + } + /** */ private void readMetastore() throws IgniteCheckedException { try { @@ -1559,9 +1577,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param lsnr Listener. + * @param dataRegion Data region for which listener is corresponded to. + */ + public void addCheckpointListener(CheckpointListener lsnr, DataRegion dataRegion) { + checkpointManager.addCheckpointListener(lsnr, dataRegion); + } + + /** + * @param lsnr Listener. 
*/ public void addCheckpointListener(CheckpointListener lsnr) { - checkpointManager.addCheckpointListener(lsnr); + checkpointManager.addCheckpointListener(lsnr, null); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index fa4fabe..4bfda53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -225,7 +225,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple persStoreMetrics = databaseSharedManager.persistentStoreMetricsImpl(); - databaseSharedManager.addCheckpointListener(this); + databaseSharedManager.addCheckpointListener(this, grp.dataRegion()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 1d77566..42ad2a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -413,7 +413,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * @param dataRegionCfg Data region config. * @throws IgniteCheckedException If failed to initialize swap path. 
*/ - public void addDataRegion( + public DataRegion addDataRegion( DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg, boolean trackable @@ -441,6 +441,8 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap else if (dataRegionName.equals(DFLT_DATA_REG_DEFAULT_NAME)) U.warn(log, "Data Region with name 'default' isn't used as a default. " + "Please, check Data Region configuration."); + + return region; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java index 415e2cf..2beac7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java @@ -170,7 +170,7 @@ public class CheckpointManager { checkpointPagesWriterFactory = new CheckpointPagesWriterFactory( logger, snapshotMgr, - (fullPage, buf, tag) -> pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, true), + (pageMemEx, fullPage, buf, tag) -> pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, true), persStoreMetrics, throttlingPolicy, threadBuf, pageMemoryGroupResolver @@ -231,9 +231,10 @@ public class CheckpointManager { /** * @param lsnr Listener. + * @param dataRegion Data region for which listener is corresponded to. 
*/ - public void addCheckpointListener(CheckpointListener lsnr) { - checkpointWorkflow.addCheckpointListener(lsnr); + public void addCheckpointListener(CheckpointListener lsnr, DataRegion dataRegion) { + checkpointWorkflow.addCheckpointListener(lsnr, dataRegion); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java index 77f9e2e..79f774c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java @@ -178,7 +178,7 @@ public class CheckpointPagesWriter implements Runnable { CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null; - PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry); + Map<PageMemoryEx, PageStoreWriter> pageStoreWriters = new HashMap<>(); ByteBuffer tmpWriteBuf = threadBuf.get(); @@ -201,6 +201,8 @@ public class CheckpointPagesWriter implements Runnable { tmpWriteBuf.rewind(); + PageStoreWriter pageStoreWriter = pageStoreWriters.computeIfAbsent(pageMem, pageMemEx -> createPageStoreWriter(pageMemEx, pagesToRetry)); + pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker); if (throttlingEnabled) { @@ -227,18 +229,20 @@ public class CheckpointPagesWriter implements Runnable { /** * Factory method for create {@link PageStoreWriter}. * + * @param pageMemEx * @param pagesToRetry List pages for retry. * @return Checkpoint page write context. 
*/ - private PageStoreWriter createPageStoreWriter(Map<PageMemoryEx, List<FullPageId>> pagesToRetry) { + private PageStoreWriter createPageStoreWriter( + PageMemoryEx pageMemEx, + Map<PageMemoryEx, List<FullPageId>> pagesToRetry + ) { return new PageStoreWriter() { /** {@inheritDoc} */ @Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException { if (tag == PageMemoryImpl.TRY_AGAIN_TAG) { - PageMemoryEx pageMem = pageMemoryGroupResolver.apply(fullPageId.groupId()); - - pagesToRetry.computeIfAbsent(pageMem, k -> new ArrayList<>()).add(fullPageId); + pagesToRetry.computeIfAbsent(pageMemEx, k -> new ArrayList<>()).add(fullPageId); return; } @@ -258,7 +262,7 @@ public class CheckpointPagesWriter implements Runnable { curCpProgress.updateWrittenPages(1); - PageStore store = pageWriter.write(fullPageId, buf, tag); + PageStore store = pageWriter.write(pageMemEx, fullPageId, buf, tag); updStores.computeIfAbsent(store, k -> new LongAdder()).increment(); } @@ -268,12 +272,15 @@ public class CheckpointPagesWriter implements Runnable { /** Interface which allows to write one page to page store. */ public interface CheckpointPageWriter { /** + * + * @param pageMemEx Page memory from which page should be written. * @param fullPageId Full page id. * @param buf Byte buffer. * @param tag Page tag. * @return {@link PageStore} which was used to write. * @throws IgniteCheckedException if fail. 
*/ - PageStore write(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException; + PageStore write(PageMemoryEx pageMemEx, FullPageId fullPageId, ByteBuffer buf, int tag) + throws IgniteCheckedException; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java index eb3607b..8c882e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -144,22 +146,13 @@ public class CheckpointPagesWriterFactory { AtomicInteger cpPagesCnt ) { return () -> { - PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> { - assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId; - - // Write buf to page store. - PageStore store = checkpointPageWriter.write(fullPageId, buf, tag); - - // Save store for future fsync. - updStores.add(store); - }; - GridConcurrentMultiPairQueue.Result<PageMemoryEx, FullPageId> res = new GridConcurrentMultiPairQueue.Result<>(); int pagesWritten = 0; ByteBuffer tmpWriteBuf = threadBuf.get(); + Map<PageMemoryEx, PageStoreWriter> pageStoreWriters = new HashMap<>(); try { while (pages.next(res)) { // Fail-fast break if some exception occurred. 
@@ -168,6 +161,19 @@ public class CheckpointPagesWriterFactory { PageMemoryEx pageMem = res.getKey(); + PageStoreWriter pageStoreWriter = pageStoreWriters.computeIfAbsent( + pageMem, + (pageMemEx) -> (fullPageId, buf, tag) -> { + assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId; + + // Write buf to page store. + PageStore store = checkpointPageWriter.write(pageMemEx, fullPageId, buf, tag); + + // Save store for future fsync. + updStores.add(store); + } + ); + // Write page content to page store via pageStoreWriter. // Tracker is null, because no need to track checkpoint metrics on recovery. pageMem.checkpointWritePage(res.getValue(), tmpWriteBuf, pageStoreWriter, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java index 6a97a18..93b7fed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; @@ -39,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; @@ -74,7 +74,9 @@ import org.apache.ignite.internal.util.typedef.internal.U; 
import org.apache.ignite.internal.util.worker.WorkProgressDispatcher; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.thread.IgniteThreadPoolExecutor; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; @@ -106,6 +108,9 @@ public class CheckpointWorkflow { /** @see IgniteSystemProperties#CHECKPOINT_PARALLEL_SORT_THRESHOLD */ public static final int DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD = 512 * 1024; + /****/ + private static final DataRegion NO_REGION = new DataRegion(null, null, null, null); + /** * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link * Arrays#parallelSort(Comparable[])} in case of {@link CheckpointWriteOrder#SEQUENTIAL}. @@ -144,7 +149,7 @@ public class CheckpointWorkflow { private final CheckpointWriteOrder checkpointWriteOrder; /** Collections of checkpoint listeners. */ - private final Collection<CheckpointListener> lsnrs = new CopyOnWriteArrayList<>(); + private final Map<CheckpointListener, DataRegion> lsnrs = new ConcurrentLinkedHashMap<>(); /** Ignite instance name. */ private final String igniteInstanceName; @@ -228,7 +233,9 @@ public class CheckpointWorkflow { CheckpointMetricsTracker tracker, WorkProgressDispatcher workProgressDispatcher ) throws IgniteCheckedException { - List<CheckpointListener> dbLsnrs = new ArrayList<>(lsnrs); + Collection<DataRegion> checkpointedRegions = dataRegions.get(); + + List<CheckpointListener> dbLsnrs = getRelevantCheckpointListeners(checkpointedRegions); CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr); @@ -283,7 +290,7 @@ public class CheckpointWorkflow { fillCacheGroupState(cpRec); //There are allowable to replace pages only after checkpoint entry was stored to disk. 
- cpPagesHolder = beginAllCheckpoints(dataRegions.get(), curr.futureFor(MARKER_STORED_TO_DISK)); + cpPagesHolder = beginAllCheckpoints(checkpointedRegions, curr.futureFor(MARKER_STORED_TO_DISK)); curr.currentCheckpointPagesCount(cpPagesHolder.pagesNum()); @@ -293,7 +300,8 @@ public class CheckpointWorkflow { if (dirtyPagesCount > 0 || curr.nextSnapshot() || hasPartitionsToDestroy) { // No page updates for this checkpoint are allowed from now on. - cpPtr = wal.log(cpRec); + if (wal != null) + cpPtr = wal.log(cpRec); if (cpPtr == null) cpPtr = CheckpointStatus.NULL_PTR; @@ -326,18 +334,22 @@ public class CheckpointWorkflow { tracker.onWalCpRecordFsyncStart(); // Sync log outside the checkpoint write lock. - wal.flush(cpPtr, true); + if (wal != null) + wal.flush(cpPtr, true); tracker.onWalCpRecordFsyncEnd(); - CheckpointEntry checkpointEntry = checkpointMarkersStorage.writeCheckpointEntry( - cpTs, - cpRec.checkpointId(), - cpPtr, - cpRec, - CheckpointEntryType.START, - skipSync - ); + CheckpointEntry checkpointEntry = null; + + if (checkpointMarkersStorage != null) + checkpointEntry = checkpointMarkersStorage.writeCheckpointEntry( + cpTs, + cpRec.checkpointId(), + cpPtr, + cpRec, + CheckpointEntryType.START, + skipSync + ); curr.transitTo(MARKER_STORED_TO_DISK); @@ -351,7 +363,7 @@ public class CheckpointWorkflow { return new Checkpoint(checkpointEntry, cpPages, curr); } else { - if (curr.nextSnapshot()) + if (curr.nextSnapshot() && wal != null) wal.flush(null, true); return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, curr); @@ -563,23 +575,28 @@ public class CheckpointWorkflow { } if (chp.hasDelta()) { - checkpointMarkersStorage.writeCheckpointEntry( - chp.cpEntry.timestamp(), - chp.cpEntry.checkpointId(), - chp.cpEntry.checkpointMark(), - null, - CheckpointEntryType.END, - skipSync - ); - - wal.notchLastCheckpointPtr(chp.cpEntry.checkpointMark()); + if (checkpointMarkersStorage != null) + checkpointMarkersStorage.writeCheckpointEntry( + 
chp.cpEntry.timestamp(), + chp.cpEntry.checkpointId(), + chp.cpEntry.checkpointMark(), + null, + CheckpointEntryType.END, + skipSync + ); + + if (wal != null) + wal.notchLastCheckpointPtr(chp.cpEntry.checkpointMark()); } - checkpointMarkersStorage.onCheckpointFinished(chp); + if (checkpointMarkersStorage != null) + checkpointMarkersStorage.onCheckpointFinished(chp); CheckpointContextImpl emptyCtx = new CheckpointContextImpl(chp.progress, null, null, null); - List<CheckpointListener> dbLsnrs = new ArrayList<>(lsnrs); + Collection<DataRegion> checkpointedRegions = dataRegions.get(); + + List<CheckpointListener> dbLsnrs = getRelevantCheckpointListeners(checkpointedRegions); for (CheckpointListener lsnr : dbLsnrs) lsnr.afterCheckpointEnd(emptyCtx); @@ -588,6 +605,17 @@ public class CheckpointWorkflow { } /** + * @param checkpointedRegions Regions which will be checkpointed. + * @return Checkpoint listeners which should be handled. + */ + @NotNull private List<CheckpointListener> getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) { + return lsnrs.entrySet().stream() + .filter(entry -> entry.getValue() == NO_REGION || checkpointedRegions.contains(entry.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** * This method makes sense if node was stopped during the checkpoint(Start marker was written to disk while end * marker are not). It is able to write all pages to disk and create end marker. * @@ -687,10 +715,13 @@ public class CheckpointWorkflow { } /** + * Adding the listener which will be called only when given data region will be checkpointed. + * * @param lsnr Listener. + * @param dataRegion Data region for which listener is corresponded to. */ - public void addCheckpointListener(CheckpointListener lsnr) { - lsnrs.add(lsnr); + public void addCheckpointListener(CheckpointListener lsnr, DataRegion dataRegion) { + lsnrs.put(lsnr, dataRegion == null ? 
NO_REGION : dataRegion); } /** @@ -720,7 +751,8 @@ public class CheckpointWorkflow { checkpointCollectPagesInfoPool = null; } - lsnrs.clear(); + for (CheckpointListener lsnr : lsnrs.keySet()) + lsnrs.remove(lsnr); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index c0368f4..9e39dfe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -106,7 +106,8 @@ public class Checkpointer extends GridWorker { "walCpRecordFsyncDuration=%dms, " + "writeCheckpointEntryDuration=%dms, " + "splitAndSortCpPagesDuration=%dms, " + - "%s pages=%d, " + + "%s" + + "pages=%d, " + "reason='%s']"; /** Skip sync. */ @@ -408,8 +409,8 @@ public class Checkpointer extends GridWorker { log.info( String.format( CHECKPOINT_STARTED_LOG_FORMAT, - chp.cpEntry.checkpointId(), - chp.cpEntry.checkpointMark(), + chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(), + chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(), tracker.beforeLockDuration(), tracker.lockWaitDuration(), tracker.listenersExecuteDuration(), @@ -417,7 +418,7 @@ public class Checkpointer extends GridWorker { tracker.walCpRecordFsyncDuration(), tracker.writeCheckpointEntryDuration(), tracker.splitAndSortCpPagesDuration(), - possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "", + possibleJvmPauseDur > 0 ? 
"possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "", chp.pagesSize, chp.progress.reason() ) @@ -455,7 +456,7 @@ public class Checkpointer extends GridWorker { if (chp.hasDelta() || destroyedPartitionsCnt > 0) { if (log.isInfoEnabled()) { - String walSegsCoveredMsg = prepareWalSegsCoveredMsg(chp.walSegsCoveredRange); + String walSegsCoveredMsg = chp.walSegsCoveredRange == null ? "" : prepareWalSegsCoveredMsg(chp.walSegsCoveredRange); log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + "walSegmentsCleared=%d, walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " + @@ -852,6 +853,9 @@ public class Checkpointer extends GridWorker { * Restart worker in IgniteThread. */ public void start() { + if (runner() != null) + return; + assert runner() == null : "Checkpointer is running."; new IgniteThread(this).start(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java similarity index 70% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java index 415e2cf..7a329f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; -import java.io.File; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Collection; +import java.util.Collections; import java.util.UUID; import java.util.function.Function; 
import java.util.function.Supplier; @@ -30,12 +30,10 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.LongJVMPauseDetector; -import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; @@ -43,7 +41,6 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCa import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.util.StripedExecutor; -import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate; import org.apache.ignite.internal.util.lang.IgniteThrowableFunction; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.lang.IgniteInClosure; @@ -52,32 +49,25 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT; /** - * Main class to abstract checkpoint-related processes and actions and hide them from higher-level components. - * Implements default checkpointing algorithm which is sharp checkpoint but can be replaced - * by other implementations if needed. 
- * Represents only an intermediate step in refactoring of checkpointing component and may change in the future. + * Like a sharp checkpoint algorithm implemented in {@link CheckpointManager} this checkpoint ensures that + * all pages marked dirty under {@link #checkpointTimeoutLock()} will be consistently saved to disk. * - * This checkpoint ensures that all pages marked as - * dirty under {@link #checkpointTimeoutLock ()} will be consistently saved to disk. + * But unlike {@link CheckpointManager} lightweight checkpoint doesn't store any checkpoint markers to disk + * nor write cp-related records to WAL log. * - * Configuration of this checkpoint allows the following: - * <p>Collecting all pages from configured dataRegions which was marked as dirty under {@link #checkpointTimeoutLock - * ()}.</p> * - * <p>Marking the start of checkpoint in WAL and on disk.</p> - * <p>Notifying the subscribers of different checkpoint states through {@link CheckpointListener}.</p> * - * <p>Synchronizing collected pages with disk using {@link FilePageStoreManager}.</p> - * <p>Restoring memory in consistent state if the node failed in the middle of checkpoint.</p> + * This allows to use it in situations where no recovery is needed after crush in the middle of checkpoint + * but work can simply be replayed from the beginning. + * + * Such situations include defragmentation and node recovery after crush + * (regular sharp checkpoint cannot be used during recovery). */ -public class CheckpointManager { +public class LightweightCheckpointManager { /** Checkpoint worker. */ private volatile Checkpointer checkpointer; /** Main checkpoint steps. */ private final CheckpointWorkflow checkpointWorkflow; - /** Checkpoint markers storage which mark the start and end of each checkpoint. */ - private final CheckpointMarkersStorage checkpointMarkersStorage; - /** Timeout checkpoint lock which should be used while write to memory happened. 
*/ final CheckpointTimeoutLock checkpointTimeoutLock; @@ -91,13 +81,9 @@ public class CheckpointManager { * @param logger Logger producer. * @param igniteInstanceName Ignite instance name. * @param checkpointThreadName Name of main checkpoint thread. - * @param wal Write ahead log manager. * @param workersRegistry Workers registry. * @param persistenceCfg Persistence configuration. - * @param pageStoreManager File page store manager. - * @param checkpointInapplicableChecker Checker of checkpoints. * @param dataRegions Data regions. - * @param cacheGroupContexts Cache group contexts. * @param pageMemoryGroupResolver Page memory resolver. * @param throttlingPolicy Throttling policy. * @param snapshotMgr Snapshot manager. @@ -107,52 +93,33 @@ public class CheckpointManager { * @param cacheProcessor Cache processor. * @throws IgniteCheckedException if fail. */ - public CheckpointManager( + public LightweightCheckpointManager( Function<Class<?>, IgniteLogger> logger, String igniteInstanceName, String checkpointThreadName, - IgniteWriteAheadLogManager wal, WorkersRegistry workersRegistry, DataStorageConfiguration persistenceCfg, - FilePageStoreManager pageStoreManager, - IgniteThrowableBiPredicate<Long, Integer> checkpointInapplicableChecker, Supplier<Collection<DataRegion>> dataRegions, - Supplier<Collection<CacheGroupContext>> cacheGroupContexts, IgniteThrowableFunction<Integer, PageMemoryEx> pageMemoryGroupResolver, PageMemoryImpl.ThrottlingPolicy throttlingPolicy, IgniteCacheSnapshotManager snapshotMgr, DataStorageMetricsImpl persStoreMetrics, LongJVMPauseDetector longJvmPauseDetector, FailureProcessor failureProcessor, - GridCacheProcessor cacheProcessor + GridCacheProcessor cacheProcessor, + FilePageStoreManager pageStoreManager ) throws IgniteCheckedException { - CheckpointHistory cpHistory = new CheckpointHistory( - persistenceCfg, - logger, - wal, - checkpointInapplicableChecker - ); - - FileIOFactory ioFactory = persistenceCfg.getFileIOFactory(); - - 
checkpointMarkersStorage = new CheckpointMarkersStorage( - logger, - cpHistory, - ioFactory, - pageStoreManager.workDir().getAbsolutePath() - ); - CheckpointReadWriteLock lock = new CheckpointReadWriteLock(logger); checkpointWorkflow = new CheckpointWorkflow( logger, - wal, + null, snapshotMgr, - checkpointMarkersStorage, + null, lock, persistenceCfg.getCheckpointWriteOrder(), dataRegions, - cacheGroupContexts, + Collections::emptyList, persistenceCfg.getCheckpointThreads(), igniteInstanceName ); @@ -169,10 +136,13 @@ public class CheckpointManager { }; checkpointPagesWriterFactory = new CheckpointPagesWriterFactory( - logger, snapshotMgr, - (fullPage, buf, tag) -> pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, true), + logger, + snapshotMgr, + (pageMemEx, fullPage, buf, tag) -> + pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, true), persStoreMetrics, - throttlingPolicy, threadBuf, + throttlingPolicy, + threadBuf, pageMemoryGroupResolver ); @@ -231,9 +201,10 @@ public class CheckpointManager { /** * @param lsnr Listener. + * @param dataRegion */ - public void addCheckpointListener(CheckpointListener lsnr) { - checkpointWorkflow.addCheckpointListener(lsnr); + public void addCheckpointListener(CheckpointListener lsnr, DataRegion dataRegion) { + checkpointWorkflow.addCheckpointListener(lsnr, dataRegion); } /** @@ -251,21 +222,6 @@ public class CheckpointManager { } /** - * @return Checkpoint directory. - */ - public File checkpointDirectory() { - return checkpointMarkersStorage.cpDir; - } - - /** - * @return Read checkpoint status. - * @throws IgniteCheckedException If failed to read checkpoint status page. - */ - public CheckpointStatus readCheckpointStatus() throws IgniteCheckedException { - return checkpointMarkersStorage.readCheckpointStatus(); - } - - /** * Start the new checkpoint immediately. * * @param reason Reason. 
@@ -285,44 +241,6 @@ public class CheckpointManager { } /** - * @return Checkpoint history. - */ - public CheckpointHistory checkpointHistory() { - return checkpointMarkersStorage.history(); - } - - /** - * Initialize checkpoint storage. - */ - public void initializeStorage() throws IgniteCheckedException { - checkpointMarkersStorage.initialize(); - } - - /** - * Wal truncate callBack. - * - * @param highBound WALPointer. - */ - public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException { - checkpointMarkersStorage.removeCheckpointsUntil(highBound); - } - - /** - * Cleanup checkpoint directory from all temporary files. - */ - public void cleanupTempCheckpointDirectory() throws IgniteCheckedException { - checkpointMarkersStorage.cleanupTempCheckpointDirectory(); - } - - /** - * Clean checkpoint directory {@link CheckpointMarkersStorage#cpDir}. The operation is necessary when local node joined to - * baseline topology with different consistentId. - */ - public void cleanupCheckpointDirectory() throws IgniteCheckedException { - checkpointMarkersStorage.cleanupCheckpointDirectory(); - } - - /** * */ public Checkpointer getCheckpointer() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java index 7dd1e80..b88cad8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java @@ -196,7 +196,7 @@ public class MetaStorage implements CheckpointListener, ReadWriteMetastorage { /** {@inheritDoc} */ @Override public void beforeCheckpointBegin(Context ctx) { } - }); + }, dataRegion); } } } @@ -291,7 +291,7 @@ public class MetaStorage implements CheckpointListener, ReadWriteMetastorage { ); 
if (!readOnly) - ((GridCacheDatabaseSharedManager)db).addCheckpointListener(this); + ((GridCacheDatabaseSharedManager)db).addCheckpointListener(this, dataRegion); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointListenerForRegionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointListenerForRegionTest.java new file mode 100644 index 0000000..fb10775 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointListenerForRegionTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME; + +/** + * Tests that a checkpoint listener registered for a data region is invoked only while that region takes part in checkpoints. + */ +public class CheckpointListenerForRegionTest extends GridCommonAbstractTest { + /** The number of mandatory methods that are called on a checkpoint listener during a single checkpoint.
*/ + private static final int CALLS_COUNT_PER_CHECKPOINT = 3; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration storageCfg = new DataStorageConfiguration(); + + storageCfg.setCheckpointFrequency(100_000); + storageCfg.getDefaultDataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(300L * 1024 * 1024); + + cfg.setDataStorageConfiguration(storageCfg) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 16))); + + return cfg; + } + + /** + * 1. Start one node. + * 2. Configure the default cache. + * 3. Register two checkpoint listeners (one for the default region, one for all regions) to watch the checkpoint. + * 4. Fill the data and trigger the checkpoint. + * 5. Expected: Both listeners should be called. + * 6. Remove the default region from the checkpoint. + * 7. Fill the data and trigger the checkpoint. + * 8. Expected: Only the listener for all regions should be called. + * 9. Return the default region back to the checkpoint. + * 10. Fill the data and trigger the checkpoint. + * 11. Expected: Both listeners should be called. + * + * @throws Exception If failed. + */ + @Test + public void testCheckpointListenersInvokedOnlyIfRegionConfigured() throws Exception { + //given: One started node with default cache.
+ IgniteEx ignite0 = startGrid(0); + + ignite0.cluster().active(true); + + IgniteCache<Integer, Object> cache = ignite0.cache(DEFAULT_CACHE_NAME); + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)(ignite0.context().cache().context().database()); + + DataRegion defaultRegion = db.checkpointedDataRegions().stream() + .filter(region -> DFLT_DATA_REG_DEFAULT_NAME.equals(region.config().getName())) + .findFirst() + .orElse(null); + + assertNotNull("Expected default data region in checkpoint list is not found.", defaultRegion); + + //and: Configure the listeners(for default region and for all regions) for watching for checkpoint. + AtomicInteger checkpointListenerDefaultRegionCounter = checkpointListenerWatcher(db, defaultRegion); + AtomicInteger checkpointListenerAllRegionCounter = checkpointListenerWatcher(db, null); + + //when: Checkpoint happened. + fillDataAndCheckpoint(ignite0, cache); + + //then: Both listeners should be called. + assertEquals(CALLS_COUNT_PER_CHECKPOINT, checkpointListenerDefaultRegionCounter.get()); + assertEquals(CALLS_COUNT_PER_CHECKPOINT, checkpointListenerAllRegionCounter.get()); + + //Remove the default region from checkpoint. + db.checkpointedDataRegions().remove(defaultRegion); + + //when: Checkpoint happened. + fillDataAndCheckpoint(ignite0, cache); + + //then: Only listener for all regions should be called. + assertEquals(CALLS_COUNT_PER_CHECKPOINT, checkpointListenerDefaultRegionCounter.get()); + assertEquals(2 * CALLS_COUNT_PER_CHECKPOINT, checkpointListenerAllRegionCounter.get()); + + assertTrue( + "Expected default data region in all regions list is not found.", + db.dataRegions().stream().anyMatch(region -> DFLT_DATA_REG_DEFAULT_NAME.equals(region.config().getName())) + ); + + //Return default region back to the checkpoint. + db.checkpointedDataRegions().add(defaultRegion); + + //when: Checkpoint happened. + fillDataAndCheckpoint(ignite0, cache); + + //then: Both listeners should be called. 
+ assertEquals(2 * CALLS_COUNT_PER_CHECKPOINT, checkpointListenerDefaultRegionCounter.get()); + assertEquals(3 * CALLS_COUNT_PER_CHECKPOINT, checkpointListenerAllRegionCounter.get()); + } + + /** + * Fill the data and trigger the checkpoint after that. + */ + private void fillDataAndCheckpoint( + IgniteEx ignite0, + IgniteCache<Integer, Object> cache + ) throws IgniteCheckedException { + for (int j = 0; j < 1024; j++) + cache.put(j, j); + + forceCheckpoint(ignite0); + } + + /** + * Adds a checkpoint listener which counts the number of listener calls made during checkpoints. + * + * @param db Shared manager used to register the listener. + * @param defaultRegion Region for which the listener should be added ({@code null} is passed for the all-regions listener). + * @return Counter which is incremented on every listener call. + */ + @NotNull + private AtomicInteger checkpointListenerWatcher(GridCacheDatabaseSharedManager db, DataRegion defaultRegion) { + AtomicInteger checkpointListenerCounter = new AtomicInteger(); + + db.addCheckpointListener(new CheckpointListener() { + @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException { + checkpointListenerCounter.getAndIncrement(); + } + + @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { + checkpointListenerCounter.getAndIncrement(); + } + + @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException { + checkpointListenerCounter.getAndIncrement(); + } + }, defaultRegion); + return checkpointListenerCounter; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java index 51a4073..5d75b1f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java +++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java @@ -44,7 +44,7 @@ public class CheckpointStartLoggingTest extends GridCommonAbstractTest { "walCpRecordFsyncDuration=" + VALID_MS_PATTERN + ", " + "writeCheckpointEntryDuration=" + VALID_MS_PATTERN + ", " + "splitAndSortCpPagesDuration=" + VALID_MS_PATTERN + ", " + - ".* pages=[1-9][0-9]*, " + + ".*pages=[1-9][0-9]*, " + "reason=.*"; /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java new file mode 100644 index 0000000..85da92c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint; + +import java.io.File; +import java.nio.file.Paths; +import java.util.Arrays; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME; +import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** + * + */ +public class LightweightCheckpointTest extends GridCommonAbstractTest { + /** Data region which should not be checkpointed. */ + public static final String NOT_CHECKPOINTED_REGION = "NotCheckpointedRegion"; + + /** Cache which should not be checkpointed. */ + public static final String NOT_CHECKPOINTED_CACHE = "notCheckpointedCache"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + +// cleanPersistenceDir(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration storageCfg = new DataStorageConfiguration(); + + storageCfg.setWalMode(WALMode.NONE); + storageCfg.setCheckpointFrequency(100_000); + storageCfg.setDataRegionConfigurations(new DataRegionConfiguration() + .setName(NOT_CHECKPOINTED_REGION) + .setPersistenceEnabled(true) + .setMaxSize(300L * 1024 * 1024) + + ); + storageCfg.getDefaultDataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(300L * 1024 * 1024); + + cfg.setDataStorageConfiguration(storageCfg) + + .setCacheConfiguration( + new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 16)) + .setDataRegionName(DFLT_DATA_REG_DEFAULT_NAME), + new CacheConfiguration<>(NOT_CHECKPOINTED_CACHE) + .setAffinity(new RendezvousAffinityFunction(false, 16)) + .setDataRegionName(NOT_CHECKPOINTED_REGION) + ); + + return cfg; + } + + /** + * 1. 
Start one node with WAL disabled and with two caches. + * 2. Disable the default checkpoint. + * 3. Create a light checkpoint for one cache and configure a checkpoint listener for it. + * 4. Fill both caches. + * 5. Trigger the light checkpoint and wait for it to finish. + * 6. Stop the node and start it again. + * 7. Expected: The cache which was checkpointed has all the data, while the second cache is empty. + * + * @throws Exception If failed. + */ + @Test + public void testLightCheckpointAbleToStoreOnlyGivenDataRegion() throws Exception { + //given: One started node with default cache and cache which won't be checkpointed. + IgniteEx ignite0 = startGrid(0); + ignite0.cluster().active(true); + + IgniteCache<Integer, Object> checkpointedCache = ignite0.cache(DEFAULT_CACHE_NAME); + IgniteCache<Integer, Object> notCheckpointedCache = ignite0.cache(NOT_CHECKPOINTED_CACHE); + + GridKernalContext context = ignite0.context(); + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)(context.cache().context().database()); + + waitForCondition(() -> !db.getCheckpointer().currentProgress().inProgress(), 10_000); + + //and: disable the default checkpoint. + db.enableCheckpoints(false); + + DataRegion regionForCheckpoint = db.dataRegion(DFLT_DATA_REG_DEFAULT_NAME); + + //and: Create light checkpoint with only one region.
+ LightweightCheckpointManager lightweightCheckpointManager = new LightweightCheckpointManager( + context::log, + context.igniteInstanceName(), + "light-test-checkpoint", + context.workersRegistry(), + context.config().getDataStorageConfiguration(), + () -> Arrays.asList(regionForCheckpoint), + grpId -> getPageMemoryForCacheGroup(grpId, db, context), + PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY, + context.cache().context().snapshot(), + db.persistentStoreMetricsImpl(), + context.longJvmPauseDetector(), + context.failure(), + context.cache(), + (FilePageStoreManager)context.cache().context().pageStore() + ); + + //and: Add checkpoint listener for DEFAULT_CACHE in order of storing the meta pages. + lightweightCheckpointManager.addCheckpointListener( + (CheckpointListener)context.cache().cacheGroup(groupIdForCache(ignite0, DEFAULT_CACHE_NAME)).offheap(), + regionForCheckpoint + ); + + lightweightCheckpointManager.start(); + + //when: Fill the caches + for (int j = 0; j < 1024; j++) { + checkpointedCache.put(j, j); + notCheckpointedCache.put(j, j); + } + + //and: Trigger and wait for the checkpoint. + lightweightCheckpointManager.forceCheckpoint("test", null) + .futureFor(CheckpointState.FINISHED) + .get(); + + //and: Stop and start node. + stopAllGrids(); + + ignite0 = startGrid(0); + ignite0.cluster().active(true); + + checkpointedCache = ignite0.cache(DEFAULT_CACHE_NAME); + notCheckpointedCache = ignite0.cache(NOT_CHECKPOINTED_CACHE); + + //then: Checkpointed cache should have all data meanwhile uncheckpointed cache should be empty. 
+ for (int j = 1; j < 1024; j++) { + assertEquals(j, checkpointedCache.get(j)); + assertNull(notCheckpointedCache.get(j)); + } + + GridCacheDatabaseSharedManager db2 = (GridCacheDatabaseSharedManager) + (ignite0.context().cache().context().database()); + + waitForCondition(() -> !db2.getCheckpointer().currentProgress().inProgress(), 10_000); + + String nodeFolderName = ignite0.context().pdsFolderResolver().resolveFolders().folderName(); + File cpMarkersDir = Paths.get(U.defaultWorkDirectory(), "db", nodeFolderName, "cp").toFile(); + + //then: Expected only two pairs checkpoint markers - both from the start of node. + assertEquals(4, cpMarkersDir.listFiles().length); + } + + /** + * @return Page memory which corresponds to grpId. + */ + private PageMemoryEx getPageMemoryForCacheGroup( + int grpId, + GridCacheDatabaseSharedManager db, + GridKernalContext context + ) throws IgniteCheckedException { + if (grpId == MetaStorage.METASTORAGE_CACHE_ID) + return (PageMemoryEx)db.dataRegion(METASTORE_DATA_REGION_NAME).pageMemory(); + + if (grpId == TxLog.TX_LOG_CACHE_ID) + return (PageMemoryEx)db.dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory(); + + CacheGroupDescriptor desc = context.cache().cacheGroupDescriptors().get(grpId); + + if (desc == null) + return null; + + String memPlcName = desc.config().getDataRegionName(); + + return (PageMemoryEx)context.cache().context().database().dataRegion(memPlcName).pageMemory(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 11b9ba5..5510c4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -20,7 +20,6 @@ package org.apache.ignite.testsuites; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest; import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheStartStopWithFreqCheckpointTest; @@ -52,8 +51,10 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdown import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFreeListTest; +import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointListenerForRegionTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointStartLoggingTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest; +import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.LightweightCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.FsyncWalRolloverDoesNotBlockTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteLocalWalSizeTest; @@ -212,6 +213,8 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, IgnitePdsCorruptedStoreTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CheckpointFailBeforeWriteMarkTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CheckpointFreeListTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CheckpointListenerForRegionTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, LightweightCheckpointTest.class, ignoredTests); 
GridTestUtils.addTestIfNeeded(suite, CheckpointStartLoggingTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, FreeListCachingTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteWalIteratorSwitchSegmentTest.class, ignoredTests);