This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ffc2b  IGNITE-13681 Lightweight checkpoint implementation, per data 
region checkpoint listeners. - Fixes #8433.
d8ffc2b is described below

commit d8ffc2b2991e262b65dfb9ee32fd818852cd5aaf
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Wed Nov 11 15:12:20 2020 +0300

    IGNITE-13681 Lightweight checkpoint implementation, per data region 
checkpoint listeners. - Fixes #8433.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../processors/cache/mvcc/txlog/TxLog.java         |   9 +-
 .../GridCacheDatabaseSharedManager.java            |  32 ++-
 .../cache/persistence/GridCacheOffheapManager.java |   2 +-
 .../IgniteCacheDatabaseSharedManager.java          |   4 +-
 .../persistence/checkpoint/CheckpointManager.java  |   7 +-
 .../checkpoint/CheckpointPagesWriter.java          |  21 +-
 .../checkpoint/CheckpointPagesWriterFactory.java   |  26 ++-
 .../persistence/checkpoint/CheckpointWorkflow.java |  92 +++++---
 .../cache/persistence/checkpoint/Checkpointer.java |  14 +-
 ...ager.java => LightweightCheckpointManager.java} | 134 +++---------
 .../cache/persistence/metastorage/MetaStorage.java |   4 +-
 .../CheckpointListenerForRegionTest.java           | 191 +++++++++++++++++
 .../db/checkpoint/CheckpointStartLoggingTest.java  |   2 +-
 .../db/checkpoint/LightweightCheckpointTest.java   | 231 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   5 +-
 15 files changed, 599 insertions(+), 175 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index 8adb6b3..9b8c73b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
 import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
