This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c79a62f2d60 IGNITE-18425 Added CDC command to forcefully resend cache data  (#10524)
c79a62f2d60 is described below

commit c79a62f2d60667113dd77ba21b6d41d91f0e1864
Author: Nikita Amelchev <nsamelc...@gmail.com>
AuthorDate: Fri Apr 7 10:51:05 2023 +0300

    IGNITE-18425 Added CDC command to forcefully resend cache data  (#10524)
---
 docs/_docs/persistence/change-data-capture.adoc    |  22 +-
 .../internal/commandline/cdc/CdcCommand.java       |  97 +------
 .../internal/commandline/cdc/CdcSubcommands.java   |  66 +++++
 ...and.java => DeleteLostSegmentLinksCommand.java} |  23 +-
 .../internal/commandline/cdc/ResendCommand.java    | 115 ++++++++
 .../commandline/CommandHandlerParsingTest.java     |   4 +-
 .../testsuites/IgniteControlUtilityTestSuite2.java |   4 +-
 .../org/apache/ignite/util/CdcCommandTest.java     | 321 ++++++++++++++++++++-
 .../apache/ignite/util/CdcResendCommandTest.java   |  97 +++++++
 .../util/GridCommandHandlerClusterByClassTest.java |   4 +-
 .../org/apache/ignite/internal/cdc/CdcMain.java    |   3 +-
 .../internal/pagemem/wal/record/CdcDataRecord.java |  40 +++
 .../internal/pagemem/wal/record/WALRecord.java     |   5 +-
 .../persistence/wal/FileWriteAheadLogManager.java  |   3 +-
 .../wal/reader/StandaloneWalRecordsIterator.java   |   1 +
 .../wal/serializer/RecordDataV1Serializer.java     |   3 +-
 .../wal/serializer/RecordDataV2Serializer.java     |   3 +
 .../visor/cdc/VisorCdcCacheDataResendTask.java     | 248 ++++++++++++++++
 .../visor/cdc/VisorCdcCacheDataResendTaskArg.java  |  59 ++++
 .../org/apache/ignite/cdc/AbstractCdcTest.java     |  10 +-
 .../testframework/wal/record/RecordUtils.java      |   2 +
 ...ridCommandHandlerClusterByClassTest_help.output |   9 +-
 ...andHandlerClusterByClassWithSSLTest_help.output |   9 +-
 23 files changed, 1029 insertions(+), 119 deletions(-)
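
For orientation before the diff: the change turns `--cdc` into a dispatcher over an enum of sub-commands (see `CdcSubcommands.of` in the diff), and the new `resend` sub-command rejects an empty `--caches` list. The following is a minimal, self-contained sketch of that pattern, not the Ignite code itself. `SubcommandDispatchSketch`, `Subcommand`, and `parseCaches` are hypothetical names introduced only for illustration; the sub-command names and error texts are taken from the diff.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative stand-in for an enum-based sub-command registry (not Ignite code). */
public class SubcommandDispatchSketch {
    /** Hypothetical enum; the names mirror the two CDC sub-commands in this commit. */
    public enum Subcommand {
        DELETE_LOST_SEGMENT_LINKS("delete_lost_segment_links"),
        RESEND("resend");

        /** Name as typed on the command line. */
        private final String text;

        Subcommand(String text) {
            this.text = text;
        }

        /** Case-insensitive lookup; fails with a message listing the valid names. */
        public static Subcommand of(String name) {
            for (Subcommand cmd : values()) {
                if (cmd.text.equalsIgnoreCase(name))
                    return cmd;
            }

            throw new IllegalArgumentException(
                "Invalid argument: " + name + ". One of " + Arrays.toString(values()) + " is expected.");
        }

        /** Prints the command-line name rather than the constant name. */
        @Override public String toString() {
            return text;
        }
    }

    /** Splits a comma-separated cache list, dropping blanks and duplicates (assumed behavior). */
    public static Set<String> parseCaches(String arg) {
        Set<String> caches = new LinkedHashSet<>();

        if (arg != null) {
            for (String name : arg.split(",")) {
                if (!name.trim().isEmpty())
                    caches.add(name.trim());
            }
        }

        if (caches.isEmpty())
            throw new IllegalArgumentException("At least one cache name should be specified.");

        return caches;
    }

    public static void main(String[] args) {
        System.out.println(Subcommand.of("RESEND"));
        System.out.println(parseCaches("cache1, cache2"));
    }
}
```

Keeping the name lookup inside the enum means a new sub-command only needs a new constant; the top-level command does not have to change.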

diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc
index f04fffaee55..846e389cb5a 100644
--- a/docs/_docs/persistence/change-data-capture.adoc
+++ b/docs/_docs/persistence/change-data-capture.adoc
@@ -152,7 +152,7 @@ So when enabled there will be gap between segments: `0000000000000002.wal`, `000
 In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]".
 
 NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using
-snapshot or other methods.
+link:#forcefully-resend-all-cache-data-to-cdc[resend command], snapshot or other methods.
 
 To fix this error you can run the following link:tools/control-script[Control Script] command:
 
@@ -171,6 +171,26 @@ For example, CDC was turned off several times: `000000000000002.wal`, `000000000
 Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`.
 The application will start from the `0000000000000010.wal` segment after being enabled.
 
+== Forcefully resend all cache data to CDC
+
+When CDC has been forcefully disabled for a while, cache changes made during that time are skipped.
+In this case it is necessary to resend data from existing caches.
+For example, this is important if you need to ensure consistency of cache data before a replication restart.
+
+NOTE: The command will be canceled if the cluster was not rebalanced or the topology changed (node left/joined, baseline changed).
+
+To forcefully resend all cache data to CDC you can run the following link:tools/control-script[Control Script] command:
+
+[source,shell]
+----
+# Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC:
+control.sh|bat --cdc resend --caches cache1,...,cacheN
+----
+
+The command iterates over caches and writes primary copies of data entries to the WAL to get captured by the CDC application.
+
+NOTE: There is no guarantee that the CDC consumer is notified about concurrent cache updates: use `CdcEvent#version` to resolve which version of an entry is newer.
+
+
 == cdc-ext
 
 Ignite extensions project has link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext] module which provides two way to setup cross cluster replication based on CDC.
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
index 8dbd2b963ab..9d7352d1a7b 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
@@ -17,115 +17,44 @@
 
 package org.apache.ignite.internal.commandline.cdc;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientNode;
 import org.apache.ignite.internal.commandline.AbstractCommand;
-import org.apache.ignite.internal.commandline.Command;
 import org.apache.ignite.internal.commandline.CommandArgIterator;
-import org.apache.ignite.internal.commandline.CommandLogger;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.visor.VisorTaskArgument;
-import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
-
-import static org.apache.ignite.internal.commandline.CommandList.CDC;
-import static org.apache.ignite.internal.commandline.CommandLogger.optional;
-import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION;
-import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
 
 /**
  * CDC command.
  */
-public class CdcCommand extends AbstractCommand<String> {
-    /** Command to delete lost segment links. */
-    public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links";
-
-    /** */
-    public static final String NODE_ID = "--node-id";
-
-    /** Node ID. */
-    private UUID nodeId;
+public class CdcCommand extends AbstractCommand<Object> {
+    /** Cdc sub-command to execute. */
+    private AbstractCommand<?> cmd;
 
     /** {@inheritDoc} */
     @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception {
-        try (GridClient client = Command.startClient(clientCfg)) {
-            executeTaskByNameOnNode(
-                client,
-                VisorCdcDeleteLostSegmentsTask.class.getName(),
-                null,
-                nodeId,
-                clientCfg
-            );
-
-            Collection<UUID> nodeIds = nodeId != null ? Collections.singletonList(nodeId) :
-                client.compute().nodes(node -> !node.isClient()).stream().map(GridClientNode::nodeId)
-                    .collect(Collectors.toSet());
-
-            client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(),
-                new VisorTaskArgument<>(nodeIds, false));
-
-            String res = "Lost segment CDC links successfully removed.";
-
-            log.info(res);
-
-            return res;
-        }
-        catch (Throwable e) {
-            log.error("Failed to perform operation.");
-            log.error(CommandLogger.errorMessage(e));
-
-            throw e;
-        }
+        return cmd.execute(clientCfg, log);
     }
 
     /** {@inheritDoc} */
     @Override public void parseArguments(CommandArgIterator argIter) {
-        nodeId = null;
-
-        String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS);
-
-        if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd))
-            throw new IllegalArgumentException("Unexpected command: " + cmd);
-
-        while (argIter.hasNextSubArg()) {
-            String opt = argIter.nextArg("Failed to read command argument.");
+        cmd = CdcSubcommands.of(argIter.nextArg("Expected CDC sub-command.")).subCommand();
 
-            if (NODE_ID.equalsIgnoreCase(opt))
-                nodeId = argIter.nextUuidArg(NODE_ID);
-        }
+        cmd.parseArguments(argIter);
     }
 
     /** {@inheritDoc} */
-    @Override public String confirmationPrompt() {
-        return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " +
-            "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segment links" +
-            "previous to the last gap." + U.nl() +
-            "All changes in deleted segment links will be lost!" + U.nl() +
-            "Make sure you need to sync data before restarting the CDC application. You can synchronize caches " +
-            "using snapshot or other methods.";
+    @Override public Object arg() {
+        return cmd.arg();
     }
 
     /** {@inheritDoc} */
-    @Override public String arg() {
-        return null;
+    @Override public String confirmationPrompt() {
+        return cmd == null ? null : cmd.confirmationPrompt();
     }
 
     /** {@inheritDoc} */
-    @Override public void printUsage(IgniteLogger logger) {
-        Map<String, String> params = new LinkedHashMap<>();
-
-        params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " +
-            "all server nodes.");
-
-        usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS,
-            optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION));
+    @Override public void printUsage(IgniteLogger log) {
+        for (CdcSubcommands cmd : CdcSubcommands.values())
+            cmd.subCommand().printUsage(log);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java
new file mode 100644
index 00000000000..a82d4ec6768
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.cdc;
+
+import org.apache.ignite.internal.commandline.AbstractCommand;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * CDC sub commands.
+ */
+public enum CdcSubcommands {
+    /** Sub-command to delete lost segment links. */
+    DELETE_LOST_SEGMENT_LINKS(new DeleteLostSegmentLinksCommand()),
+
+    /** Sub-command to forcefully resend cache data. */
+    RESEND(new ResendCommand());
+
+    /** Sub-command. */
+    private final AbstractCommand<?> cmd;
+
+    /** @param cmd Sub-command. */
+    CdcSubcommands(AbstractCommand<?> cmd) {
+        this.cmd = cmd;
+    }
+
+    /**
+     * @param name Command name (case insensitive).
+     * @return Command for the specified name.
+     */
+    public static CdcSubcommands of(String name) {
+        CdcSubcommands[] cmds = values();
+
+        for (CdcSubcommands cmd : cmds) {
+            if (cmd.subCommand().name().equalsIgnoreCase(name))
+                return cmd;
+        }
+
+        throw new IllegalArgumentException(
+            "Invalid argument: " + name + ". One of " + F.asList(cmds) + " is expected.");
+    }
+
+    /** @return Sub-command. */
+    public AbstractCommand<?> subCommand() {
+        return cmd;
+    }
+
+    /** @return Sub-command name. */
+    @Override public String toString() {
+        return cmd.name();
+    }
+}
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java
similarity index 91%
copy from modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
copy to modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java
index 8dbd2b963ab..12bca8c9711 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java
@@ -41,9 +41,9 @@ import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CO
 import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
 
 /**
- * CDC command.
+ * Command to delete lost segment links.
  */
-public class CdcCommand extends AbstractCommand<String> {
+public class DeleteLostSegmentLinksCommand extends AbstractCommand<Object> {
     /** Command to delete lost segment links. */
     public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links";
 
@@ -89,11 +89,6 @@ public class CdcCommand extends AbstractCommand<String> {
     @Override public void parseArguments(CommandArgIterator argIter) {
         nodeId = null;
 
-        String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS);
-
-        if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd))
-            throw new IllegalArgumentException("Unexpected command: " + cmd);
-
         while (argIter.hasNextSubArg()) {
             String opt = argIter.nextArg("Failed to read command argument.");
 
@@ -112,25 +107,25 @@ public class CdcCommand extends AbstractCommand<String> {
             "using snapshot or other methods.";
     }
 
-    /** {@inheritDoc} */
-    @Override public String arg() {
-        return null;
-    }
-
     /** {@inheritDoc} */
     @Override public void printUsage(IgniteLogger logger) {
         Map<String, String> params = new LinkedHashMap<>();
 
-        params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " +
+        params.put(NODE_ID + " node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " +
             "all server nodes.");
 
         usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS,
             optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION));
     }
 
+    /** {@inheritDoc} */
+    @Override public Object arg() {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public String name() {
-        return "cdc";
+        return DELETE_LOST_SEGMENT_LINKS;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java
new file mode 100644
index 00000000000..1ba381f347a
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.cdc;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.commandline.AbstractCommand;
+import org.apache.ignite.internal.commandline.Command;
+import org.apache.ignite.internal.commandline.CommandArgIterator;
+import org.apache.ignite.internal.commandline.CommandLogger;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.cdc.VisorCdcCacheDataResendTask;
+import org.apache.ignite.internal.visor.cdc.VisorCdcCacheDataResendTaskArg;
+
+import static org.apache.ignite.internal.commandline.CommandList.CDC;
+import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
+
+/**
+ * The command to forcefully resend all cache data to CDC.
+ * Iterates over given caches and writes data entries to the WAL to get captured by CDC.
+ */
+public class ResendCommand extends AbstractCommand<Object> {
+    /** Command name. */
+    public static final String RESEND = "resend";
+
+    /** */
+    public static final String CACHES = "--caches";
+
+    /** */
+    private VisorCdcCacheDataResendTaskArg arg;
+
+    /** {@inheritDoc} */
+    @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception {
+        try (GridClient client = Command.startClient(clientCfg)) {
+            executeTaskByNameOnNode(client, VisorCdcCacheDataResendTask.class.getName(), arg, null, clientCfg);
+
+            String res = "Successfully resent all cache data to CDC.";
+
+            log.info(res);
+
+            return res;
+        }
+        catch (Throwable e) {
+            log.error("Failed to perform operation.");
+            log.error(CommandLogger.errorMessage(e));
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void parseArguments(CommandArgIterator argIter) {
+        Set<String> caches = null;
+
+        while (argIter.hasNextSubArg()) {
+            String opt = argIter.nextArg("Failed to read command argument.");
+
+            if (CACHES.equalsIgnoreCase(opt)) {
+                if (caches != null)
+                    throw new IllegalArgumentException(CACHES + " arg specified twice.");
+
+                caches = argIter.nextStringSet("comma-separated list of cache names.");
+            }
+        }
+
+        if (F.isEmpty(caches))
+            throw new IllegalArgumentException("At least one cache name should be specified.");
+
+        arg = new VisorCdcCacheDataResendTaskArg(caches);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printUsage(IgniteLogger logger) {
+        Map<String, String> params = new LinkedHashMap<>();
+
+        params.put(CACHES + " cache1,...,cacheN", "specifies a comma-separated list of cache names.");
+
+        usage(logger, "Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies " +
+            "of data entries to the WAL to get captured by CDC:", CDC, params, RESEND, CACHES, "cache1,...,cacheN");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object arg() {
+        return arg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return RESEND;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean experimental() {
+        return true;
+    }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index b06ac82fb4b..3a24c7a59ce 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -79,8 +79,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.FIND
 import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
 import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST;
 import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS;
+import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
index ce8571c4ef4..409a75db2a8 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest;
 import org.apache.ignite.util.CacheMetricsCommandTest;
 import org.apache.ignite.util.CdcCommandTest;
+import org.apache.ignite.util.CdcResendCommandTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest;
@@ -67,7 +68,8 @@ import org.junit.runners.Suite;
 
     IgniteIndexReaderTest.class,
 
-    CdcCommandTest.class
+    CdcCommandTest.class,
+    CdcResendCommandTest.class
 })
 public class IgniteControlUtilityTestSuite2 {
 }
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
index b04090e485b..4604fccd878 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java
@@ -23,40 +23,66 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cdc.AbstractCdcTest.UserCdcConsumer;
 import org.apache.ignite.cdc.CdcConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridJobExecuteRequest;
+import org.apache.ignite.internal.GridJobExecuteResponse;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.commandline.CommandList;
+import org.apache.ignite.internal.commandline.cdc.CdcSubcommands;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
 import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
 import static org.apache.ignite.cdc.CdcSelfTest.addData;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS;
+import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.CACHES;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.RESEND;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
 import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * CDC command tests.
  */
 public class CdcCommandTest extends GridCommandHandlerAbstractTest {
+    /** */
+    private static final String CDC_DISABLED_DATA_REGION = "cdc_disabled_data_region";
+
     /** */
     private IgniteEx srv0;
 
@@ -66,6 +92,9 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
     /** */
     private DistributedChangeableProperty<Serializable> cdcDisabled;
 
+    /** */
+    private volatile IgniteThrowableConsumer<WALRecord> onLogLsnr;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -75,11 +104,34 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setWalForceArchiveTimeout(1000)
+            .setDataRegionConfigurations(new DataRegionConfiguration()
+                .setName(CDC_DISABLED_DATA_REGION)
+                .setCdcEnabled(false))
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setCdcEnabled(true)));
 
         cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED);
 
+        cfg.setPluginProviders(new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "Test WAL provider";
+            }
+
+            @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+                if (!IgniteWriteAheadLogManager.class.equals(cls))
+                    return null;
+
+                return (T)new FileWriteAheadLogManager(((IgniteEx)ctx.grid()).context()) {
+                    @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException {
+                        if (rec instanceof CdcDataRecord && onLogLsnr != null)
+                            onLogLsnr.accept(rec);
+
+                        return super.log(rec);
+                    }
+                };
+            }
+        });
+
         return cfg;
     }
 
@@ -92,6 +144,8 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
         srv0 = startGrid(0);
         srv1 = startGrid(1);
 
+        awaitPartitionMapExchange();
+
         cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED);
     }
 
@@ -99,6 +153,8 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
+        stopThreads(log);
+
         stopAllGrids();
 
         cleanPersistenceDir();
@@ -111,7 +167,7 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
 
         assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
                 CommandList.CDC.text(), "unexpected_command"),
-            "Unexpected command: unexpected_command");
+            "Invalid argument: unexpected_command. One of " + F.asList(CdcSubcommands.values()) + " is expected.");
 
         assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
                 CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID),
@@ -131,7 +187,7 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
 
         CdcConfiguration cfg = new CdcConfiguration();
 
-        cfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
+        cfg.setConsumer(new UserCdcConsumer() {
             @Override public void start(MetricRegistry mreg) {
                 appStarted.countDown();
             }
@@ -148,8 +204,6 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
             "Failed to delete lost segment CDC links. Unable to acquire lock to lock CDC folder.");
 
         assertFalse(fut.isDone());
-
-        fut.cancel();
     }
 
     /** */
@@ -222,4 +276,257 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
 
         latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
     }
+
+    /** */
+    @Test
+    public void testParseResend() {
+        injectTestSystemOut();
+
+        assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+                CommandList.CDC.text(), "unexpected_command"),
+            "Invalid argument: unexpected_command. One of " + F.asList(CdcSubcommands.values()) + " is expected.");
+
+        assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+                CommandList.CDC.text(), RESEND),
+            "At least one cache name should be specified.");
+
+        assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS,
+                CommandList.CDC.text(), RESEND, CACHES),
+            "At least one cache name should be specified.");
+    }
+
+    /** */
+    @Test
+    public void testResendCacheData() throws Exception {
+        UserCdcConsumer cnsmr0 = runCdc(srv0);
+        UserCdcConsumer cnsmr1 = runCdc(srv1);
+
+        addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+        waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+
+        cnsmr0.clear();
+        cnsmr1.clear();
+
+        executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME);
+
+        waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+        waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+    }
+
+    /** */
+    @Test
+    public void testResendCachesNotExist() {
+        injectTestSystemOut();
+
+        assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                CommandList.CDC.text(), RESEND, CACHES, "unknown_cache"),
+            "Cache does not exist");
+
+        String cdcDisabledCacheName = "cdcDisabledCache";
+
+        srv0.getOrCreateCache(new CacheConfiguration<>()
+            .setName(cdcDisabledCacheName)
+            .setDataRegionName(CDC_DISABLED_DATA_REGION));
+
+        assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                CommandList.CDC.text(), RESEND, CACHES, cdcDisabledCacheName),
+            "CDC is not enabled for given cache");
+    }
+
+    /** */
+    @Test
+    public void testResendCancelOnNodeLeft() {
+        injectTestSystemOut();
+
+        addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        for (Ignite srv : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> {
+                if (msg instanceof GridJobExecuteResponse) {
+                    GridTestUtils.runAsync(srv::close);
+
+                    return true;
+                }
+
+                return false;
+            });
+        }
+
+        assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME),
+            "CDC cache data resend cancelled. Failed to resend cache data on the node");
+    }
+
+    /** */
+    @Test
+    public void testResendCancelOnRebalanceInProgress() throws Exception {
+        injectTestSystemOut();
+
+        addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        CountDownLatch rebalanceStarted = new CountDownLatch(1);
+
+        for (Ignite srv : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> {
+                if (msg instanceof GridDhtPartitionSupplyMessage) {
+                    rebalanceStarted.countDown();
+
+                    return true;
+                }
+
+                return false;
+            });
+        }
+
+        GridTestUtils.runAsync(() -> startGrid(3));
+
+        rebalanceStarted.await();
+
+        assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME),
+            "CDC cache data resend cancelled. Rebalance sheduled");
+    }
+
+    /** */
+    @Test
+    public void testResendCancelOnTopologyChangeBeforeStart() throws Exception {
+        injectTestSystemOut();
+
+        addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        CountDownLatch blocked = new CountDownLatch(1);
+
+        for (Ignite srv : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> {
+                if (msg instanceof GridJobExecuteRequest) {
+                    blocked.countDown();
+
+                    return true;
+                }
+
+                return false;
+            });
+        }
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+            assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                    CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME),
+                "CDC cache data resend cancelled. Topology changed");
+        });
+
+        blocked.await();
+
+        startGrid(3);
+        awaitPartitionMapExchange();
+
+        for (Ignite srv : G.allGrids())
+            TestRecordingCommunicationSpi.spi(srv).stopBlock();
+
+        fut.get();
+    }
+
+    /** */
+    @Test
+    public void testResendCancelOnTopologyChange() throws Exception {
+        injectTestSystemOut();
+
+        addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        CountDownLatch preload = new CountDownLatch(1);
+        CountDownLatch topologyChanged = new CountDownLatch(1);
+
+        AtomicInteger cnt = new AtomicInteger();
+
+        onLogLsnr = rec -> {
+            if (cnt.incrementAndGet() < KEYS_CNT / 2)
+                return;
+
+            preload.countDown();
+
+            U.await(topologyChanged);
+        };
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+            assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR,
+                    CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME),
+                "CDC cache data resend cancelled. Topology changed");
+        });
+
+        preload.await();
+
+        startGrid(3);
+
+        topologyChanged.countDown();
+
+        fut.get();
+    }
+
+    /** */
+    @Test
+    public void testResendOnClientJoin() throws Exception {
+        UserCdcConsumer cnsmr0 = runCdc(srv0);
+        UserCdcConsumer cnsmr1 = runCdc(srv1);
+
+        addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+        waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+
+        cnsmr0.clear();
+        cnsmr1.clear();
+
+        CountDownLatch blocked = new CountDownLatch(1);
+
+        for (Ignite srv : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> {
+                if (msg instanceof GridJobExecuteRequest) {
+                    blocked.countDown();
+
+                    return true;
+                }
+
+                return false;
+            });
+        }
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> {
+            executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME);
+        });
+
+        blocked.await();
+
+        startClientGrid("client");
+
+        for (Ignite srv : G.allGrids())
+            TestRecordingCommunicationSpi.spi(srv).stopBlock();
+
+        fut.get();
+
+        waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+        waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY));
+    }
+
+    /** */
+    public static UserCdcConsumer runCdc(Ignite ign) {
+        UserCdcConsumer cnsmr = new UserCdcConsumer();
+
+        CdcConfiguration cfg = new CdcConfiguration();
+
+        cfg.setConsumer(cnsmr);
+        cfg.setKeepBinary(false);
+
+        CdcMain cdc = new CdcMain(ign.configuration(), null, cfg);
+
+        GridTestUtils.runAsync(cdc);
+
+        return cnsmr;
+    }
+
+    /** */
+    public static void waitForSize(UserCdcConsumer cnsmr, int expSize) throws Exception {
+        assertTrue(waitForCondition(() -> expSize == cnsmr.data(UPDATE, CU.cacheId(DEFAULT_CACHE_NAME)).size(),
+            60_000));
+    }
 }
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
new file mode 100644
index 00000000000..55416388c89
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.commandline.CommandList;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
+import static org.apache.ignite.cdc.CdcSelfTest.addData;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.CACHES;
+import static org.apache.ignite.internal.commandline.cdc.ResendCommand.RESEND;
+import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
+import static org.apache.ignite.util.CdcCommandTest.runCdc;
+import static org.apache.ignite.util.CdcCommandTest.waitForSize;
+
+/**
+ * CDC resend command tests.
+ */
+public class CdcResendCommandTest extends GridCommandHandlerAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalForceArchiveTimeout(1000)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setCdcEnabled(true)
+                .setPersistenceEnabled(true)));
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setBackups(1));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopThreads(log);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testResendCacheDataRestoreFromWal() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        ign.cluster().state(ACTIVE);
+
+        enableCheckpoints(ign, false);
+
+        addData(ign.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
+
+        AbstractCdcTest.UserCdcConsumer cnsmr = runCdc(ign);
+
+        waitForSize(cnsmr, KEYS_CNT);
+
+        cnsmr.clear();
+
+        executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME);
+
+        waitForSize(cnsmr, KEYS_CNT);
+
+        stopAllGrids();
+
+        ign = startGrid(0);
+
+        assertEquals(KEYS_CNT, ign.cache(DEFAULT_CACHE_NAME).size());
+    }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
index 339247850f2..a3cf746a826 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java
@@ -119,8 +119,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheDestroy.DESTROY_
 import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.CLEAR;
 import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.DESTROY;
 import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.HELP;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
-import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
+import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS;
+import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID;
 import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.CACHE;
 import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.PARTITIONS;
 import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.STRATEGY;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 9ff1fc7adb3..3b187a0593d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -79,6 +79,7 @@ import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
 import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
 import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
 import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
@@ -489,7 +490,7 @@ public class CdcMain implements Runnable {
                 .marshallerMappingFileStoreDir(marshaller)
                 .keepBinary(cdcCfg.isKeepBinary())
                 .filesOrDirs(segment.toFile())
-                .addFilter((type, ptr) -> type == DATA_RECORD_V2);
+                .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD);
 
         if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0)
             builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java
new file mode 100644
index 00000000000..25e0defe66b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * The record to forcefully resend cache data to the CDC application.
+ */
+public class CdcDataRecord extends DataRecord {
+    /** */
+    public CdcDataRecord(DataEntry writeEntry) {
+        super(writeEntry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.CDC_DATA_RECORD;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CdcDataRecord.class, this, "super", super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index baa51c35286..3457251e0f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -288,7 +288,10 @@ public abstract class WALRecord {
         INCREMENTAL_SNAPSHOT_START_RECORD(76, LOGICAL),
 
         /** Incremental snapshot finish record. */
-        INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL);
+        INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL),
+
+        /** CDC data record. */
+        CDC_DATA_RECORD(78, CUSTOM);
 
         /** Index for serialization. Should be consistent throughout all versions. */
         private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index edbc38d5dc3..f68b366fbb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -152,6 +152,7 @@ import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
@@ -990,7 +991,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                     // Only data records handled by CDC.
                     // No need to forcefully rollover for other record types.
-                    if (walForceArchiveTimeout > 0 && rec.type() == DATA_RECORD_V2)
+                    if (walForceArchiveTimeout > 0 && (rec.type() == DATA_RECORD_V2 || rec.type() == CDC_DATA_RECORD))
                         lastDataRecordLoggedMs.set(millis);
                 }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 051be3616a4..df6ca772291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -360,6 +360,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
 
         if (processor != null && (rec.type() == RecordType.DATA_RECORD
             || rec.type() == RecordType.DATA_RECORD_V2
+            || rec.type() == RecordType.CDC_DATA_RECORD
             || rec.type() == RecordType.MVCC_DATA_RECORD)) {
             try {
                 return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index f902295dcc9..3907bf2c98f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -126,6 +126,7 @@ import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2;
@@ -2171,7 +2172,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
         int partId = in.readInt();
         long partCntr = in.readLong();
         long expireTime = in.readLong();
-        byte flags = type == DATA_RECORD_V2 ? in.readByte() : (byte)0;
+        byte flags = type == DATA_RECORD_V2 || type == CDC_DATA_RECORD ? in.readByte() : (byte)0;
 
         GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 62fc50ef73f..77cc5eb0cfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -91,6 +91,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
                 return 18 + cacheStatesSize + (walPtr == null ? 0 : 16);
 
             case MVCC_DATA_RECORD:
+            case CDC_DATA_RECORD:
                 return 4/*entry count*/ + 8/*timestamp*/ + dataSize((DataRecord)rec);
 
             case DATA_RECORD_V2:
@@ -162,6 +163,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
 
             case DATA_RECORD:
             case DATA_RECORD_V2:
+            case CDC_DATA_RECORD:
                 int entryCnt = in.readInt();
                 long timeStamp = in.readLong();
 
@@ -272,6 +274,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
 
             case MVCC_DATA_RECORD:
             case DATA_RECORD_V2:
+            case CDC_DATA_RECORD:
                 DataRecord dataRec = (DataRecord)rec;
 
                 int entryCnt = dataRec.entryCount();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java
new file mode 100644
index 00000000000..20f5fe1c82f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to forcefully resend all cache data to CDC.
+ * Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC.
+ */
+@GridInternal
+public class VisorCdcCacheDataResendTask extends VisorMultiNodeTask<VisorCdcCacheDataResendTaskArg, Void, Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version when task was started. */
+    private AffinityTopologyVersion topVer;
+
+    /** {@inheritDoc} */
+    @Override protected VisorJob<VisorCdcCacheDataResendTaskArg, Void> job(VisorCdcCacheDataResendTaskArg arg) {
+        return new VisorCdcCacheDataResendJob(arg, topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<UUID> jobNodes(VisorTaskArgument<VisorCdcCacheDataResendTaskArg> arg) {
+        // Check there is no rebalance.
+        GridDhtPartitionsExchangeFuture fut = ignite.context().cache().context().exchange().lastFinishedFuture();
+
+        if (!fut.rebalanced()) {
+            throw new IgniteException("CDC cache data resend cancelled. Rebalance sheduled " +
+                "[topVer=" + fut.topologyVersion() + ']');
+        }
+
+        // Cancel resend if affinity will change.
+        topVer = ignite.context().cache().context().exchange().lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+        return F.nodeIds(ignite.cluster().forServers().nodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected @Nullable Void reduce0(List<ComputeJobResult> results) throws IgniteException {
+        for (ComputeJobResult res : results) {
+            if (res.getException() != null) {
+                throw new IgniteException("CDC cache data resend cancelled. Failed to resend cache data " +
+                    "on the node [nodeId=" + res.getNode().id() + ']', res.getException());
+            }
+        }
+
+        return null;
+    }
+
+    /** */
+    private static class VisorCdcCacheDataResendJob extends VisorJob<VisorCdcCacheDataResendTaskArg, Void> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Injected logger. */
+        @LoggerResource
+        protected IgniteLogger log;
+
+        /** */
+        private IgniteWriteAheadLogManager wal;
+
+        /** */
+        private GridCachePartitionExchangeManager<Object, Object> exchange;
+
+        /** Topology version when task was started. */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
+        private GridDhtPartitionsExchangeFuture lastFut;
+
+        /**
+         * @param arg Job argument.
+         * @param topVer Topology version when task was started.
+         */
+        protected VisorCdcCacheDataResendJob(VisorCdcCacheDataResendTaskArg arg, AffinityTopologyVersion topVer) {
+            super(arg, false);
+
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Void run(VisorCdcCacheDataResendTaskArg arg) throws IgniteException {
+            if (F.isEmpty(arg.caches()))
+                throw new IllegalArgumentException("Caches are not specified.");
+
+            List<IgniteInternalCache<?, ?>> caches = new ArrayList<>();
+
+            for (String name : arg.caches()) {
+                IgniteInternalCache<?, ?> cache = ignite.context().cache().cache(name);
+
+                if (cache == null)
+                    throw new IgniteException("Cache does not exist [cacheName=" + name + ']');
+
+                if (!cache.context().dataRegion().config().isCdcEnabled()) {
+                    throw new IgniteException("CDC is not enabled for given cache [cacheName=" + name +
+                        ", dataRegionName=" + cache.context().dataRegion().config().getName() + ']');
+                }
+
+                if (cache.context().mvccEnabled())
+                    throw new UnsupportedOperationException("The TRANSACTIONAL_SNAPSHOT mode is not supported.");
+
+                caches.add(cache);
+            }
+
+            if (log.isInfoEnabled())
+                log.info("CDC cache data resend started [caches=" + String.join(", ", arg.caches()) + ']');
+
+            wal = ignite.context().cache().context().wal(true);
+            exchange = ignite.context().cache().context().exchange();
+
+            try {
+                Iterator<IgniteInternalCache<?, ?>> iter = caches.iterator();
+
+                while (iter.hasNext() && !isCancelled())
+                    resendCacheData(iter.next());
+
+                wal.flush(null, true);
+
+                if (log.isInfoEnabled()) {
+                    log.info("CDC cache data resend " + (isCancelled() ? "cancelled" : "finished") +
+                        " [caches=" + String.join(", ", arg.caches()) + ']');
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+
+            return null;
+        }
+
+        /** @param cache Cache. */
+        private void resendCacheData(IgniteInternalCache<?, ?> cache) throws IgniteCheckedException {
+            if (log.isInfoEnabled())
+                log.info("CDC cache data resend started [cacheName=" + cache.name() + ']');
+
+            GridCacheContext<?, ?> cctx = cache.context();
+
+            GridIterator<CacheDataRow> localRows = cctx.offheap()
+                .cacheIterator(cctx.cacheId(), true, false, AffinityTopologyVersion.NONE, null, null);
+
+            long cnt = 0;
+            Set<Integer> parts = new TreeSet<>();
+
+            for (CacheDataRow row : localRows) {
+                if (isCancelled())
+                    break;
+
+                ensureTopologyNotChanged();
+
+                KeyCacheObject key = row.key();
+
+                if (log.isTraceEnabled())
+                    log.trace("Resend key: " + key);
+
+                CdcDataRecord rec = new CdcDataRecord(new DataEntry(
+                    cctx.cacheId(),
+                    key,
+                    row.value(),
+                    GridCacheOperation.CREATE,
+                    null,
+                    row.version(),
+                    row.expireTime(),
+                    key.partition(),
+                    -1,
+                    DataEntry.flags(true))
+                );
+
+                wal.log(rec);
+
+                parts.add(key.partition());
+
+                if ((++cnt % 1_000 == 0) && log.isDebugEnabled())
+                    log.debug("Resend entries count: " + cnt);
+            }
+
+            if (log.isInfoEnabled()) {
+                if (isCancelled())
+                    log.info("CDC cache data resend cancelled.");
+                else {
+                    log.info("CDC cache data resend finished [cacheName=" + cache.name() +
+                        ", entriesCnt=" + cnt + ", parts=" + parts + ']');
+                }
+            }
+        }
+
+        /** */
+        private void ensureTopologyNotChanged() {
+            GridDhtPartitionsExchangeFuture fut = exchange.lastFinishedFuture();
+
+            if (lastFut != fut) {
+                AffinityTopologyVersion lastChanged = exchange.lastAffinityChangedTopologyVersion(fut.topologyVersion());
+
+                if (!topVer.equals(lastChanged)) {
+                    throw new IgniteException("CDC cache data resend cancelled. Topology changed during resend " +
+                        "[startTopVer=" + topVer + ", currentTopVer=" + fut.topologyVersion() + ']');
+                }
+
+                lastFut = fut;
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java
new file mode 100644
index 00000000000..9f64f42955d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cdc;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Set;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class VisorCdcCacheDataResendTaskArg extends IgniteDataTransferObject {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache names. */
+    private Set<String> caches;
+
+    /** */
+    public VisorCdcCacheDataResendTaskArg() {
+        // No-op.
+    }
+
+    /** */
+    public VisorCdcCacheDataResendTaskArg(Set<String> caches) {
+        this.caches = caches;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        U.writeCollection(out, caches);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+        caches = U.readSet(in);
+    }
+
+    /** @return Cache names. */
+    public Set<String> caches() {
+        return caches;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index 6222fb2a68f..3c8c32c7a07 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -344,7 +344,12 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest {
 
         /** @return Read keys. */
         public List<T> data(ChangeEventType op, int cacheId) {
-            return data.get(F.t(op, cacheId));
+            return data.computeIfAbsent(F.t(op, cacheId), k -> new ArrayList<>());
+        }
+
+        /** */
+        public void clear() {
+            data.clear();
         }
 
         /** */
@@ -405,7 +410,8 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest {
                 String typeName = m.typeName();
 
                 assertFalse(typeName.isEmpty());
-                assertEquals(mapper.typeId(typeName), m.typeId());
+                // Can also be registered by OptimizedMarshaller.
+                assertTrue(m.typeId() == mapper.typeId(typeName) || m.typeId() == typeName.hashCode());
             });
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 63ee1a405ad..60ffcee8686 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -112,6 +112,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_RECYCLE;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT;
@@ -191,6 +192,7 @@ public class RecordUtils {
         put(PAGE_RECORD, RecordUtils::buildPageSnapshot);
         put(DATA_RECORD, RecordUtils::buildDataRecord);
         put(DATA_RECORD_V2, RecordUtils::buildDataRecord);
+        put(CDC_DATA_RECORD, RecordUtils::buildDataRecord);
         put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord);
         put(HEADER_RECORD, buildUpsupportedWalRecord(HEADER_RECORD));
         put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord);
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index ef8ff50fb03..6987c0be58c 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -351,7 +351,14 @@ If the file name isn't specified the output file name is: '<typeId>.bin'
     control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes]
 
     Parameters:
-      node_id  - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes.
+      --node-id node_id  - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes.
+
+  [EXPERIMENTAL]
+  Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC:
+    control.(sh|bat) --cdc resend --caches cache1,...,cacheN
+
+    Parameters:
+      --caches cache1,...,cacheN  - specifies a comma-separated list of cache names.
 
 By default commands affecting the cluster require interactive confirmation.
 Use --yes option to disable it.
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index ef8ff50fb03..6987c0be58c 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -351,7 +351,14 @@ If the file name isn't specified the output file name is: '<typeId>.bin'
     control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes]
 
     Parameters:
-      node_id  - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes.
+      --node-id node_id  - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes.
+
+  [EXPERIMENTAL]
+  Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC:
+    control.(sh|bat) --cdc resend --caches cache1,...,cacheN
+
+    Parameters:
+      --caches cache1,...,cacheN  - specifies a comma-separated list of cache names.
 
 By default commands affecting the cluster require interactive confirmation.
 Use --yes option to disable it.
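
Note on the --caches parameter: the help text above describes it as a comma-separated list of cache names, and VisorCdcCacheDataResendTaskArg carries those names as a Set<String>. The sketch below shows one way such a value could be turned into a set. It is an illustration only: the class CachesArgSketch and the method parseCaches are hypothetical names, and this is not the parsing code from the commit (ResendCommand is not shown in this part of the diff). Trimming whitespace and rejecting empty names are assumptions made for the example.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper for illustration; not part of the Ignite code base. */
public class CachesArgSketch {
    /**
     * Splits a comma-separated value such as "cache1,cache2" into distinct cache names.
     * Assumption: surrounding whitespace is trimmed and empty names are rejected.
     */
    static Set<String> parseCaches(String arg) {
        if (arg == null || arg.trim().isEmpty())
            throw new IllegalArgumentException("Expected a comma-separated list of cache names.");

        // LinkedHashSet drops duplicates while keeping the order given on the command line.
        Set<String> caches = new LinkedHashSet<>();

        for (String name : arg.split(",")) {
            String trimmed = name.trim();

            if (trimmed.isEmpty())
                throw new IllegalArgumentException("Empty cache name in list: " + arg);

            caches.add(trimmed);
        }

        return caches;
    }

    public static void main(String[] args) {
        // Duplicate names collapse into a single entry.
        System.out.println(parseCaches("cache1,cache2,cache1"));
    }
}
```

A set fits here because the task validates each named cache once and then iterates over the validated caches, so repeating a name on the command line should not cause a cache to be resent twice.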
