This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b041e2f32fa IGNITE-16757 Introduce cache change events for CDCCosumer 
(#9948)
b041e2f32fa is described below

commit b041e2f32fabf2d06a224259a4c2647bbb8156b8
Author: Nikolay <[email protected]>
AuthorDate: Thu Jul 7 12:05:38 2022 +0300

    IGNITE-16757 Introduce cache change events for CDCCosumer (#9948)
---
 .../java/org/apache/ignite/cdc/CdcCacheEvent.java  |  62 +++++
 .../java/org/apache/ignite/cdc/CdcConsumer.java    |  34 ++-
 .../ignite/internal/cdc/CdcConsumerState.java      |  37 +++
 .../org/apache/ignite/internal/cdc/CdcMain.java    | 114 ++++++++-
 .../ignite/internal/cdc/WalRecordsConsumer.java    |  19 ++
 .../pagemem/store/IgnitePageStoreManager.java      |  10 -
 .../internal/processors/cache/CachesRegistry.java  |  21 +-
 .../processors/cache/GridCacheProcessor.java       |   4 +-
 .../internal/processors/cache/GridCacheUtils.java  |  16 ++
 .../processors/cache/GridLocalConfigManager.java   | 142 ++++++----
 .../internal/processors/cache/StoredCacheData.java |  16 +-
 .../IgniteCacheDatabaseSharedManager.java          |  15 +-
 .../persistence/file/FilePageStoreManager.java     |  11 +-
 .../org/apache/ignite/cdc/AbstractCdcTest.java     |  73 +++++-
 .../ignite/cdc/CdcCacheConfigOnRestartTest.java    | 185 +++++++++++++
 .../java/org/apache/ignite/cdc/CdcSelfTest.java    |  17 ++
 .../persistence/pagemem/NoOpPageStoreManager.java  |   5 -
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   2 +
 .../ignite/internal/cdc/CacheEventsCdcTest.java    | 285 +++++++++++++++++++++
 .../org/apache/ignite/internal/cdc/SqlCdcTest.java | 109 ++++++--
 .../IgniteBinaryCacheQueryTestSuite3.java          |   4 +-
 .../apache/ignite/cdc/CdcConfigurationTest.java    |  10 +
 22 files changed, 1063 insertions(+), 128 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/cdc/CdcCacheEvent.java 
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcCacheEvent.java
new file mode 100644
index 00000000000..d006c7b18ea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcCacheEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.spi.systemview.view.CacheView;
+
+/**
+ * Notification of {@link CdcConsumer} about cache creation/change events.
+ *
+ * @see CdcConsumer
+ * @see Ignite#createCache(String)
+ * @see IgniteCache
+ * @see CacheConfiguration
+ * @see QueryEntity
+ */
+@IgniteExperimental
+public interface CdcCacheEvent {
+    /**
+     * @return Cache ID.
+     * @see CacheView#cacheId()
+     */
+    public int cacheId();
+
+    /**
+     * Note, the {@link CacheConfiguration#getQueryEntities()} value is not changed 
on table schema change.
+     * Current table schema can be obtained by {@link #queryEntities()}.
+     *
+     * @return Initial cache configuration.
+     */
+    public CacheConfiguration<?, ?> configuration();
+
+    /**
+     * Returns current state of configured {@link QueryEntity}.
+     * {@link QueryEntity} can be changed by executing DDL on SQL tables.
+     *
+     * Note, {@link CacheConfiguration#getQueryEntities()} returns the initial 
definition of {@link QueryEntity}.
+     *
+     * @return Query entities for cache.
+     */
+    public Collection<QueryEntity> queryEntities();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java 
b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
index c95bc5ed12c..8e14dfd1e5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cdc;
 
 import java.util.Iterator;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryIdMapper;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.systemview.view.CacheView;
 
 /**
  * Consumer of WAL data change events.
@@ -50,6 +52,14 @@ import org.apache.ignite.resources.LoggerResource;
  *
  * Note, consumption of the {@link CdcEvent} will be started from the last 
saved offset.
  * The offset of consumptions is saved on the disk every time {@link 
#onEvents(Iterator)} returns {@code true}.
+ * Note, the order of notifications is as follows:
+ * <ul>
+ *     <li>{@link #onMappings(Iterator)}</li>
+ *     <li>{@link #onTypes(Iterator)}</li>
+ *     <li>{@link #onCacheChange(Iterator)}</li>
+ *     <li>{@link #onCacheDestroy(Iterator)}</li>
+ * </ul>
+ * Note, {@link CdcConsumer} receives notifications on each running CDC 
application (node).
  *
  * @see CdcMain
  * @see CdcEvent
@@ -77,7 +87,6 @@ public interface CdcConsumer {
     /**
      * Handles new binary types. State of the types processing will be stored 
after method invocation
      * and ongoing notifications after CDC application fail/restart will be 
continued for newly created/updates types.
-     * Invoked before {@link #onEvents(Iterator)}.
      *
      * Note, unlike {@link #onEvents(Iterator)} this method MUST process all 
types or CDC will fail.
      * Because, in time of invocation {@link #onEvents(Iterator)} all changed 
types must be available on destionation.
@@ -94,7 +103,6 @@ public interface CdcConsumer {
     /**
      * Handles new mappings from type name to id. State of the types 
processing will be stored after method invocation
      * and ongoing notifications after CDC application fail/restart will be 
continued for newly created/updates mappings.
-     * Invoked before both {@link #onEvents(Iterator)} and {@link 
#onTypes(Iterator)}.
      *
      * @param mappings Binary mapping iterator.
      * @see IgniteBinary
@@ -103,6 +111,28 @@ public interface CdcConsumer {
      */
     public void onMappings(Iterator<TypeMapping> mappings);
 
+    /**
+     * Handles cache change (create, edit) events. State of cache processing 
will be stored after method invocation
+     * and ongoing notifications after CDC application fail/restart will be 
continued for newly changed caches.
+     *
+     * @param cacheEvents Cache change events.
+     * @see Ignite#createCache(String)
+     * @see Ignite#getOrCreateCache(String)
+     * @see CdcCacheEvent
+     */
+    public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents);
+
+    /**
+     * Handles cache destroy events. State of cache processing will be stored 
after method invocation
+     * and ongoing notifications after CDC application fail/restart will be 
continued for newly changed caches.
+     *
+     * @param caches Destroyed caches.
+     * @see Ignite#destroyCache(String)
+     * @see CdcCacheEvent
+     * @see CacheView#cacheId()
+     */
+    public void onCacheDestroy(Iterator<Integer> caches);
+
     /**
      * Stops the consumer.
      * This method can be invoked only after {@link #start(MetricRegistry)}.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
index e00391cab71..379c2bb6f95 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
@@ -68,6 +68,9 @@ public class CdcConsumerState {
     /** */
     public static final String MAPPINGS_STATE_FILE_NAME = "cdc-mappings-state" 
+ FILE_SUFFIX;
 
+    /** */
+    public static final String CACHES_STATE_FILE_NAME = "cdc-caches-state" + 
FILE_SUFFIX;
+
     /** Log. */
     private final IgniteLogger log;
 
@@ -89,6 +92,12 @@ public class CdcConsumerState {
     /** Mappings types state file. */
     private final Path tmpMappings;
 
+    /** Cache state file. */
+    private final Path caches;
+
+    /** Temporary caches state file. */
+    private final Path tmpCaches;
+
     /**
      * @param stateDir State directory.
      */
@@ -100,6 +109,8 @@ public class CdcConsumerState {
         tmpTypes = stateDir.resolve(TYPES_STATE_FILE_NAME + TMP_SUFFIX);
         mappings = stateDir.resolve(MAPPINGS_STATE_FILE_NAME);
         tmpMappings = stateDir.resolve(MAPPINGS_STATE_FILE_NAME + TMP_SUFFIX);
+        caches = stateDir.resolve(CACHES_STATE_FILE_NAME);
+        tmpCaches = stateDir.resolve(CACHES_STATE_FILE_NAME + TMP_SUFFIX);
     }
 
     /**
@@ -179,6 +190,32 @@ public class CdcConsumerState {
         return state;
     }
 
+    /**
+     * Loads CDC caches state from file.
+     *
+     * @return Saved state.
+     */
+    public Map<Integer, Long> loadCaches() {
+        Map<Integer, Long> state = load(caches, HashMap::new);
+
+        log.info("Initial caches state loaded [cachesCnt=" + state.size() + 
']');
+
+        if (log.isDebugEnabled()) {
+            for (Map.Entry<Integer, Long> entry : state.entrySet())
+                log.debug("Cache [cacheId=" + entry.getKey() + ", 
lastModified=" + entry.getValue() + ']');
+        }
+
+        return state;
+    }
+
+    /**
+     * Saves caches state to file.
+     * @param cachesState State of caches.
+     */
+    public void saveCaches(Map<Integer, Long> cachesState) throws IOException {
+        save(cachesState, tmpCaches, caches);
+    }
+
     /**
      * Loads types mappings state from file.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index dfbf9a0b712..91abaa7faa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
 import org.apache.ignite.cdc.CdcConfiguration;
 import org.apache.ignite.cdc.CdcConsumer;
 import org.apache.ignite.cdc.CdcEvent;
@@ -49,6 +50,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.MarshallerContextImpl;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.platform.PlatformType;
 import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
 
@@ -73,6 +76,10 @@ import static 
org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
 import static 
org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
 import static 
org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 
@@ -185,6 +192,9 @@ public class CdcMain implements Runnable {
     /** Change Data Capture directory. */
     private Path cdcDir;
 
+    /** Database directory. */
+    private File dbDir;
+
     /** Binary meta directory. */
     private File binaryMeta;
 
@@ -206,6 +216,9 @@ public class CdcMain implements Runnable {
     /** Mappings state. */
     private Set<T2<Integer, Byte>> mappingsState;
 
+    /** Caches state. */
+    private Map<Integer, Long> cachesState;
+
     /** Stopped flag. */
     private volatile boolean stopped;
 
@@ -292,6 +305,7 @@ public class CdcMain implements Runnable {
                 walState = state.loadWalState();
                 typesState = state.loadTypesState();
                 mappingsState = state.loadMappingsState();
+                cachesState = state.loadCaches();
 
                 if (walState != null) {
                     committedSegmentIdx.value(walState.get1().index());
@@ -404,7 +418,7 @@ public class CdcMain implements Runnable {
             AtomicLong lastSgmnt = new AtomicLong(-1);
 
             while (!stopped) {
-                try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
+                try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
                     Set<Path> exists = new HashSet<>();
 
                     cdcFiles
@@ -423,6 +437,9 @@ public class CdcMain implements Runnable {
                         .forEach(this::consumeSegment); // Consuming segments.
 
                     seen.removeIf(p -> !exists.contains(p)); // Clean up seen 
set.
+
+                    if (lastSgmnt.get() == -1) //Forcefully updating metadata 
if no new segments found.
+                        updateMetadata();
                 }
 
                 if (!stopped)
@@ -436,11 +453,11 @@ public class CdcMain implements Runnable {
 
     /** Reads all available records from segment. */
     private void consumeSegment(Path segment) {
+        updateMetadata();
+
         if (log.isInfoEnabled())
             log.info("Processing WAL segment [segment=" + segment + ']');
 
-        updateMetadata();
-
         IgniteWalIteratorFactory.IteratorParametersBuilder builder =
             new IgniteWalIteratorFactory.IteratorParametersBuilder()
                 .log(log)
@@ -548,17 +565,24 @@ public class CdcMain implements Runnable {
 
         updateTypes();
 
+        updateCaches();
+
         metaUpdate.value(System.currentTimeMillis() - start);
     }
 
     /** Search for new or changed {@link BinaryType} and notifies the 
consumer. */
     private void updateTypes() {
         try {
-            Iterator<BinaryType> changedTypes = Files.list(binaryMeta.toPath())
+            File[] files = binaryMeta.listFiles();
+
+            if (files == null)
+                return;
+
+            Iterator<BinaryType> changedTypes = Arrays.stream(files)
                 .filter(p -> p.toString().endsWith(METADATA_FILE_SUFFIX))
-                .map(p -> {
-                    int typeId = 
BinaryUtils.typeId(p.getFileName().toString());
-                    long lastModified = p.toFile().lastModified();
+                .map(f -> {
+                    int typeId = BinaryUtils.typeId(f.getName());
+                    long lastModified = f.lastModified();
 
                     // Filter out files already in `typesState` with the same 
last modify date.
                     if (typesState.containsKey(typeId) && lastModified == 
typesState.get(typeId))
@@ -638,6 +662,74 @@ public class CdcMain implements Runnable {
         }
     }
 
+    /** Searches for new or changed {@link CdcCacheEvent} and notifies the 
consumer. */
+    private void updateCaches() {
+        try {
+            if (!dbDir.exists())
+                return;
+
+            File[] files = dbDir.listFiles();
+
+            if (files == null)
+                return;
+
+            Set<Integer> destroyed = new HashSet<>(cachesState.keySet());
+
+            Iterator<CdcCacheEvent> cacheEvts = Arrays.stream(files)
+                .filter(f -> f.isDirectory() &&
+                    (f.getName().startsWith(CACHE_DIR_PREFIX) || 
f.getName().startsWith(CACHE_GRP_DIR_PREFIX)) &&
+                    !f.getName().equals(CACHE_DIR_PREFIX + UTILITY_CACHE_NAME))
+                .filter(File::exists)
+                // Cache group directory can contain several cache data files.
+                // See 
GridLocalConfigManager#cacheConfigurationFile(CacheConfiguration<?, ?>)
+                .flatMap(cacheDir -> Arrays.stream(cacheDir.listFiles(f -> 
f.getName().endsWith(CACHE_DATA_FILENAME))))
+                .map(f -> {
+                    try {
+                        CdcCacheEvent evt = 
GridLocalConfigManager.readCacheData(
+                            f,
+                            
MarshallerUtils.jdkMarshaller(kctx.igniteInstanceName()),
+                            igniteCfg
+                        );
+
+                        destroyed.remove(evt.cacheId());
+
+                        Long lastModified0 = cachesState.get(evt.cacheId());
+
+                        if (lastModified0 != null && lastModified0 == 
f.lastModified())
+                            return null;
+
+                        cachesState.put(evt.cacheId(), f.lastModified());
+
+                        return evt;
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                })
+                .filter(Objects::nonNull)
+                .iterator();
+
+            consumer.onCacheEvents(cacheEvts);
+
+            if (cacheEvts.hasNext())
+                throw new IllegalStateException("Consumer should handle all 
cache change events");
+
+            if (!destroyed.isEmpty()) {
+                Iterator<Integer> destroyedIter = destroyed.iterator();
+
+                consumer.onCacheDestroyEvents(destroyedIter);
+
+                if (destroyedIter.hasNext())
+                    throw new IllegalStateException("Consumer should handle 
all cache destroy events");
+            }
+
+            state.saveCaches(cachesState);
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     /**
      * Try locks Change Data Capture directory.
      *
@@ -645,6 +737,13 @@ public class CdcMain implements Runnable {
      * @return Lock or null if lock failed.
      */
     private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) {
+        if (!dbStoreDirWithSubdirectory.exists()) {
+            log.warning("DB store directory not exists. Should be created by 
Ignite Node " +
+                " [dir=" + dbStoreDirWithSubdirectory + ']');
+
+            return null;
+        }
+
         File cdcRoot = new 
File(igniteCfg.getDataStorageConfiguration().getCdcWalPath());
 
         if (!cdcRoot.isAbsolute()) {
@@ -671,6 +770,7 @@ public class CdcMain implements Runnable {
         }
 
         this.cdcDir = cdcDir;
+        this.dbDir = dbStoreDirWithSubdirectory;
 
         CdcFileLockHolder lock = new CdcFileLockHolder(cdcDir.toString(), 
"cdc.lock", log);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
index 7a3c4a1b517..1a752970bca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
 import org.apache.ignite.cdc.CdcConsumer;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.cdc.TypeMapping;
@@ -155,6 +156,24 @@ public class WalRecordsConsumer<K, V> {
         consumer.onMappings(mappings);
     }
 
+    /**
+     * Handles new cache events.
+     *
+     * @param cacheEvts Cache events iterator.
+     */
+    public void onCacheEvents(Iterator<CdcCacheEvent> cacheEvts) {
+        consumer.onCacheChange(cacheEvts);
+    }
+
+    /**
+     * Handles destroy cache events.
+     *
+     * @param caches Destroyed cache iterator.
+     */
+    public void onCacheDestroyEvents(Iterator<Integer> caches) {
+        consumer.onCacheDestroy(caches);
+    }
+
     /**
      * Starts the consumer.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 30f058a46ed..957ff0b58ac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -208,14 +208,4 @@ public interface IgnitePageStoreManager extends 
GridCacheSharedManager, IgniteCh
      * @param cleanFiles {@code True} to delete all persisted files related to 
particular store.
      */
     public void cleanupPageStoreIfMatch(Predicate<Integer> cacheGrpPred, 
boolean cleanFiles);
-
-    /**
-     * Creates and initializes cache work directory retrieved from {@code 
cacheCfg}.
-     *
-     * @param cacheCfg Cache configuration.
-     * @return {@code True} if work directory already exists.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public boolean checkAndInitCacheWorkDir(CacheConfiguration cacheCfg) 
throws IgniteCheckedException;
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index 31aa0ac4693..65868e2a94c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -26,10 +26,10 @@ import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -249,7 +249,7 @@ public class CachesRegistry {
             registerCache(cacheDesc);
 
         List<DynamicCacheDescriptor> cachesToPersist = 
cacheDescriptors.stream()
-            .filter(cacheDesc -> shouldPersist(cacheDesc.cacheConfiguration()))
+            .filter(cacheDesc -> CU.storeCacheConfig(cctx, 
cacheDesc.cacheConfiguration()))
             .collect(Collectors.toList());
 
         if (cachesToPersist.isEmpty())
@@ -262,18 +262,6 @@ public class CachesRegistry {
         return cachesConfPersistFuture = 
persistCacheConfigurations(cacheConfigsToPersist);
     }
 
-    /**
-     * Checks whether given cache configuration should be persisted.
-     *
-     * @param cacheCfg Cache config.
-     * @return {@code True} if cache configuration should be persisted, {@code 
false} in other case.
-     */
-    private boolean shouldPersist(CacheConfiguration<?, ?> cacheCfg) {
-        return cctx.pageStore() != null &&
-            CU.isPersistentCache(cacheCfg, 
cctx.gridConfig().getDataStorageConfiguration()) &&
-            !cctx.kernalContext().clientNode();
-    }
-
     /**
      * Persists cache configurations.
      *
@@ -284,7 +272,10 @@ public class CachesRegistry {
         // Pre-create cache work directories if they don't exist.
         for (StoredCacheData data : cacheConfigsToPersist) {
             try {
-                cctx.pageStore().checkAndInitCacheWorkDir(data.config());
+                FilePageStoreManager.checkAndInitCacheWorkDir(
+                    cctx.cache().configManager().cacheWorkDir(data.config()),
+                    log
+                );
             }
             catch (IgniteCheckedException e) {
                 if (!cctx.kernalContext().isStopping()) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 25728dcd104..98c00118710 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1118,9 +1118,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
             U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), 
cache.configuration(), ctx.store().configuredStore()));
 
-            IgnitePageStoreManager pageStore;
-
-            if (callDestroy && (pageStore = sharedCtx.pageStore()) != null) {
+            if (callDestroy && CU.storeCacheConfig(sharedCtx, ctx.config())) {
                 try {
                     locCfgMgr.removeCacheData(new 
StoredCacheData(ctx.config()));
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d0be19c626e..96123a3856d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -2118,6 +2118,22 @@ public class GridCacheUtils {
         return false;
     }
 
+    /**
+     * Checks whether given cache configuration should be persisted.
+     *
+     * @param cacheCfg Cache config.
+     * @return {@code True} if cache configuration should be persisted, {@code 
false} in other case.
+     */
+    public static boolean storeCacheConfig(GridCacheSharedContext<?, ?> cctx, 
CacheConfiguration<?, ?> cacheCfg) {
+        if (cctx.kernalContext().clientNode())
+            return false;
+
+        DataRegionConfiguration drCfg =
+            findDataRegion(cctx.gridConfig().getDataStorageConfiguration(), 
cacheCfg.getDataRegionName());
+
+        return drCfg != null && (drCfg.isPersistenceEnabled() || 
drCfg.isCdcEnabled());
+    }
+
     /**
      * @param pageSize Page size.
      * @param encSpi Encryption spi.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
index ed69095434e..6cc56335480 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
@@ -43,15 +43,16 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import org.apache.ignite.internal.util.typedef.F;
@@ -64,7 +65,6 @@ import org.apache.ignite.marshaller.MarshallerUtils;
 
 import static java.nio.file.Files.newDirectoryStream;
 import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
@@ -187,29 +187,12 @@ public class GridLocalConfigManager {
     }
 
     /**
-     * @param dir Cache (group) directory.
-     * @param ccfgs Cache configurations.
+     * @param conf File with stored cache data.
+     * @return Cache data.
      * @throws IgniteCheckedException If failed.
      */
-    public void readCacheConfigurations(File dir, Map<String, StoredCacheData> 
ccfgs) throws IgniteCheckedException {
-        if (dir.getName().startsWith(CACHE_DIR_PREFIX)) {
-            File conf = new File(dir, CACHE_DATA_FILENAME);
-
-            if (conf.exists() && conf.length() > 0) {
-                StoredCacheData cacheData = readCacheData(conf);
-
-                String cacheName = cacheData.config().getName();
-
-                if (!ccfgs.containsKey(cacheName))
-                    ccfgs.put(cacheName, cacheData);
-                else {
-                    U.warn(log, "Cache with name=" + cacheName + " is already 
registered, skipping config file "
-                        + dir.getName());
-                }
-            }
-        }
-        else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
-            readCacheGroupCaches(dir, ccfgs);
+    public StoredCacheData readCacheData(File conf) throws 
IgniteCheckedException {
+        return readCacheData(conf, marshaller, ctx.config());
     }
 
     /**
@@ -217,9 +200,13 @@ public class GridLocalConfigManager {
      * @return Cache data.
      * @throws IgniteCheckedException If failed.
      */
-    public StoredCacheData readCacheData(File conf) throws 
IgniteCheckedException {
+    public static StoredCacheData readCacheData(
+        File conf,
+        Marshaller marshaller,
+        IgniteConfiguration cfg
+    ) throws IgniteCheckedException {
         try (InputStream stream = new BufferedInputStream(new 
FileInputStream(conf))) {
-            return marshaller.unmarshal(stream, 
U.resolveClassLoader(ctx.config()));
+            return marshaller.unmarshal(stream, U.resolveClassLoader(cfg));
         }
         catch (IgniteCheckedException | IOException e) {
             throw new IgniteCheckedException("An error occurred during cache 
configuration loading from file [file=" +
@@ -255,19 +242,14 @@ public class GridLocalConfigManager {
     ) throws IgniteCheckedException {
         assert cacheData != null;
 
-        GridCacheSharedContext<Object, Object> sharedContext = 
cacheProcessor.context();
-
-        boolean shouldStore = sharedContext.pageStore() != null
-            && !sharedContext.kernalContext().clientNode()
-            && isPersistentCache(cacheData.config(), 
sharedContext.gridConfig().getDataStorageConfiguration());
+        CacheConfiguration<?, ?> ccfg = cacheData.config();
 
-        if (!shouldStore)
+        if (!CU.storeCacheConfig(cacheProcessor.context(), ccfg))
             return;
 
-        CacheConfiguration<?, ?> ccfg = cacheData.config();
         File cacheWorkDir = cacheWorkDir(ccfg);
 
-        cacheProcessor.context().pageStore().checkAndInitCacheWorkDir(ccfg);
+        FilePageStoreManager.checkAndInitCacheWorkDir(cacheWorkDir, log);
 
         assert cacheWorkDir.exists() : "Work directory does not exist: " + 
cacheWorkDir;
 
@@ -355,7 +337,7 @@ public class GridLocalConfigManager {
 
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new 
HashMap<>();
 
-        restoreCaches(caches, templates, ctx.config(), 
ctx.cache().context().pageStore());
+        restoreCaches(caches, templates, ctx.config());
 
         CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
             IgniteUuid.randomUuid(),
@@ -423,19 +405,79 @@ public class GridLocalConfigManager {
             return;
 
         for (File file : files) {
-            if (!file.isDirectory() && 
file.getName().endsWith(CACHE_DATA_FILENAME) && file.length() > 0) {
-                StoredCacheData cacheData = readCacheData(file);
+            if (!file.isDirectory() && 
file.getName().endsWith(CACHE_DATA_FILENAME) && file.length() > 0)
+                readAndAdd(
+                    ccfgs,
+                    file,
+                    cacheName -> "Cache with name=" + cacheName + " is already 
registered, " +
+                        "skipping config file " + file.getName() + " in group 
directory " + grpDir.getName()
+                );
+        }
+    }
 
-                String cacheName = cacheData.config().getName();
+    /**
+     * @param dir Cache (group) directory.
+     * @param ccfgs Cache configurations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void readCacheConfigurations(File dir, Map<String, StoredCacheData> 
ccfgs) throws IgniteCheckedException {
+        if (dir.getName().startsWith(CACHE_DIR_PREFIX)) {
+            File conf = new File(dir, CACHE_DATA_FILENAME);
 
-                if (!ccfgs.containsKey(cacheName))
-                    ccfgs.put(cacheName, cacheData);
-                else {
-                    U.warn(log, "Cache with name=" + cacheName + " is already 
registered, skipping config file "
-                        + file.getName() + " in group directory " + 
grpDir.getName());
-                }
+            if (conf.exists() && conf.length() > 0) {
+                readAndAdd(
+                    ccfgs,
+                    conf,
+                    cache -> "Cache with name=" + cache + " is already 
registered, skipping config file " + dir.getName()
+                );
             }
         }
+        else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            readCacheGroupCaches(dir, ccfgs);
+    }
+
+    /**
+     * @param ccfgs Loaded configurations.
+     * @param file Stored cache data file.
+     * @param msg Warning message producer.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void readAndAdd(
+        Map<String, StoredCacheData> ccfgs,
+        File file,
+        Function<String, String> msg
+    ) throws IgniteCheckedException {
+        StoredCacheData cacheData = readCacheData(file, marshaller, 
ctx.config());
+
+        String cacheName = cacheData.config().getName();
+
+        // In-memory CDC stored data must be removed on node failover.
+        if (inMemoryCdcCache(cacheData.config())) {
+            removeCacheData(cacheData);
+
+            U.warn(
+                log,
+                "Stored data for in-memory CDC cache removed[name=" + 
cacheName + ", file=" + file.getName() + ']'
+            );
+
+            return;
+        }
+
+        if (!ccfgs.containsKey(cacheName))
+            ccfgs.put(cacheName, cacheData);
+        else
+            U.warn(log, msg.apply(cacheName));
+    }
+
+    /**
+     * @param cfg Cache configuration.
+     * @return {@code True} if cache is placed in an in-memory data region 
with CDC enabled.
+     */
+    private boolean inMemoryCdcCache(CacheConfiguration<?, ?> cfg) {
+        DataRegionConfiguration drCfg =
+            CU.findDataRegion(ctx.config().getDataStorageConfiguration(), 
cfg.getDataRegionName());
+
+        return drCfg != null && !drCfg.isPersistenceEnabled() && 
drCfg.isCdcEnabled();
     }
 
     /**
@@ -453,7 +495,7 @@ public class GridLocalConfigManager {
      * @param ccfg Cache configuration.
      * @return Store dir for given cache.
      */
-    private File cacheWorkDir(CacheConfiguration<?, ?> ccfg) {
+    public File cacheWorkDir(CacheConfiguration<?, ?> ccfg) {
         return FilePageStoreManager.cacheWorkDir(storeWorkDir, 
FilePageStoreManager.cacheDirName(ccfg));
     }
 
@@ -476,18 +518,16 @@ public class GridLocalConfigManager {
     /**
      * @param caches Caches accumulator.
      * @param templates Templates accumulator.
-     * @param config Ignite configuration.
-     * @param pageStoreManager Page store manager.
+     * @param igniteCfg Ignite configuration.
      */
     private void restoreCaches(
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
-        IgniteConfiguration config,
-        IgnitePageStoreManager pageStoreManager
+        IgniteConfiguration igniteCfg
     ) throws IgniteCheckedException {
-        assert !config.isDaemon() : "Trying to restore cache configurations on 
daemon node.";
+        assert !igniteCfg.isDaemon() : "Trying to restore cache configurations 
on daemon node.";
 
-        CacheConfiguration[] cfgs = config.getCacheConfiguration();
+        CacheConfiguration[] cfgs = igniteCfg.getCacheConfiguration();
 
         for (int i = 0; i < cfgs.length; i++) {
             CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
@@ -498,7 +538,7 @@ public class GridLocalConfigManager {
             addCacheFromConfiguration(cfg, false, caches, templates);
         }
 
-        if (CU.isPersistenceEnabled(config) && pageStoreManager != null) {
+        if ((CU.isPersistenceEnabled(igniteCfg) && 
ctx.cache().context().pageStore() != null) || CU.isCdcEnabled(igniteCfg)) {
             Map<String, StoredCacheData> storedCaches = 
readCacheConfigurations();
 
             if (!F.isEmpty(storedCaches)) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
index 08328d76d3c..bc2c835cf3f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.cache;
 import java.io.Serializable;
 import java.util.Collection;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.CdcCacheEvent;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 
@@ -35,7 +37,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
  * This class is {@link Serializable} and is intended to be read-written with 
{@link JdkMarshaller}
  * in order to be serialization wise agnostic to further additions or removals 
of fields.
  */
-public class StoredCacheData implements Serializable {
+public class StoredCacheData implements Serializable, CdcCacheEvent {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -102,7 +104,7 @@ public class StoredCacheData implements Serializable {
     /**
      * @return Query entities.
      */
-    public Collection<QueryEntity> queryEntities() {
+    @Override public Collection<QueryEntity> queryEntities() {
         return qryEntities;
     }
 
@@ -202,4 +204,14 @@ public class StoredCacheData implements Serializable {
     @Override public String toString() {
         return S.toString(StoredCacheData.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public int cacheId() {
+        return CU.cacheId(ccfg.getName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheConfiguration<?, ?> configuration() {
+        return ccfg;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index d25458bf116..75ba6cce6c0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1190,7 +1190,20 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
      * @param stoppedGrps A collection of tuples (cache group, destroy flag).
      */
     public void 
onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupContext, Boolean>> 
stoppedGrps) {
-        // No-op.
+        for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
+            CacheGroupContext grp = tup.get1();
+
+            try {
+                boolean destroy = tup.get2();
+
+                if (destroy && CU.storeCacheConfig(cctx, grp.config()))
+                    
cctx.cache().configManager().removeCacheGroupConfigurationData(grp);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to gracefully clean page store resources 
for destroyed cache " +
+                    "[cache=" + grp.cacheOrGroupName() + "]", e);
+            }
+        }
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index aa332e2ba7c..02c1304a320 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -181,7 +181,7 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
     private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new 
ConcurrentHashMap<Integer, Boolean>());
 
     /** */
-    private final GridStripedReadWriteLock initDirLock =
+    private static final GridStripedReadWriteLock initDirLock =
         new 
GridStripedReadWriteLock(Math.max(Runtime.getRuntime().availableProcessors(), 
8));
 
     /**
@@ -665,7 +665,7 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
         PageMetrics pageMetrics,
         boolean encrypted) throws IgniteCheckedException {
         try {
-            boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
+            boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir, log);
 
             if (dirExisted) {
                 MaintenanceRegistry mntcReg = 
cctx.kernalContext().maintenanceRegistry();
@@ -739,15 +739,10 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
         return partId == INDEX_PARTITION ? INDEX_FILE_NAME : 
format(PART_FILE_TEMPLATE, partId);
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean checkAndInitCacheWorkDir(CacheConfiguration 
cacheCfg) throws IgniteCheckedException {
-        return checkAndInitCacheWorkDir(cacheWorkDir(cacheCfg));
-    }
-
     /**
      * @param cacheWorkDir Cache work directory.
      */
-    private boolean checkAndInitCacheWorkDir(File cacheWorkDir) throws 
IgniteCheckedException {
+    public static boolean checkAndInitCacheWorkDir(File cacheWorkDir, 
IgniteLogger log) throws IgniteCheckedException {
         boolean dirExisted = false;
 
         ReadWriteLock lock = 
initDirLock.getLock(cacheWorkDir.getName().hashCode());
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index 10d4a8c5176..c37d1aa40cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -276,9 +277,12 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
 
     /** */
     public abstract static class TestCdcConsumer<T> implements CdcConsumer {
-        /** Keys */
+        /** Keys. */
         final ConcurrentMap<IgniteBiTuple<ChangeEventType, Integer>, List<T>> 
data = new ConcurrentHashMap<>();
 
+        /** Cache events. */
+        protected final ConcurrentMap<Integer, CdcCacheEvent> caches = new 
ConcurrentHashMap<>();
+
         /** */
         private volatile boolean stopped;
 
@@ -302,6 +306,8 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
                     F.t(evt.value() == null ? DELETE : UPDATE, evt.cacheId()),
                     k -> new ArrayList<>()).add(extract(evt));
 
+                assertTrue(caches.containsKey(evt.cacheId()));
+
                 checkEvent(evt);
             });
 
@@ -313,6 +319,20 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
             types.forEachRemaining(t -> assertNotNull(t));
         }
 
+        /** {@inheritDoc} */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts) 
{
+            cacheEvts.forEachRemaining(evt -> {
+                assertFalse(caches.containsKey(evt.cacheId()));
+
+                caches.put(evt.cacheId(), evt);
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            caches.forEachRemaining(cacheId -> 
assertNotNull(this.caches.remove(cacheId)));
+        }
+
         /** */
         public abstract void checkEvent(CdcEvent evt);
 
@@ -392,6 +412,57 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /** */
+    public static class TrackCacheEventsConsumer implements CdcConsumer {
+        /** Cache events. */
+        public final Map<Integer, CdcCacheEvent> evts = new 
ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> 
cacheEvents) {
+            cacheEvents.forEachRemaining(e -> {
+                log.info("TrackCacheEventsConsumer.add[cacheId=" + e.cacheId() 
+ ", e=" + e + ']');
+                evts.put(e.cacheId(), e);
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            caches.forEachRemaining(cacheId -> {
+                log.info("TrackCacheEventsConsumer.remove[cacheId=" + cacheId 
+ ']');
+
+                evts.remove(cacheId);
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start(MetricRegistry mreg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+            evts.forEachRemaining(e -> { /* No-op. */ });
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTypes(Iterator<BinaryType> types) {
+            types.forEachRemaining(e -> { /* No-op. */ });
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMappings(Iterator<TypeMapping> mappings) {
+            mappings.forEachRemaining(e -> { /* No-op. */ });
+        }
+
+
+        /** {@inheritDoc} */
+        @Override public void stop() {
+            // No-op.
+        }
+    }
+
     /** */
     protected static User createUser(int i) {
         byte[] bytes = new byte[1024];
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheConfigOnRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheConfigOnRestartTest.java
new file mode 100644
index 00000000000..5895c64273d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheConfigOnRestartTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests that stored cache data for in-memory CDC caches is cleared on node restart.
+ */
+public class CdcCacheConfigOnRestartTest extends AbstractCdcTest {
+    /** Ignite node. */
+    private IgniteEx node;
+
+    /** CDC future. */
+    private IgniteInternalFuture<?> cdcFut;
+
+    /** Consumer. */
+    private TrackCacheEventsConsumer cnsmr;
+
+    /** */
+    public static final String PERSISTENCE = "persistence";
+
+    /** */
+    public static final String IN_MEMORY_CDC = "in-memory-cdc";
+
+    /** */
+    public static final String PURE_IN_MEMORY = "pure-in-memory";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataRegionConfiguration persistenceDr = new DataRegionConfiguration()
+            .setName(PERSISTENCE)
+            .setPersistenceEnabled(true);
+
+        DataRegionConfiguration inMemoryCdc = new DataRegionConfiguration()
+            .setName(IN_MEMORY_CDC)
+            .setPersistenceEnabled(false)
+            .setCdcEnabled(true);
+
+        DataRegionConfiguration pureInMemory = new DataRegionConfiguration()
+            .setName(PURE_IN_MEMORY)
+            .setPersistenceEnabled(false)
+            .setCdcEnabled(false);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDataRegionConfigurations(persistenceDr, inMemoryCdc, 
pureInMemory));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        node = startGrid();
+
+        node.cluster().state(ClusterState.ACTIVE);
+
+        cnsmr = new TrackCacheEventsConsumer();
+
+        cdcFut = runAsync(createCdc(cnsmr, node.configuration()));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (cdcFut != null) {
+            assertFalse(cdcFut.isDone());
+
+            cdcFut.cancel();
+        }
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testInMemoryCdcClearedOnRestart() throws Exception {
+        String pureInMemoryCache = "grouped-pure-in-memory-cache";
+        String inMemoryCdcCache = "grouped-in-memory-cdc-cache";
+
+        node.createCache(pureInMemory(PURE_IN_MEMORY));
+        node.createCache(pureInMemory(pureInMemoryCache).setGroupName("grp"));
+        node.createCache(inMemoryCdc(IN_MEMORY_CDC));
+        node.createCache(inMemoryCdc(inMemoryCdcCache).setGroupName("grp2"));
+
+        assertTrue(waitForCondition(
+            () -> cnsmr.evts.containsKey(CU.cacheId(IN_MEMORY_CDC))
+                && cnsmr.evts.containsKey(CU.cacheId(inMemoryCdcCache)),
+            getTestTimeout()
+        ));
+
+        // Pure in-memory caches must not store data.
+        assertFalse(cnsmr.evts.containsKey(CU.cacheId(PURE_IN_MEMORY)));
+        assertFalse(cnsmr.evts.containsKey(CU.cacheId(pureInMemoryCache)));
+
+        stopAllGrids();
+        node = startGrid();
+
+        // Cache config must be removed on node restart.
+        assertTrue(waitForCondition(() -> cnsmr.evts.isEmpty(), 
getTestTimeout()));
+    }
+
+    /** */
+    @Test
+    public void testPersistenceNotClearedOnRestart() throws Exception {
+        String persistenceCache = "grouped-persistence-cache";
+        String persistenceCache2 = "persistence-2";
+        String inMemoryCdcCache = "grouped-in-memory-cdc-cache";
+
+        node.createCache(persistence(PERSISTENCE));
+        node.createCache(persistence(persistenceCache).setGroupName("grp"));
+        node.createCache(inMemoryCdc(IN_MEMORY_CDC));
+        node.createCache(inMemoryCdc(inMemoryCdcCache).setGroupName("grp2"));
+
+        assertTrue(waitForCondition(
+            () -> cnsmr.evts.containsKey(CU.cacheId(IN_MEMORY_CDC))
+                && cnsmr.evts.containsKey(CU.cacheId(inMemoryCdcCache))
+                && cnsmr.evts.containsKey(CU.cacheId(PERSISTENCE))
+                && cnsmr.evts.containsKey(CU.cacheId(persistenceCache)),
+            getTestTimeout()
+        ));
+
+        stopAllGrids();
+        node = startGrid();
+
+        // Create one more cache to know for sure CDC updated cache data.
+        node.createCache(persistence(persistenceCache2));
+
+        assertTrue(waitForCondition(() -> 
cnsmr.evts.containsKey(CU.cacheId(persistenceCache2)), getTestTimeout()));
+
+        // Cache config for in-memory CDC caches must be removed on node 
restart.
+        assertFalse(cnsmr.evts.containsKey(CU.cacheId(IN_MEMORY_CDC)));
+        assertFalse(cnsmr.evts.containsKey(CU.cacheId(inMemoryCdcCache)));
+
+        assertTrue(cnsmr.evts.containsKey(CU.cacheId(PERSISTENCE)));
+        assertTrue(cnsmr.evts.containsKey(CU.cacheId(persistenceCache)));
+
+        assertEquals(3, cnsmr.evts.size());
+    }
+
+    /** */
+    private CacheConfiguration<?, ?> persistence(String cacheName) {
+        return new CacheConfiguration<Integer, 
Integer>(cacheName).setDataRegionName(PERSISTENCE);
+    }
+
+    /** */
+    private CacheConfiguration<?, ?> inMemoryCdc(String cacheName) {
+        return new CacheConfiguration<Integer, 
Integer>(cacheName).setDataRegionName(IN_MEMORY_CDC);
+    }
+
+    /** */
+    private CacheConfiguration<?, ?> pureInMemory(String cacheName) {
+        return new CacheConfiguration<Integer, 
Integer>(cacheName).setDataRegionName(PURE_IN_MEMORY);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index a5eed340c5e..eda1082d0d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -284,6 +284,15 @@ public class CdcSelfTest extends AbstractCdcTest {
                     mappings.forEachRemaining(m -> assertNotNull(m));
                 }
 
+                @Override public void onCacheChange(Iterator<CdcCacheEvent> 
cacheEvents) {
+                    cacheEvents.forEachRemaining(ce -> assertNotNull(ce));
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onCacheDestroy(Iterator<Integer> caches) 
{
+                    caches.forEachRemaining(ce -> assertNotNull(ce));
+                }
+
                 @Override public void stop() {
                     // No-op.
                 }
@@ -367,6 +376,14 @@ public class CdcSelfTest extends AbstractCdcTest {
                     mappings.forEachRemaining(m -> assertNotNull(m));
                 }
 
+                @Override public void onCacheChange(Iterator<CdcCacheEvent> 
cacheEvents) {
+                    cacheEvents.forEachRemaining(ce -> assertNotNull(ce));
+                }
+
+                @Override public void onCacheDestroy(Iterator<Integer> caches) 
{
+                    caches.forEachRemaining(ce -> assertNotNull(ce));
+                }
+
                 @Override public void stop() {
                     // No-op.
                 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
index 33d6f11c41b..03312bbb0ec 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
@@ -217,9 +217,4 @@ public class NoOpPageStoreManager implements 
IgnitePageStoreManager {
     @Override public void cleanupPageStoreIfMatch(Predicate<Integer> 
cacheGrpPred, boolean cleanFiles) {
         // No-op.
     }
-
-    /** {@inheritDoc} */
-    @Override public boolean checkAndInitCacheWorkDir(CacheConfiguration 
cacheCfg) throws IgniteCheckedException {
-        return false;
-    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 96a1ec075a0..ad5fc72e8b1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.cdc.CdcCacheConfigOnRestartTest;
 import org.apache.ignite.cdc.CdcCacheVersionTest;
 import org.apache.ignite.cdc.CdcSelfTest;
 import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
@@ -152,6 +153,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, 
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CdcCacheConfigOnRestartTest.class, ignoredTests);
 
         // new style folders with generated consistent ID test
         GridTestUtils.addTestIfNeeded(suite, 
IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CacheEventsCdcTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CacheEventsCdcTest.java
new file mode 100644
index 00000000000..de553ec1d91
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CacheEventsCdcTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.internal.cdc.SqlCdcTest.executeSql;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests cache events for CDC.
+ */
+@RunWith(Parameterized.class)
+public class CacheEventsCdcTest extends AbstractCdcTest {
+    /** Ignite node. */
+    private IgniteEx node;
+
+    /** Auxiliary cache: tests put entries into it to force automatic WAL rollover. */
+    private IgniteCache<Integer, Integer> dummy;
+
+    /** CDC future. */
+    private IgniteInternalFuture<?> cdcFut;
+
+    /** Consumer. */
+    private TrackCacheEventsConsumer cnsmr;
+
+    /** Test parameter: whether persistence is enabled for the default data region. */
+    @Parameterized.Parameter
+    public boolean persistenceEnabled;
+
+    /** @return Test parameters: the test runs with persistence enabled and with persistence disabled. */
+    @Parameterized.Parameters(name = "persistence={0}")
+    public static Object[] parameters() {
+        return new Object[] {true, false};
+    }
+
+    /** {@inheritDoc} Enables CDC on the default data region; persistence follows {@link #persistenceEnabled}. */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)
+                .setCdcEnabled(true)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Starts a node, activates the cluster, runs CDC asynchronously, creates the dummy cache. */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        node = startGrid();
+
+        node.cluster().state(ClusterState.ACTIVE);
+
+        cnsmr = new TrackCacheEventsConsumer();
+
+        cdcFut = runAsync(createCdc(cnsmr, node.configuration()));
+
+        dummy = node.getOrCreateCache("dummy");
+    }
+
+    /** {@inheritDoc} Asserts CDC is still running, cancels it, then stops all grids and cleans persistence. */
+    @Override protected void afterTest() throws Exception {
+        if (cdcFut != null) {
+            assertFalse(cdcFut.isDone());
+
+            cdcFut.cancel();
+        }
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** Checks that consumer events contain a cache ID after cache creation and lose it after destruction. */
+    @Test
+    public void testCreateDestroyCache() throws Exception {
+        node.createCache(DEFAULT_CACHE_NAME);
+
+        assertTrue(waitForCondition(() -> 
cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+
+        node.destroyCache(DEFAULT_CACHE_NAME);
+
+        assertTrue(waitForCondition(() -> 
!cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+    }
+
+    /** Checks that two caches sharing one cache group appear in and vanish from consumer events independently. */
+    @Test
+    public void testCreateDestroyCachesInGroup() throws Exception {
+        String otherCache = "other-cache";
+
+        node.createCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME).setGroupName("group"));
+        node.createCache(new CacheConfiguration<Integer, 
Integer>(otherCache).setGroupName("group"));
+
+        dummy.put(1, 1); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(() -> 
cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+        assertTrue(waitForCondition(() -> 
cnsmr.evts.containsKey(CU.cacheId(otherCache)), getTestTimeout()));
+
+        node.destroyCache(DEFAULT_CACHE_NAME);
+
+        dummy.put(2, 2); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(() -> 
!cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)), getTestTimeout()));
+        assertTrue(cnsmr.evts.containsKey(CU.cacheId(otherCache)));
+
+        node.destroyCache(otherCache);
+
+        dummy.put(3, 3); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(() -> 
!cnsmr.evts.containsKey(CU.cacheId(otherCache)), getTestTimeout()));
+        assertFalse(cnsmr.evts.containsKey(CU.cacheId(DEFAULT_CACHE_NAME)));
+    }
+
+    /** Checks cache events produced by CREATE TABLE, ALTER TABLE ADD COLUMN, CREATE INDEX and DROP TABLE. */
+    @Test
+    public void testCreateDropSQLTable() throws Exception {
+        AtomicReference<QueryEntity> fromCfg = new AtomicReference<>();
+
+        Function<Integer, GridAbsPredicate> checker = fldCnt -> () -> {
+            CdcCacheEvent evt = cnsmr.evts.get(CU.cacheId("T1"));
+
+            if (evt == null)
+                return false; // No event for the table's cache has been recorded yet.
+
+            assertNotNull(evt.configuration().getQueryEntities());
+            assertNotNull(evt.queryEntities());
+
+            assertEquals(1, evt.queryEntities().size());
+
+            QueryEntity qryEntity = evt.queryEntities().iterator().next();
+
+            if (qryEntity.getFields().size() != fldCnt)
+                return false; // Recorded event does not have the expected number of fields yet.
+
+            QueryEntity fromCfg0 = 
evt.configuration().getQueryEntities().iterator().next();
+
+            fromCfg.set(fromCfg0); // Remembered to compare with the config seen after CREATE INDEX.
+
+            assertTrue(qryEntity.getFields().containsKey("ID"));
+            assertTrue(qryEntity.getFields().containsKey("NAME"));
+
+            assertTrue(fromCfg0.getFields().containsKey("ID"));
+            assertTrue(fromCfg0.getFields().containsKey("NAME"));
+
+            assertFalse(
+                "Saved config must not change on schema change",
+                fromCfg0.getFields().containsKey("CITY_ID")
+            );
+
+            if (fldCnt == 3)
+                assertTrue(qryEntity.getFields().containsKey("CITY_ID"));
+
+            assertTrue(qryEntity.getIndexes().isEmpty());
+            assertTrue(fromCfg0.getIndexes().isEmpty());
+
+            return true;
+        };
+
+        executeSql(node, "CREATE TABLE T1(ID INT, NAME VARCHAR, PRIMARY KEY 
(ID)) WITH \"CACHE_NAME=T1\"");
+
+        dummy.put(1, 1); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(checker.apply(2), getTestTimeout()));
+
+        executeSql(node, "ALTER TABLE T1 ADD COLUMN CITY_ID INT");
+
+        dummy.put(2, 2); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(checker.apply(3), getTestTimeout()));
+
+        executeSql(node, "CREATE INDEX I1 ON T1(CITY_ID)");
+
+        dummy.put(3, 3); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(() -> {
+            CdcCacheEvent evt = cnsmr.evts.get(CU.cacheId("T1"));
+
+            QueryEntity qryEntity = evt.queryEntities().iterator().next();
+
+            if (F.isEmpty(qryEntity.getIndexes()))
+                return false; // Recorded event does not carry the new index yet.
+
+            QueryEntity fromCfg0 = 
evt.configuration().getQueryEntities().iterator().next();
+
+            assertEquals(fromCfg.get(), fromCfg0);
+            assertTrue(fromCfg0.getIndexes().isEmpty());
+
+            QueryIndex idx = qryEntity.getIndexes().iterator().next();
+
+            assertEquals("I1", idx.getName());
+            assertEquals(1, idx.getFields().size());
+            assertEquals("CITY_ID", 
idx.getFields().keySet().iterator().next());
+
+            return true;
+        }, getTestTimeout()));
+
+        executeSql(node, "DROP TABLE T1");
+
+        dummy.put(4, 4); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(() -> 
!cnsmr.evts.containsKey(CU.cacheId("T1")), getTestTimeout()));
+    }
+
+    /** Checks that CREATE TABLE over an already existing cache yields an event carrying the query entity. */
+    @Test
+    public void testCreateTableForExistingCache() throws Exception {
+        Function<Boolean, GridAbsPredicate> checker = chkTblExist -> () -> {
+            CdcCacheEvent evt = cnsmr.evts.get(CU.cacheId(DEFAULT_CACHE_NAME));
+
+            if (evt == null)
+                return false; // No event for the cache has been recorded yet.
+
+            if (!chkTblExist)
+                return true; // Only the presence of the cache event is checked.
+
+            if (F.isEmpty(evt.queryEntities()))
+                return false; // Recorded event carries no query entities yet.
+
+            assertEquals(1, evt.queryEntities().size());
+
+            QueryEntity qryEntity = evt.queryEntities().iterator().next();
+
+            if (qryEntity.getFields().size() != 2)
+                return false;
+
+            assertTrue(qryEntity.getFields().containsKey("ID"));
+            assertTrue(qryEntity.getFields().containsKey("NAME"));
+
+            return true;
+        };
+
+        node.createCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME));
+
+        dummy.put(5, 5); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(checker.apply(false), getTestTimeout()));
+
+        executeSql(
+            node,
+            "CREATE TABLE T1(ID INT, NAME VARCHAR, PRIMARY KEY (ID)) WITH 
\"CACHE_NAME=" + DEFAULT_CACHE_NAME + "\""
+        );
+
+        dummy.put(6, 6); // Dummy entry to force automatic WAL rollover.
+
+        assertTrue(waitForCondition(checker.apply(true), getTestTimeout()));
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java 
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
index db33d1cd9be..0cd6b7964ce 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java
@@ -24,8 +24,10 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.binary.BinaryBasicIdMapper;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcCacheEvent;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -72,6 +74,21 @@ public class SqlCdcTest extends AbstractCdcTest {
     /** */
     public static final String CITY_VAL_TYPE = "TestCity";
 
+    /** Name of the {@code ID} field. */
+    public static final String ID = "ID";
+
+    /** Name of the {@code CITY_ID} field. */
+    public static final String CITY_ID = "CITY_ID";
+
+    /** Name of the {@code NAME} field. */
+    public static final String NAME = "NAME";
+
+    /** Name of the {@code ZIP_CODE} field. */
+    public static final String ZIP_CODE = "ZIP_CODE";
+
+    /** Name of the {@code REGION} field. */
+    public static final String REGION = "REGION";
+
     /** */
     @Parameterized.Parameter
     public boolean persistenceEnabled;
@@ -95,6 +112,13 @@ public class SqlCdcTest extends AbstractCdcTest {
         return cfg;
     }
 
+    /** {@inheritDoc} Stops all grids and cleans the persistence directory. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
     /** Simplest CDC test. */
     @Test
     public void testReadAllSQLRows() throws Exception {
@@ -118,13 +142,13 @@ public class SqlCdcTest extends AbstractCdcTest {
         executeSql(
             ign,
             "CREATE TABLE USER(id int, city_id int, name varchar, PRIMARY KEY 
(id, city_id)) " +
-                "WITH \"CACHE_NAME=user,VALUE_TYPE=" + USER_VAL_TYPE + 
",KEY_TYPE=" + USER_KEY_TYPE + "\""
+                "WITH \"CACHE_NAME=" + USER + ",VALUE_TYPE=" + USER_VAL_TYPE + 
",KEY_TYPE=" + USER_KEY_TYPE + "\""
         );
 
         executeSql(
             ign,
             "CREATE TABLE CITY(id int, name varchar, zip_code varchar(6), 
PRIMARY KEY (id)) " +
-               "WITH \"CACHE_NAME=city,VALUE_TYPE=TestCity\""
+               "WITH \"CACHE_NAME=" + CITY + ",VALUE_TYPE=TestCity\""
         );
 
         for (int i = 0; i < KEYS_CNT; i++) {
@@ -160,7 +184,7 @@ public class SqlCdcTest extends AbstractCdcTest {
 
         cdc = createCdc(cnsmr, cfg);
 
-        IgniteInternalFuture<?> rmvFut = runAsync(cdc);
+        IgniteInternalFuture<?> cdcFut = runAsync(cdc);
 
         waitForSize(KEYS_CNT, USER, DELETE, cnsmr);
 
@@ -172,13 +196,15 @@ public class SqlCdcTest extends AbstractCdcTest {
             ign,
             "INSERT INTO CITY VALUES(?, ?, ?, ?)",
             KEYS_CNT + 1,
-            MSK,
+            SPB,
             Integer.toString(127000 + KEYS_CNT + 1),
-            "Moscow region");
+            "Saint Petersburg");
 
         waitForSize(KEYS_CNT + 1, CITY, UPDATE, cnsmr);
 
-        rmvFut.cancel();
+        assertFalse(cdcFut.isDone());
+
+        cdcFut.cancel();
 
         assertTrue(cnsmr.stopped());
     }
@@ -207,12 +233,12 @@ public class SqlCdcTest extends AbstractCdcTest {
                 return;
 
             if (evt.cacheId() == cacheId(USER)) {
-                int id = ((BinaryObject)evt.key()).field("ID");
-                int cityId = ((BinaryObject)evt.key()).field("CITY_ID");
+                int id = ((BinaryObject)evt.key()).field(ID);
+                int cityId = ((BinaryObject)evt.key()).field(CITY_ID);
 
                 assertEquals(42 * id, cityId);
 
-                String name = ((BinaryObject)evt.value()).field("NAME");
+                String name = ((BinaryObject)evt.value()).field(NAME);
 
                 if (id % 2 == 0)
                     assertTrue(name.startsWith(JOHN));
@@ -221,8 +247,8 @@ public class SqlCdcTest extends AbstractCdcTest {
             }
             else {
                 int id = (Integer)evt.key();
-                String name = ((BinaryObject)evt.value()).field("NAME");
-                String zipCode = ((BinaryObject)evt.value()).field("ZIP_CODE");
+                String name = ((BinaryObject)evt.value()).field(NAME);
+                String zipCode = ((BinaryObject)evt.value()).field(ZIP_CODE);
 
                 assertEquals(Integer.toString(127000 + id), zipCode);
 
@@ -249,34 +275,34 @@ public class SqlCdcTest extends AbstractCdcTest {
 
                 switch (type.typeName()) {
                     case USER_KEY_TYPE:
-                        
assertTrue(type.fieldNames().containsAll(Arrays.asList("ID", "CITY_ID")));
+                        
assertTrue(type.fieldNames().containsAll(Arrays.asList(ID, CITY_ID)));
                         assertEquals(2, type.fieldNames().size());
-                        assertEquals(int.class.getSimpleName(), 
type.fieldTypeName("ID"));
-                        assertEquals(int.class.getSimpleName(), 
type.fieldTypeName("CITY_ID"));
+                        assertEquals(int.class.getSimpleName(), 
type.fieldTypeName(ID));
+                        assertEquals(int.class.getSimpleName(), 
type.fieldTypeName(CITY_ID));
 
                         userKeyType = true;
 
                         break;
 
                     case USER_VAL_TYPE:
-                        assertTrue(type.fieldNames().contains("NAME"));
+                        assertTrue(type.fieldNames().contains(NAME));
                         assertEquals(1, type.fieldNames().size());
-                        assertEquals(String.class.getSimpleName(), 
type.fieldTypeName("NAME"));
+                        assertEquals(String.class.getSimpleName(), 
type.fieldTypeName(NAME));
 
                         userValType = true;
 
                         break;
 
                     case CITY_VAL_TYPE:
-                        
assertTrue(type.fieldNames().containsAll(Arrays.asList("NAME", "ZIP_CODE")));
+                        
assertTrue(type.fieldNames().containsAll(Arrays.asList(NAME, ZIP_CODE)));
                         assertEquals(cityValType ? 3 : 2, 
type.fieldNames().size());
-                        assertEquals(String.class.getSimpleName(), 
type.fieldTypeName("NAME"));
-                        assertEquals(String.class.getSimpleName(), 
type.fieldTypeName("ZIP_CODE"));
+                        assertEquals(String.class.getSimpleName(), 
type.fieldTypeName(NAME));
+                        assertEquals(String.class.getSimpleName(), 
type.fieldTypeName(ZIP_CODE));
 
                         // Alter table happen.
                         if (cityValType) {
-                            assertTrue(type.fieldNames().contains("REGION"));
-                            assertEquals(String.class.getSimpleName(), 
type.fieldTypeName("REGION"));
+                            assertTrue(type.fieldNames().contains(REGION));
+                            assertEquals(String.class.getSimpleName(), 
type.fieldTypeName(REGION));
                         }
 
                         cityValType = true;
@@ -308,10 +334,49 @@ public class SqlCdcTest extends AbstractCdcTest {
                 assertEquals(mapper.typeId(typeName), m.typeId());
             }
         }
+
+        /** {@inheritDoc} Validates query entities of the CITY and USER caches; fails on any other cache. */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvts) 
{
+            cacheEvts.forEachRemaining(evt -> {
+                if (evt.configuration().getName().equals(CITY)) {
+                    assertNotNull(evt.queryEntities());
+                    assertEquals(1, evt.queryEntities().size());
+
+                    QueryEntity tbl = evt.queryEntities().iterator().next();
+
+                    assertEquals(CITY.toUpperCase(), tbl.getTableName());
+                    assertEquals(caches.containsKey(evt.cacheId()) ? 4 : 3, 
tbl.getFields().size());
+                    assertEquals(Integer.class.getName(), tbl.getKeyType());
+                    assertEquals(ID, tbl.getKeyFieldName());
+                    
assertTrue(tbl.getFields().keySet().containsAll(Arrays.asList(ID, NAME, 
ZIP_CODE)));
+
+                    if (caches.containsKey(evt.cacheId()))
+                        assertTrue(tbl.getFields().containsKey(REGION));
+
+                    assertEquals((Integer)6, 
tbl.getFieldsPrecision().get(ZIP_CODE));
+                }
+                else if (evt.configuration().getName().equals(USER)) {
+                    assertNotNull(evt.queryEntities());
+                    assertEquals(1, evt.queryEntities().size());
+
+                    QueryEntity tbl = evt.queryEntities().iterator().next();
+
+                    assertEquals(USER.toUpperCase(), tbl.getTableName());
+                    assertEquals(3, tbl.getFields().size());
+                    assertEquals(USER_KEY_TYPE, tbl.getKeyType());
+                    assertEquals(USER_VAL_TYPE, tbl.getValueType());
+                    
assertTrue(tbl.getFields().keySet().containsAll(Arrays.asList(ID, CITY_ID, 
NAME)));
+                }
+                else
+                    fail("Unknown cache[" + evt.configuration().getName() + 
']');
+
+                caches.put(evt.cacheId(), evt);
+            });
+        }
     }
 
     /** */
-    private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... 
args) {
+    static List<List<?>> executeSql(IgniteEx node, String sqlText, Object... 
args) {
         return node.context().query().querySqlFields(new 
SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
     }
 
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
index 10905a573fa..b182ee0efe1 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite3.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyTypeRegistryTest;
+import org.apache.ignite.internal.cdc.CacheEventsCdcTest;
 import org.apache.ignite.internal.cdc.SqlCdcTest;
 import org.apache.ignite.internal.metric.SystemViewSelfTest;
 import org.apache.ignite.internal.processors.cache.BigEntryQueryTest;
@@ -380,7 +381,8 @@ import org.junit.runners.Suite;
     IgniteStatisticsTestSuite.class,
 
     // CDC tests.
-    SqlCdcTest.class
+    SqlCdcTest.class,
+    CacheEventsCdcTest.class
 
 })
 public class IgniteBinaryCacheQueryTestSuite3 {
diff --git 
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java 
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
index 84ac26bcf88..59e65c4848d 100644
--- 
a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
+++ 
b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java
@@ -144,6 +144,16 @@ public class CdcConfigurationTest extends 
GridCommonAbstractTest {
             // No-Op.
         }
 
+        /** {@inheritDoc} */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> 
cacheEvents) {
+            // No-Op: cache change events are ignored here.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            // No-Op: cache destroy events are ignored here.
+        }
+
         /** {@inheritDoc} */
         @Override public void stop() {
             // No-Op.

Reply via email to