@@ -102,6 +103,8 @@ public class TxLog implements CheckpointListener {
 
         PageLockListener txLogLockLsnr = 
diagnosticMgr.pageLockTracker().createPageLockTracker(txLogName);
 
+        DataRegion txLogDataRegion = mgr.dataRegion(TX_LOG_CACHE_NAME);
+
         if (CU.isPersistenceEnabled(ctx.config())) {
             String txLogReuseListName = TX_LOG_CACHE_NAME + "##ReuseList";
             PageLockListener txLogReuseListLockLsnr = 
diagnosticMgr.pageLockTracker().createPageLockTracker(txLogReuseListName);
@@ -110,7 +113,7 @@ public class TxLog implements CheckpointListener {
 
             try {
                 IgniteWriteAheadLogManager wal = ctx.cache().context().wal();
-                PageMemoryEx pageMemory = 
(PageMemoryEx)mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
+                PageMemoryEx pageMemory = 
(PageMemoryEx)txLogDataRegion.pageMemory();
 
                 long metaId = pageMemory.metaPageId(TX_LOG_CACHE_ID);
                 long metaPage = pageMemory.acquirePage(TX_LOG_CACHE_ID, 
metaId);
@@ -195,14 +198,14 @@ public class TxLog implements CheckpointListener {
                     txLogLockLsnr
                 );
 
-                
((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this);
+                
((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this, 
txLogDataRegion);
             }
             finally {
                 mgr.checkpointReadUnlock();
             }
         }
         else {
-            PageMemory pageMemory = 
mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
+            PageMemory pageMemory = txLogDataRegion.pageMemory();
             ReuseList reuseList1 = mgr.reuseList(TX_LOG_CACHE_NAME);
 
             long treeRoot;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 37bd464..6a0ecb2 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -52,7 +52,6 @@ import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.ignite.DataRegionMetricsProvider;
 import org.apache.ignite.DataStorageMetrics;
 import org.apache.ignite.IgniteCheckedException;
@@ -136,6 +135,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDat
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridCountDownCallback;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.StripedExecutor;
@@ -317,6 +317,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Lock for releasing history for preloading. */
     private ReentrantLock releaseHistForPreloadingLock = new ReentrantLock();
 
+    /** Data regions which should be checkpointed. */
+    protected final Set<DataRegion> checkpointedDataRegions = new 
GridConcurrentHashSet<>();
+
     /**
      * @param ctx Kernal context.
      */
@@ -471,7 +474,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 persistenceCfg,
                 storeMgr,
                 this::isCheckpointInapplicableForWalRebalance,
-                this::dataRegions,
+                this::checkpointedDataRegions,
                 this::cacheGroupContexts,
                 this::getPageMemoryForCacheGroup,
                 resolveThrottlingPolicy(),
@@ -494,6 +497,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /** */
+    public Collection<DataRegion> checkpointedDataRegions() {
+        return checkpointedDataRegions;
+    }
+
+    /** */
     private Collection<CacheGroupContext> cacheGroupContexts() {
         return cctx.cache().cacheGroups();
     }
@@ -594,6 +602,16 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         fileLockHolder.close();
     }
 
+    /** {@inheritDoc} */
+    @Override public DataRegion addDataRegion(DataStorageConfiguration 
dataStorageCfg, DataRegionConfiguration dataRegionCfg,
+        boolean trackable) throws IgniteCheckedException {
+        DataRegion region = super.addDataRegion(dataStorageCfg, dataRegionCfg, 
trackable);
+
+        checkpointedDataRegions.add(region);
+
+        return region;
+    }
+
     /** */
     private void readMetastore() throws IgniteCheckedException {
         try {
@@ -1559,9 +1577,17 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
     /**
      * @param lsnr Listener.
+     * @param dataRegion Data region which the listener corresponds to.
+     */
+    public void addCheckpointListener(CheckpointListener lsnr, DataRegion 
dataRegion) {
+        checkpointManager.addCheckpointListener(lsnr, dataRegion);
+    }
+
+    /**
+     * @param lsnr Listener.
      */
     public void addCheckpointListener(CheckpointListener lsnr) {
-        checkpointManager.addCheckpointListener(lsnr);
+        checkpointManager.addCheckpointListener(lsnr, null);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index fa4fabe..4bfda53 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -225,7 +225,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
         persStoreMetrics = databaseSharedManager.persistentStoreMetricsImpl();
 
-        databaseSharedManager.addCheckpointListener(this);
+        databaseSharedManager.addCheckpointListener(this, grp.dataRegion());
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 1d77566..42ad2a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -413,7 +413,7 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
      * @param dataRegionCfg Data region config.
      * @throws IgniteCheckedException If failed to initialize swap path.
      */
-    public void addDataRegion(
+    public DataRegion addDataRegion(
         DataStorageConfiguration dataStorageCfg,
         DataRegionConfiguration dataRegionCfg,
         boolean trackable
@@ -441,6 +441,8 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
         else if (dataRegionName.equals(DFLT_DATA_REG_DEFAULT_NAME))
             U.warn(log, "Data Region with name 'default' isn't used as a 
default. " +
                     "Please, check Data Region configuration.");
+
+        return region;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 415e2cf..2beac7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -170,7 +170,7 @@ public class CheckpointManager {
 
         checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
             logger, snapshotMgr,
-            (fullPage, buf, tag) -> 
pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, 
true),
+            (pageMemEx, fullPage, buf, tag) -> 
pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, 
true),
             persStoreMetrics,
             throttlingPolicy, threadBuf,
             pageMemoryGroupResolver
@@ -231,9 +231,10 @@ public class CheckpointManager {
 
     /**
      * @param lsnr Listener.
+     * @param dataRegion Data region which the listener corresponds to.
      */
-    public void addCheckpointListener(CheckpointListener lsnr) {
-        checkpointWorkflow.addCheckpointListener(lsnr);
+    public void addCheckpointListener(CheckpointListener lsnr, DataRegion 
dataRegion) {
+        checkpointWorkflow.addCheckpointListener(lsnr, dataRegion);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java
index 77f9e2e..79f774c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java
@@ -178,7 +178,7 @@ public class CheckpointPagesWriter implements Runnable {
 
         CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? 
this.tracker : null;
 
-        PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);
+        Map<PageMemoryEx, PageStoreWriter> pageStoreWriters = new HashMap<>();
 
         ByteBuffer tmpWriteBuf = threadBuf.get();
 
@@ -201,6 +201,8 @@ public class CheckpointPagesWriter implements Runnable {
 
             tmpWriteBuf.rewind();
 
+            PageStoreWriter pageStoreWriter = 
pageStoreWriters.computeIfAbsent(pageMem, pageMemEx -> 
createPageStoreWriter(pageMemEx, pagesToRetry));
+
             pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, 
tracker);
 
             if (throttlingEnabled) {
@@ -227,18 +229,20 @@ public class CheckpointPagesWriter implements Runnable {
     /**
      * Factory method for create {@link PageStoreWriter}.
      *
+     * @param pageMemEx Page memory from which pages are written.
      * @param pagesToRetry List pages for retry.
      * @return Checkpoint page write context.
      */
-    private PageStoreWriter createPageStoreWriter(Map<PageMemoryEx, 
List<FullPageId>> pagesToRetry) {
+    private PageStoreWriter createPageStoreWriter(
+        PageMemoryEx pageMemEx,
+        Map<PageMemoryEx, List<FullPageId>> pagesToRetry
+    ) {
         return new PageStoreWriter() {
             /** {@inheritDoc} */
             @Override public void writePage(FullPageId fullPageId, ByteBuffer 
buf,
                 int tag) throws IgniteCheckedException {
                 if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
-                    PageMemoryEx pageMem = 
pageMemoryGroupResolver.apply(fullPageId.groupId());
-
-                    pagesToRetry.computeIfAbsent(pageMem, k -> new 
ArrayList<>()).add(fullPageId);
+                    pagesToRetry.computeIfAbsent(pageMemEx, k -> new 
ArrayList<>()).add(fullPageId);
 
                     return;
                 }
@@ -258,7 +262,7 @@ public class CheckpointPagesWriter implements Runnable {
 
                 curCpProgress.updateWrittenPages(1);
 
-                PageStore store = pageWriter.write(fullPageId, buf, tag);
+                PageStore store = pageWriter.write(pageMemEx, fullPageId, buf, 
tag);
 
                 updStores.computeIfAbsent(store, k -> new 
LongAdder()).increment();
             }
@@ -268,12 +272,15 @@ public class CheckpointPagesWriter implements Runnable {
     /** Interface which allows to write one page to page store. */
     public interface CheckpointPageWriter {
         /**
+         *
+         * @param pageMemEx Page memory from which page should be written.
          * @param fullPageId Full page id.
          * @param buf Byte buffer.
          * @param tag Page tag.
          * @return {@link PageStore} which was used to write.
          * @throws IgniteCheckedException if fail.
          */
-        PageStore write(FullPageId fullPageId, ByteBuffer buf, int tag) throws 
IgniteCheckedException;
+        PageStore write(PageMemoryEx pageMemEx, FullPageId fullPageId, 
ByteBuffer buf, int tag)
+            throws IgniteCheckedException;
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java
index eb3607b..8c882e1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -19,6 +19,8 @@ package 
org.apache.ignite.internal.processors.cache.persistence.checkpoint;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
@@ -144,22 +146,13 @@ public class CheckpointPagesWriterFactory {
         AtomicInteger cpPagesCnt
     ) {
         return () -> {
-            PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
-                assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by 
other thread for page " + fullPageId;
-
-                // Write buf to page store.
-                PageStore store = checkpointPageWriter.write(fullPageId, buf, 
tag);
-
-                // Save store for future fsync.
-                updStores.add(store);
-            };
-
             GridConcurrentMultiPairQueue.Result<PageMemoryEx, FullPageId> res =
                 new GridConcurrentMultiPairQueue.Result<>();
 
             int pagesWritten = 0;
             ByteBuffer tmpWriteBuf = threadBuf.get();
 
+            Map<PageMemoryEx, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
             try {
                 while (pages.next(res)) {
                     // Fail-fast break if some exception occurred.
@@ -168,6 +161,19 @@ public class CheckpointPagesWriterFactory {
 
                     PageMemoryEx pageMem = res.getKey();
 
+                    PageStoreWriter pageStoreWriter = 
pageStoreWriters.computeIfAbsent(
+                        pageMem,
+                        (pageMemEx) -> (fullPageId, buf, tag) -> {
+                            assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock 
is held by other thread for page " + fullPageId;
+
+                            // Write buf to page store.
+                            PageStore store = 
checkpointPageWriter.write(pageMemEx, fullPageId, buf, tag);
+
+                            // Save store for future fsync.
+                            updStores.add(store);
+                        }
+                    );
+
                     // Write page content to page store via pageStoreWriter.
                     // Tracker is null, because no need to track checkpoint 
metrics on recovery.
                     pageMem.checkpointWritePage(res.getValue(), tmpWriteBuf, 
pageStoreWriter, null);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
index 6a97a18..93b7fed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
@@ -39,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
@@ -74,7 +74,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedHashMap;
 
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
@@ -106,6 +108,9 @@ public class CheckpointWorkflow {
     /** @see IgniteSystemProperties#CHECKPOINT_PARALLEL_SORT_THRESHOLD */
     public static final int DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD = 512 * 
1024;
 
+    /** Placeholder region for listeners that are not bound to a particular data region. */
+    private static final DataRegion NO_REGION = new DataRegion(null, null, 
null, null);
+
     /**
      * Starting from this number of dirty pages in checkpoint, array will be 
sorted with {@link
      * Arrays#parallelSort(Comparable[])} in case of {@link 
CheckpointWriteOrder#SEQUENTIAL}.
@@ -144,7 +149,7 @@ public class CheckpointWorkflow {
     private final CheckpointWriteOrder checkpointWriteOrder;
 
     /** Collections of checkpoint listeners. */
-    private final Collection<CheckpointListener> lsnrs = new 
CopyOnWriteArrayList<>();
+    private final Map<CheckpointListener, DataRegion> lsnrs = new 
ConcurrentLinkedHashMap<>();
 
     /** Ignite instance name. */
     private final String igniteInstanceName;
@@ -228,7 +233,9 @@ public class CheckpointWorkflow {
         CheckpointMetricsTracker tracker,
         WorkProgressDispatcher workProgressDispatcher
     ) throws IgniteCheckedException {
-        List<CheckpointListener> dbLsnrs = new ArrayList<>(lsnrs);
+        Collection<DataRegion> checkpointedRegions = dataRegions.get();
+
+        List<CheckpointListener> dbLsnrs = 
getRelevantCheckpointListeners(checkpointedRegions);
 
         CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr);
 
@@ -283,7 +290,7 @@ public class CheckpointWorkflow {
             fillCacheGroupState(cpRec);
 
             //There are allowable to replace pages only after checkpoint entry 
was stored to disk.
-            cpPagesHolder = beginAllCheckpoints(dataRegions.get(), 
curr.futureFor(MARKER_STORED_TO_DISK));
+            cpPagesHolder = beginAllCheckpoints(checkpointedRegions, 
curr.futureFor(MARKER_STORED_TO_DISK));
 
             curr.currentCheckpointPagesCount(cpPagesHolder.pagesNum());
 
@@ -293,7 +300,8 @@ public class CheckpointWorkflow {
 
             if (dirtyPagesCount > 0 || curr.nextSnapshot() || 
hasPartitionsToDestroy) {
                 // No page updates for this checkpoint are allowed from now on.
-                cpPtr = wal.log(cpRec);
+                if (wal != null)
+                    cpPtr = wal.log(cpRec);
 
                 if (cpPtr == null)
                     cpPtr = CheckpointStatus.NULL_PTR;
@@ -326,18 +334,22 @@ public class CheckpointWorkflow {
             tracker.onWalCpRecordFsyncStart();
 
             // Sync log outside the checkpoint write lock.
-            wal.flush(cpPtr, true);
+            if (wal != null)
+                wal.flush(cpPtr, true);
 
             tracker.onWalCpRecordFsyncEnd();
 
-            CheckpointEntry checkpointEntry = 
checkpointMarkersStorage.writeCheckpointEntry(
-                cpTs,
-                cpRec.checkpointId(),
-                cpPtr,
-                cpRec,
-                CheckpointEntryType.START,
-                skipSync
-            );
+            CheckpointEntry checkpointEntry = null;
+
+            if (checkpointMarkersStorage != null)
+                checkpointEntry = 
checkpointMarkersStorage.writeCheckpointEntry(
+                    cpTs,
+                    cpRec.checkpointId(),
+                    cpPtr,
+                    cpRec,
+                    CheckpointEntryType.START,
+                    skipSync
+                );
 
             curr.transitTo(MARKER_STORED_TO_DISK);
 
@@ -351,7 +363,7 @@ public class CheckpointWorkflow {
             return new Checkpoint(checkpointEntry, cpPages, curr);
         }
         else {
-            if (curr.nextSnapshot())
+            if (curr.nextSnapshot() && wal != null)
                 wal.flush(null, true);
 
             return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, 
curr);
@@ -563,23 +575,28 @@ public class CheckpointWorkflow {
         }
 
         if (chp.hasDelta()) {
-            checkpointMarkersStorage.writeCheckpointEntry(
-                chp.cpEntry.timestamp(),
-                chp.cpEntry.checkpointId(),
-                chp.cpEntry.checkpointMark(),
-                null,
-                CheckpointEntryType.END,
-                skipSync
-            );
-
-            wal.notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
+            if (checkpointMarkersStorage != null)
+                checkpointMarkersStorage.writeCheckpointEntry(
+                    chp.cpEntry.timestamp(),
+                    chp.cpEntry.checkpointId(),
+                    chp.cpEntry.checkpointMark(),
+                    null,
+                    CheckpointEntryType.END,
+                    skipSync
+                );
+
+            if (wal != null)
+                wal.notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
         }
 
-        checkpointMarkersStorage.onCheckpointFinished(chp);
+        if (checkpointMarkersStorage != null)
+            checkpointMarkersStorage.onCheckpointFinished(chp);
 
         CheckpointContextImpl emptyCtx = new 
CheckpointContextImpl(chp.progress, null, null, null);
 
-        List<CheckpointListener> dbLsnrs = new ArrayList<>(lsnrs);
+        Collection<DataRegion> checkpointedRegions = dataRegions.get();
+
+        List<CheckpointListener> dbLsnrs = 
getRelevantCheckpointListeners(checkpointedRegions);
 
         for (CheckpointListener lsnr : dbLsnrs)
             lsnr.afterCheckpointEnd(emptyCtx);
@@ -588,6 +605,17 @@ public class CheckpointWorkflow {
     }
 
     /**
+     * @param checkpointedRegions Regions which will be checkpointed.
+     * @return Checkpoint listeners which should be handled.
+     */
+    @NotNull private List<CheckpointListener> 
getRelevantCheckpointListeners(Collection<DataRegion> checkpointedRegions) {
+        return lsnrs.entrySet().stream()
+            .filter(entry -> entry.getValue() == NO_REGION || 
checkpointedRegions.contains(entry.getValue()))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+    }
+
+    /**
      * This method makes sense if node was stopped during the checkpoint(Start 
marker was written to disk while end
      * marker are not). It is able to write all pages to disk and create end 
marker.
      *
@@ -687,10 +715,13 @@ public class CheckpointWorkflow {
     }
 
     /**
+     * Adds a listener which is called only when the given data region 
is checkpointed.
+     *
      * @param lsnr Listener.
+     * @param dataRegion Data region which the listener corresponds to, or {@code null} if the listener should be called on every checkpoint.
      */
-    public void addCheckpointListener(CheckpointListener lsnr) {
-        lsnrs.add(lsnr);
+    public void addCheckpointListener(CheckpointListener lsnr, DataRegion 
dataRegion) {
+        lsnrs.put(lsnr, dataRegion == null ? NO_REGION : dataRegion);
     }
 
     /**
@@ -720,7 +751,8 @@ public class CheckpointWorkflow {
             checkpointCollectPagesInfoPool = null;
         }
 
-        lsnrs.clear();
+        for (CheckpointListener lsnr : lsnrs.keySet())
+            lsnrs.remove(lsnr);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index c0368f4..9e39dfe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -106,7 +106,8 @@ public class Checkpointer extends GridWorker {
         "walCpRecordFsyncDuration=%dms, " +
         "writeCheckpointEntryDuration=%dms, " +
         "splitAndSortCpPagesDuration=%dms, " +
-        "%s pages=%d, " +
+        "%s" +
+        "pages=%d, " +
         "reason='%s']";
 
     /** Skip sync. */
@@ -408,8 +409,8 @@ public class Checkpointer extends GridWorker {
                         log.info(
                             String.format(
                                 CHECKPOINT_STARTED_LOG_FORMAT,
-                                chp.cpEntry.checkpointId(),
-                                chp.cpEntry.checkpointMark(),
+                                chp.cpEntry == null ? "" : 
chp.cpEntry.checkpointId(),
+                                chp.cpEntry == null ? "" : 
chp.cpEntry.checkpointMark(),
                                 tracker.beforeLockDuration(),
                                 tracker.lockWaitDuration(),
                                 tracker.listenersExecuteDuration(),
@@ -417,7 +418,7 @@ public class Checkpointer extends GridWorker {
                                 tracker.walCpRecordFsyncDuration(),
                                 tracker.writeCheckpointEntryDuration(),
                                 tracker.splitAndSortCpPagesDuration(),
-                                possibleJvmPauseDur > 0 ? 
"possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
+                                possibleJvmPauseDur > 0 ? 
"possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "",
                                 chp.pagesSize,
                                 chp.progress.reason()
                             )
@@ -455,7 +456,7 @@ public class Checkpointer extends GridWorker {
 
             if (chp.hasDelta() || destroyedPartitionsCnt > 0) {
                 if (log.isInfoEnabled()) {
-                    String walSegsCoveredMsg = 
prepareWalSegsCoveredMsg(chp.walSegsCoveredRange);
+                    String walSegsCoveredMsg = chp.walSegsCoveredRange == null 
? "" : prepareWalSegsCoveredMsg(chp.walSegsCoveredRange);
 
                     log.info(String.format("Checkpoint finished [cpId=%s, 
pages=%d, markPos=%s, " +
                             "walSegmentsCleared=%d, walSegmentsCovered=%s, 
markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
@@ -852,6 +853,9 @@ public class Checkpointer extends GridWorker {
      * Restart worker in IgniteThread.
      */
     public void start() {
+        if (runner() != null)
+            return;
+
         assert runner() == null : "Checkpointer is running.";
 
         new IgniteThread(this).start();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
similarity index 70%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
index 415e2cf..7a329f7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
 
-import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -30,12 +30,10 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.LongJVMPauseDetector;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
@@ -43,7 +41,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCa
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.util.StripedExecutor;
-import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
 import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
 import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -52,32 +49,25 @@ import org.jetbrains.annotations.Nullable;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
 
 /**
- * Main class to abstract checkpoint-related processes and actions and hide 
them from higher-level components.
- * Implements default checkpointing algorithm which is sharp checkpoint but 
can be replaced
- * by other implementations if needed.
- * Represents only an intermediate step in refactoring of checkpointing 
component and may change in the future.
+ * Like a sharp checkpoint algorithm implemented in {@link CheckpointManager} 
this checkpoint ensures that
+ * all pages marked dirty under {@link #checkpointTimeoutLock()} will be 
consistently saved to disk.
  *
- * This checkpoint ensures that all pages marked as
- * dirty under {@link #checkpointTimeoutLock ()} will be consistently saved to 
disk.
+ * But unlike {@link CheckpointManager} lightweight checkpoint doesn't store 
any checkpoint markers to disk
+ * nor write cp-related records to WAL log.
  *
- * Configuration of this checkpoint allows the following:
- * <p>Collecting all pages from configured dataRegions which was marked as 
dirty under {@link #checkpointTimeoutLock
- * ()}.</p> *
- * <p>Marking the start of checkpoint in WAL and on disk.</p>
- * <p>Notifying the subscribers of different checkpoint states through {@link 
CheckpointListener}.</p> *
- * <p>Synchronizing collected pages with disk using {@link 
FilePageStoreManager}.</p>
- * <p>Restoring memory in consistent state if the node failed in the middle of 
checkpoint.</p>
+ * This allows using it in situations where no recovery is needed after a crash 
in the middle of a checkpoint
+ * but work can simply be replayed from the beginning.
+ *
+ * Such situations include defragmentation and node recovery after a crash
+ * (regular sharp checkpoint cannot be used during recovery).
  */
-public class CheckpointManager {
+public class LightweightCheckpointManager {
     /** Checkpoint worker. */
     private volatile Checkpointer checkpointer;
 
     /** Main checkpoint steps. */
     private final CheckpointWorkflow checkpointWorkflow;
 
-    /** Checkpoint markers storage which mark the start and end of each 
checkpoint. */
-    private final CheckpointMarkersStorage checkpointMarkersStorage;
-
     /** Timeout checkpoint lock which should be used while write to memory 
happened. */
     final CheckpointTimeoutLock checkpointTimeoutLock;
 
@@ -91,13 +81,9 @@ public class CheckpointManager {
      * @param logger Logger producer.
      * @param igniteInstanceName Ignite instance name.
      * @param checkpointThreadName Name of main checkpoint thread.
-     * @param wal Write ahead log manager.
      * @param workersRegistry Workers registry.
      * @param persistenceCfg Persistence configuration.
-     * @param pageStoreManager File page store manager.
-     * @param checkpointInapplicableChecker Checker of checkpoints.
      * @param dataRegions Data regions.
-     * @param cacheGroupContexts Cache group contexts.
      * @param pageMemoryGroupResolver Page memory resolver.
      * @param throttlingPolicy Throttling policy.
      * @param snapshotMgr Snapshot manager.
@@ -107,52 +93,33 @@ public class CheckpointManager {
      * @param cacheProcessor Cache processor.
      * @throws IgniteCheckedException if fail.
      */
-    public CheckpointManager(
+    public LightweightCheckpointManager(
         Function<Class<?>, IgniteLogger> logger,
         String igniteInstanceName,
         String checkpointThreadName,
-        IgniteWriteAheadLogManager wal,
         WorkersRegistry workersRegistry,
         DataStorageConfiguration persistenceCfg,
-        FilePageStoreManager pageStoreManager,
-        IgniteThrowableBiPredicate<Long, Integer> 
checkpointInapplicableChecker,
         Supplier<Collection<DataRegion>> dataRegions,
-        Supplier<Collection<CacheGroupContext>> cacheGroupContexts,
         IgniteThrowableFunction<Integer, PageMemoryEx> pageMemoryGroupResolver,
         PageMemoryImpl.ThrottlingPolicy throttlingPolicy,
         IgniteCacheSnapshotManager snapshotMgr,
         DataStorageMetricsImpl persStoreMetrics,
         LongJVMPauseDetector longJvmPauseDetector,
         FailureProcessor failureProcessor,
-        GridCacheProcessor cacheProcessor
+        GridCacheProcessor cacheProcessor,
+        FilePageStoreManager pageStoreManager
     ) throws IgniteCheckedException {
-        CheckpointHistory cpHistory = new CheckpointHistory(
-            persistenceCfg,
-            logger,
-            wal,
-            checkpointInapplicableChecker
-        );
-
-        FileIOFactory ioFactory = persistenceCfg.getFileIOFactory();
-
-        checkpointMarkersStorage = new CheckpointMarkersStorage(
-            logger,
-            cpHistory,
-            ioFactory,
-            pageStoreManager.workDir().getAbsolutePath()
-        );
-
         CheckpointReadWriteLock lock = new CheckpointReadWriteLock(logger);
 
         checkpointWorkflow = new CheckpointWorkflow(
             logger,
-            wal,
+            null,
             snapshotMgr,
-            checkpointMarkersStorage,
+            null,
             lock,
             persistenceCfg.getCheckpointWriteOrder(),
             dataRegions,
-            cacheGroupContexts,
+            Collections::emptyList,
             persistenceCfg.getCheckpointThreads(),
             igniteInstanceName
         );
@@ -169,10 +136,13 @@ public class CheckpointManager {
         };
 
         checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
-            logger, snapshotMgr,
-            (fullPage, buf, tag) -> 
pageStoreManager.writeInternal(fullPage.groupId(), fullPage.pageId(), buf, tag, 
true),
+            logger,
+            snapshotMgr,
+            (pageMemEx, fullPage, buf, tag) ->
+                pageStoreManager.writeInternal(fullPage.groupId(), 
fullPage.pageId(), buf, tag, true),
             persStoreMetrics,
-            throttlingPolicy, threadBuf,
+            throttlingPolicy,
+            threadBuf,
             pageMemoryGroupResolver
         );
 
@@ -231,9 +201,10 @@ public class CheckpointManager {
 
     /**
      * @param lsnr Listener.
+     * @param dataRegion Data region for which the listener is added.
      */
-    public void addCheckpointListener(CheckpointListener lsnr) {
-        checkpointWorkflow.addCheckpointListener(lsnr);
+    public void addCheckpointListener(CheckpointListener lsnr, DataRegion 
dataRegion) {
+        checkpointWorkflow.addCheckpointListener(lsnr, dataRegion);
     }
 
     /**
@@ -251,21 +222,6 @@ public class CheckpointManager {
     }
 
     /**
-     * @return Checkpoint directory.
-     */
-    public File checkpointDirectory() {
-        return checkpointMarkersStorage.cpDir;
-    }
-
-    /**
-     * @return Read checkpoint status.
-     * @throws IgniteCheckedException If failed to read checkpoint status page.
-     */
-    public CheckpointStatus readCheckpointStatus() throws 
IgniteCheckedException {
-        return checkpointMarkersStorage.readCheckpointStatus();
-    }
-
-    /**
      * Start the new checkpoint immediately.
      *
      * @param reason Reason.
@@ -285,44 +241,6 @@ public class CheckpointManager {
     }
 
     /**
-     * @return Checkpoint history.
-     */
-    public CheckpointHistory checkpointHistory() {
-        return checkpointMarkersStorage.history();
-    }
-
-    /**
-     * Initialize checkpoint storage.
-     */
-    public void initializeStorage() throws IgniteCheckedException {
-        checkpointMarkersStorage.initialize();
-    }
-
-    /**
-     * Wal truncate callBack.
-     *
-     * @param highBound WALPointer.
-     */
-    public void removeCheckpointsUntil(WALPointer highBound) throws 
IgniteCheckedException {
-        checkpointMarkersStorage.removeCheckpointsUntil(highBound);
-    }
-
-    /**
-     * Cleanup checkpoint directory from all temporary files.
-     */
-    public void cleanupTempCheckpointDirectory() throws IgniteCheckedException 
{
-        checkpointMarkersStorage.cleanupTempCheckpointDirectory();
-    }
-
-    /**
-     * Clean checkpoint directory {@link CheckpointMarkersStorage#cpDir}. The 
operation is necessary when local node joined to
-     * baseline topology with different consistentId.
-     */
-    public void cleanupCheckpointDirectory() throws IgniteCheckedException {
-        checkpointMarkersStorage.cleanupCheckpointDirectory();
-    }
-
-    /**
      *
      */
     public Checkpointer getCheckpointer() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 7dd1e80..b88cad8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -196,7 +196,7 @@ public class MetaStorage implements CheckpointListener, 
ReadWriteMetastorage {
                     /** {@inheritDoc} */
                     @Override public void beforeCheckpointBegin(Context ctx) {
                     }
-                });
+                }, dataRegion);
             }
         }
     }
@@ -291,7 +291,7 @@ public class MetaStorage implements CheckpointListener, 
ReadWriteMetastorage {
             );
 
             if (!readOnly)
-                
((GridCacheDatabaseSharedManager)db).addCheckpointListener(this);
+                
((GridCacheDatabaseSharedManager)db).addCheckpointListener(this, dataRegion);
         }
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointListenerForRegionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointListenerForRegionTest.java
new file mode 100644
index 0000000..fb10775
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointListenerForRegionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME;
+
+/**
+ *
+ */
+public class CheckpointListenerForRegionTest extends GridCommonAbstractTest {
+    /** This number shows how many mandatory methods will be called on a 
checkpoint listener during a checkpoint. */
+    private static final int CALLS_COUNT_PER_CHECKPOINT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setCheckpointFrequency(100_000);
+        storageCfg.getDefaultDataRegionConfiguration()
+            .setPersistenceEnabled(true)
+            .setMaxSize(300L * 1024 * 1024);
+
+        cfg.setDataStorageConfiguration(storageCfg)
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAffinity(new RendezvousAffinityFunction(false, 16)));
+
+        return cfg;
+    }
+
+    /**
+     * 1. Start the one node.
+     * 2. Configure the default cache.
+     * 3. Set the checkpoint listeners(for default region and for all regions) 
to watch the checkpoint.
+     * 4. Fill the data and trigger the checkpoint.
+     * 5. Expected: Both listeners should be called.
+     * 6. Remove the default region from the checkpoint.
+     * 7. Fill the data and trigger the checkpoint.
+     * 8. Expected: The only listener for all regions should be called.
+     * 9. Return default region back to the checkpoint.
+     * 10. Fill the data and trigger the checkpoint.
+     * 11. Expected: Both listeners should be called.
+     *
+     * @throws Exception if fail.
+     */
+    @Test
+    public void testCheckpointListenersInvokedOnlyIfRegionConfigured() throws 
Exception {
+        //given: One started node with default cache.
+        IgniteEx ignite0 = startGrid(0);
+
+        ignite0.cluster().active(true);
+
+        IgniteCache<Integer, Object> cache = ignite0.cache(DEFAULT_CACHE_NAME);
+
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)(ignite0.context().cache().context().database());
+
+        DataRegion defaultRegion = db.checkpointedDataRegions().stream()
+            .filter(region -> 
DFLT_DATA_REG_DEFAULT_NAME.equals(region.config().getName()))
+            .findFirst()
+            .orElse(null);
+
+        assertNotNull("Expected default data region in checkpoint list is not 
found.", defaultRegion);
+
+        //and: Configure the listeners(for default region and for all regions) 
for watching for checkpoint.
+        AtomicInteger checkpointListenerDefaultRegionCounter = 
checkpointListenerWatcher(db, defaultRegion);
+        AtomicInteger checkpointListenerAllRegionCounter = 
checkpointListenerWatcher(db, null);
+
+        //when: Checkpoint happened.
+        fillDataAndCheckpoint(ignite0, cache);
+
+        //then: Both listeners should be called.
+        assertEquals(CALLS_COUNT_PER_CHECKPOINT, 
checkpointListenerDefaultRegionCounter.get());
+        assertEquals(CALLS_COUNT_PER_CHECKPOINT, 
checkpointListenerAllRegionCounter.get());
+
+        //Remove the default region from checkpoint.
+        db.checkpointedDataRegions().remove(defaultRegion);
+
+        //when: Checkpoint happened.
+        fillDataAndCheckpoint(ignite0, cache);
+
+        //then: Only listener for all regions should be called.
+        assertEquals(CALLS_COUNT_PER_CHECKPOINT, 
checkpointListenerDefaultRegionCounter.get());
+        assertEquals(2 * CALLS_COUNT_PER_CHECKPOINT, 
checkpointListenerAllRegionCounter.get());
+
+        assertTrue(
+            "Expected default data region in all regions list is not found.",
+            db.dataRegions().stream().anyMatch(region -> 
DFLT_DATA_REG_DEFAULT_NAME.equals(region.config().getName()))
+        );
+
+        //Return default region back to the checkpoint.
+        db.checkpointedDataRegions().add(defaultRegion);
+
+        //when: Checkpoint happened.
+        fillDataAndCheckpoint(ignite0, cache);
+
+        //then: Both listeners should be called.
+        assertEquals(2 * CALLS_COUNT_PER_CHECKPOINT, 
checkpointListenerDefaultRegionCounter.get());
+        assertEquals(3 * CALLS_COUNT_PER_CHECKPOINT, 
checkpointListenerAllRegionCounter.get());
+    }
+
+    /**
+     * Fill the data and trigger the checkpoint after that.
+     */
+    private void fillDataAndCheckpoint(
+        IgniteEx ignite0,
+        IgniteCache<Integer, Object> cache
+    ) throws IgniteCheckedException {
+        for (int j = 0; j < 1024; j++)
+            cache.put(j, j);
+
+        forceCheckpoint(ignite0);
+    }
+
+    /**
+     * Adds a checkpoint listener which counts the number of listener calls made during 
checkpoints.
+     *
+     * @param db Shared manager for manage the listeners.
+     * @param defaultRegion Region for which listener should be added.
+     * @return Counter of the listener calls.
+     */
+    @NotNull
+    private AtomicInteger 
checkpointListenerWatcher(GridCacheDatabaseSharedManager db, DataRegion 
defaultRegion) {
+        AtomicInteger checkpointListenerCounter = new AtomicInteger();
+
+        db.addCheckpointListener(new CheckpointListener() {
+            @Override public void onMarkCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                checkpointListenerCounter.getAndIncrement();
+            }
+
+            @Override public void onCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                checkpointListenerCounter.getAndIncrement();
+            }
+
+            @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+                checkpointListenerCounter.getAndIncrement();
+            }
+        }, defaultRegion);
+        return checkpointListenerCounter;
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java
index 51a4073..5d75b1f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointStartLoggingTest.java
@@ -44,7 +44,7 @@ public class CheckpointStartLoggingTest extends 
GridCommonAbstractTest {
         "walCpRecordFsyncDuration=" + VALID_MS_PATTERN + ", " +
         "writeCheckpointEntryDuration=" + VALID_MS_PATTERN + ", " +
         "splitAndSortCpPagesDuration=" + VALID_MS_PATTERN + ", " +
-        ".* pages=[1-9][0-9]*, " +
+        ".*pages=[1-9][0-9]*, " +
         "reason=.*";
 
     /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java
new file mode 100644
index 0000000..85da92c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ *
+ */
+public class LightweightCheckpointTest extends GridCommonAbstractTest {
+    /** Data region which should not be checkpointed. */
+    public static final String NOT_CHECKPOINTED_REGION = 
"NotCheckpointedRegion";
+
+    /** Cache which should not be checkpointed. */
+    public static final String NOT_CHECKPOINTED_CACHE = "notCheckpointedCache";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+//        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setWalMode(WALMode.NONE);
+        storageCfg.setCheckpointFrequency(100_000);
+        storageCfg.setDataRegionConfigurations(new DataRegionConfiguration()
+            .setName(NOT_CHECKPOINTED_REGION)
+            .setPersistenceEnabled(true)
+            .setMaxSize(300L * 1024 * 1024)
+
+        );
+        storageCfg.getDefaultDataRegionConfiguration()
+            .setPersistenceEnabled(true)
+            .setMaxSize(300L * 1024 * 1024);
+
+        cfg.setDataStorageConfiguration(storageCfg)
+
+            .setCacheConfiguration(
+                new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                    .setAffinity(new RendezvousAffinityFunction(false, 16))
+                    .setDataRegionName(DFLT_DATA_REG_DEFAULT_NAME),
+                new CacheConfiguration<>(NOT_CHECKPOINTED_CACHE)
+                    .setAffinity(new RendezvousAffinityFunction(false, 16))
+                    .setDataRegionName(NOT_CHECKPOINTED_REGION)
+            );
+
+        return cfg;
+    }
+
+    /**
+     * 1. Start the one node with disabled WAL and with two caches.
+     * 2. Disable default checkpoint.
+     * 3. Create light checkpoint for one cache and configure checkpoint 
listener for it.
+     * 4. Fill the both caches.
+     * 5. Trigger the light checkpoint and wait for the finish.
+     * 6. Stop the node and start it again.
+     * 7. Expected: The cache which was checkpointed has all the data, 
while the second cache is empty.
+     *
+     * @throws Exception if fail.
+     */
+    @Test
+    public void testLightCheckpointAbleToStoreOnlyGivenDataRegion() throws 
Exception {
+        //given: One started node with default cache and cache which won't be 
checkpointed.
+        IgniteEx ignite0 = startGrid(0);
+        ignite0.cluster().active(true);
+
+        IgniteCache<Integer, Object> checkpointedCache = 
ignite0.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Object> notCheckpointedCache = 
ignite0.cache(NOT_CHECKPOINTED_CACHE);
+
+        GridKernalContext context = ignite0.context();
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)(context.cache().context().database());
+
+        waitForCondition(() -> 
!db.getCheckpointer().currentProgress().inProgress(), 10_000);
+
+        //and: disable the default checkpoint.
+        db.enableCheckpoints(false);
+
+        DataRegion regionForCheckpoint = 
db.dataRegion(DFLT_DATA_REG_DEFAULT_NAME);
+
+        //and: Create light checkpoint with only one region.
+        LightweightCheckpointManager lightweightCheckpointManager = new 
LightweightCheckpointManager(
+            context::log,
+            context.igniteInstanceName(),
+            "light-test-checkpoint",
+            context.workersRegistry(),
+            context.config().getDataStorageConfiguration(),
+            () -> Arrays.asList(regionForCheckpoint),
+            grpId -> getPageMemoryForCacheGroup(grpId, db, context),
+            PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY,
+            context.cache().context().snapshot(),
+            db.persistentStoreMetricsImpl(),
+            context.longJvmPauseDetector(),
+            context.failure(),
+            context.cache(),
+            (FilePageStoreManager)context.cache().context().pageStore()
+        );
+
+        //and: Add checkpoint listener for DEFAULT_CACHE in order to store 
the meta pages.
+        lightweightCheckpointManager.addCheckpointListener(
+            
(CheckpointListener)context.cache().cacheGroup(groupIdForCache(ignite0, 
DEFAULT_CACHE_NAME)).offheap(),
+            regionForCheckpoint
+        );
+
+        lightweightCheckpointManager.start();
+
+        //when: Fill the caches
+        for (int j = 0; j < 1024; j++) {
+            checkpointedCache.put(j, j);
+            notCheckpointedCache.put(j, j);
+        }
+
+        //and: Trigger and wait for the checkpoint.
+        lightweightCheckpointManager.forceCheckpoint("test", null)
+            .futureFor(CheckpointState.FINISHED)
+            .get();
+
+        //and: Stop and start node.
+        stopAllGrids();
+
+        ignite0 = startGrid(0);
+        ignite0.cluster().active(true);
+
+        checkpointedCache = ignite0.cache(DEFAULT_CACHE_NAME);
+        notCheckpointedCache = ignite0.cache(NOT_CHECKPOINTED_CACHE);
+
+        //then: Checkpointed cache should have all data meanwhile 
uncheckpointed cache should be empty.
+        for (int j = 1; j < 1024; j++) {
+            assertEquals(j, checkpointedCache.get(j));
+            assertNull(notCheckpointedCache.get(j));
+        }
+
+        GridCacheDatabaseSharedManager db2 = (GridCacheDatabaseSharedManager)
+            (ignite0.context().cache().context().database());
+
+        waitForCondition(() -> 
!db2.getCheckpointer().currentProgress().inProgress(), 10_000);
+
+        String nodeFolderName = 
ignite0.context().pdsFolderResolver().resolveFolders().folderName();
+        File cpMarkersDir = Paths.get(U.defaultWorkDirectory(), "db", 
nodeFolderName, "cp").toFile();
+
+        //then: Expected only two pairs of checkpoint markers - both from the 
start of node.
+        assertEquals(4, cpMarkersDir.listFiles().length);
+    }
+
+    /**
+     * @return Page memory which corresponds to grpId.
+     */
+    private PageMemoryEx getPageMemoryForCacheGroup(
+        int grpId,
+        GridCacheDatabaseSharedManager db,
+        GridKernalContext context
+    ) throws IgniteCheckedException {
+        if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
+            return 
(PageMemoryEx)db.dataRegion(METASTORE_DATA_REGION_NAME).pageMemory();
+
+        if (grpId == TxLog.TX_LOG_CACHE_ID)
+            return 
(PageMemoryEx)db.dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
+
+        CacheGroupDescriptor desc = 
context.cache().cacheGroupDescriptors().get(grpId);
+
+        if (desc == null)
+            return null;
+
+        String memPlcName = desc.config().getDataRegionName();
+
+        return 
(PageMemoryEx)context.cache().context().database().dataRegion(memPlcName).pageMemory();
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 11b9ba5..5510c4b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,7 +20,6 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheStartStopWithFreqCheckpointTest;
@@ -52,8 +51,10 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdown
 import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFreeListTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointListenerForRegionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointStartLoggingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.LightweightCheckpointTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.FsyncWalRolloverDoesNotBlockTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteLocalWalSizeTest;
@@ -212,6 +213,8 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, 
IgnitePdsCorruptedStoreTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CheckpointFailBeforeWriteMarkTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CheckpointFreeListTest.class, 
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CheckpointListenerForRegionTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, LightweightCheckpointTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CheckpointStartLoggingTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, FreeListCachingTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteWalIteratorSwitchSegmentTest.class, ignoredTests);

Reply via email to