This is an automated email from the ASF dual-hosted git repository. namelchev pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new c79a62f2d60 IGNITE-18425 Added CDC command to forcefully resend cache data (#10524) c79a62f2d60 is described below commit c79a62f2d60667113dd77ba21b6d41d91f0e1864 Author: Nikita Amelchev <nsamelc...@gmail.com> AuthorDate: Fri Apr 7 10:51:05 2023 +0300 IGNITE-18425 Added CDC command to forcefully resend cache data (#10524) --- docs/_docs/persistence/change-data-capture.adoc | 22 +- .../internal/commandline/cdc/CdcCommand.java | 97 +------ .../internal/commandline/cdc/CdcSubcommands.java | 66 +++++ ...and.java => DeleteLostSegmentLinksCommand.java} | 23 +- .../internal/commandline/cdc/ResendCommand.java | 115 ++++++++ .../commandline/CommandHandlerParsingTest.java | 4 +- .../testsuites/IgniteControlUtilityTestSuite2.java | 4 +- .../org/apache/ignite/util/CdcCommandTest.java | 321 ++++++++++++++++++++- .../apache/ignite/util/CdcResendCommandTest.java | 97 +++++++ .../util/GridCommandHandlerClusterByClassTest.java | 4 +- .../org/apache/ignite/internal/cdc/CdcMain.java | 3 +- .../internal/pagemem/wal/record/CdcDataRecord.java | 40 +++ .../internal/pagemem/wal/record/WALRecord.java | 5 +- .../persistence/wal/FileWriteAheadLogManager.java | 3 +- .../wal/reader/StandaloneWalRecordsIterator.java | 1 + .../wal/serializer/RecordDataV1Serializer.java | 3 +- .../wal/serializer/RecordDataV2Serializer.java | 3 + .../visor/cdc/VisorCdcCacheDataResendTask.java | 248 ++++++++++++++++ .../visor/cdc/VisorCdcCacheDataResendTaskArg.java | 59 ++++ .../org/apache/ignite/cdc/AbstractCdcTest.java | 10 +- .../testframework/wal/record/RecordUtils.java | 2 + ...ridCommandHandlerClusterByClassTest_help.output | 9 +- ...andHandlerClusterByClassWithSSLTest_help.output | 9 +- 23 files changed, 1029 insertions(+), 119 deletions(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index f04fffaee55..846e389cb5a 100644 --- 
a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -152,7 +152,7 @@ So when enabled there will be gap between segments: `0000000000000002.wal`, `000 In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]". NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using -snapshot or other methods. +link:#forcefully-resend-all-cache-data-to-cdc[resend command], snapshot or other methods. To fix this error you can run the following link:tools/control-script[Control Script] command: @@ -171,6 +171,26 @@ For example, CDC was turned off several times: `000000000000002.wal`, `000000000 Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`. The application will start from the `0000000000000010.wal` segment after being enabled. +== Forcefully resend all cache data to CDC + +When the CDC has been forcefully disabled for a while, cache changes will be skipped. +In this case it is necessary to resend data from existing caches. +For example, this is important if you need to ensure consistency of cache data before a replication restart. + +NOTE: The command will be canceled if the cluster was not rebalanced or the topology changed (node left/joined, baseline changed). + +To forcefully resend all cache data to CDC you can run the following link:tools/control-script[Control Script] command: + +[source,shell] +---- +# Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC: +control.sh|bat --cdc resend --caches cache1,...,cacheN +---- + +The command will iterate over caches and write primary copies of data entries to the WAL to get captured by the CDC application. 
+ +NOTE: There are no guarantees of notifying the CDC consumer on concurrent cache updates: use the `CdcEvent#version` to resolve version. + == cdc-ext Ignite extensions project has link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext] module which provides two way to setup cross cluster replication based on CDC. diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java index 8dbd2b963ab..9d7352d1a7b 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -17,115 +17,44 @@ package org.apache.ignite.internal.commandline.cdc; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridClientConfiguration; -import org.apache.ignite.internal.client.GridClientNode; import org.apache.ignite.internal.commandline.AbstractCommand; -import org.apache.ignite.internal.commandline.Command; import org.apache.ignite.internal.commandline.CommandArgIterator; -import org.apache.ignite.internal.commandline.CommandLogger; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.visor.VisorTaskArgument; -import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; - -import static org.apache.ignite.internal.commandline.CommandList.CDC; -import static org.apache.ignite.internal.commandline.CommandLogger.optional; -import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION; -import static 
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode; /** * CDC command. */ -public class CdcCommand extends AbstractCommand<String> { - /** Command to delete lost segment links. */ - public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links"; - - /** */ - public static final String NODE_ID = "--node-id"; - - /** Node ID. */ - private UUID nodeId; +public class CdcCommand extends AbstractCommand<Object> { + /** Cdc sub-command to execute. */ + private AbstractCommand<?> cmd; /** {@inheritDoc} */ @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception { - try (GridClient client = Command.startClient(clientCfg)) { - executeTaskByNameOnNode( - client, - VisorCdcDeleteLostSegmentsTask.class.getName(), - null, - nodeId, - clientCfg - ); - - Collection<UUID> nodeIds = nodeId != null ? Collections.singletonList(nodeId) : - client.compute().nodes(node -> !node.isClient()).stream().map(GridClientNode::nodeId) - .collect(Collectors.toSet()); - - client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(), - new VisorTaskArgument<>(nodeIds, false)); - - String res = "Lost segment CDC links successfully removed."; - - log.info(res); - - return res; - } - catch (Throwable e) { - log.error("Failed to perform operation."); - log.error(CommandLogger.errorMessage(e)); - - throw e; - } + return cmd.execute(clientCfg, log); } /** {@inheritDoc} */ @Override public void parseArguments(CommandArgIterator argIter) { - nodeId = null; - - String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS); - - if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd)) - throw new IllegalArgumentException("Unexpected command: " + cmd); - - while (argIter.hasNextSubArg()) { - String opt = argIter.nextArg("Failed to read command argument."); + cmd = CdcSubcommands.of(argIter.nextArg("Expected CDC sub-command.")).subCommand(); - if (NODE_ID.equalsIgnoreCase(opt)) - nodeId = 
argIter.nextUuidArg(NODE_ID); - } + cmd.parseArguments(argIter); } /** {@inheritDoc} */ - @Override public String confirmationPrompt() { - return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " + - "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segment links" + - "previous to the last gap." + U.nl() + - "All changes in deleted segment links will be lost!" + U.nl() + - "Make sure you need to sync data before restarting the CDC application. You can synchronize caches " + - "using snapshot or other methods."; + @Override public Object arg() { + return cmd.arg(); } /** {@inheritDoc} */ - @Override public String arg() { - return null; + @Override public String confirmationPrompt() { + return cmd == null ? null : cmd.confirmationPrompt(); } /** {@inheritDoc} */ - @Override public void printUsage(IgniteLogger logger) { - Map<String, String> params = new LinkedHashMap<>(); - - params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " + - "all server nodes."); - - usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS, - optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION)); + @Override public void printUsage(IgniteLogger log) { + for (CdcSubcommands cmd : CdcSubcommands.values()) + cmd.subCommand().printUsage(log); } /** {@inheritDoc} */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java new file mode 100644 index 00000000000..a82d4ec6768 --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcSubcommands.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline.cdc; + +import org.apache.ignite.internal.commandline.AbstractCommand; +import org.apache.ignite.internal.util.typedef.F; + +/** + * CDC sub commands. + */ +public enum CdcSubcommands { + /** Sub-command to delete lost segment links. */ + DELETE_LOST_SEGMENT_LINKS(new DeleteLostSegmentLinksCommand()), + + /** Sub-command to forcefully resend cache data. */ + RESEND(new ResendCommand()); + + /** Sub-command. */ + private final AbstractCommand<?> cmd; + + /** @param cmd Sub-command. */ + CdcSubcommands(AbstractCommand<?> cmd) { + this.cmd = cmd; + } + + /** + * @param name Command name (case insensitive). + * @return Command for the specified name. + */ + public static CdcSubcommands of(String name) { + CdcSubcommands[] cmds = values(); + + for (CdcSubcommands cmd : cmds) { + if (cmd.subCommand().name().equalsIgnoreCase(name)) + return cmd; + } + + throw new IllegalArgumentException( + "Invalid argument: " + name + ". One of " + F.asList(cmds) + " is expected."); + } + + /** @return Sub-command. */ + public AbstractCommand<?> subCommand() { + return cmd; + } + + /** @return Sub-command name. 
*/ + @Override public String toString() { + return cmd.name(); + } +} diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java similarity index 91% copy from modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java copy to modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java index 8dbd2b963ab..12bca8c9711 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/DeleteLostSegmentLinksCommand.java @@ -41,9 +41,9 @@ import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CO import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode; /** - * CDC command. + * Command to delete lost segment links. */ -public class CdcCommand extends AbstractCommand<String> { +public class DeleteLostSegmentLinksCommand extends AbstractCommand<Object> { /** Command to delete lost segment links. 
*/ public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links"; @@ -89,11 +89,6 @@ public class CdcCommand extends AbstractCommand<String> { @Override public void parseArguments(CommandArgIterator argIter) { nodeId = null; - String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS); - - if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd)) - throw new IllegalArgumentException("Unexpected command: " + cmd); - while (argIter.hasNextSubArg()) { String opt = argIter.nextArg("Failed to read command argument."); @@ -112,25 +107,25 @@ public class CdcCommand extends AbstractCommand<String> { "using snapshot or other methods."; } - /** {@inheritDoc} */ - @Override public String arg() { - return null; - } - /** {@inheritDoc} */ @Override public void printUsage(IgniteLogger logger) { Map<String, String> params = new LinkedHashMap<>(); - params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " + + params.put(NODE_ID + " node_id", "ID of the node to delete lost segment links from. 
If not set, the command will affect " + "all server nodes."); usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS, optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION)); } + /** {@inheritDoc} */ + @Override public Object arg() { + return null; + } + /** {@inheritDoc} */ @Override public String name() { - return "cdc"; + return DELETE_LOST_SEGMENT_LINKS; } /** {@inheritDoc} */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java new file mode 100644 index 00000000000..1ba381f347a --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/ResendCommand.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.commandline.cdc; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.commandline.AbstractCommand; +import org.apache.ignite.internal.commandline.Command; +import org.apache.ignite.internal.commandline.CommandArgIterator; +import org.apache.ignite.internal.commandline.CommandLogger; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.visor.cdc.VisorCdcCacheDataResendTask; +import org.apache.ignite.internal.visor.cdc.VisorCdcCacheDataResendTaskArg; + +import static org.apache.ignite.internal.commandline.CommandList.CDC; +import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode; + +/** + * The command to forcefully resend all cache data to CDC. + * Iterates over given caches and writes data entries to the WAL to get captured by CDC. + */ +public class ResendCommand extends AbstractCommand<Object> { + /** Command name. 
*/ + public static final String RESEND = "resend"; + + /** */ + public static final String CACHES = "--caches"; + + /** */ + private VisorCdcCacheDataResendTaskArg arg; + + /** {@inheritDoc} */ + @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception { + try (GridClient client = Command.startClient(clientCfg)) { + executeTaskByNameOnNode(client, VisorCdcCacheDataResendTask.class.getName(), arg, null, clientCfg); + + String res = "Successfully resent all cache data to CDC."; + + log.info(res); + + return res; + } + catch (Throwable e) { + log.error("Failed to perform operation."); + log.error(CommandLogger.errorMessage(e)); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void parseArguments(CommandArgIterator argIter) { + Set<String> caches = null; + + while (argIter.hasNextSubArg()) { + String opt = argIter.nextArg("Failed to read command argument."); + + if (CACHES.equalsIgnoreCase(opt)) { + if (caches != null) + throw new IllegalArgumentException(CACHES + " arg specified twice."); + + caches = argIter.nextStringSet("comma-separated list of cache names."); + } + } + + if (F.isEmpty(caches)) + throw new IllegalArgumentException("At least one cache name should be specified."); + + arg = new VisorCdcCacheDataResendTaskArg(caches); + } + + /** {@inheritDoc} */ + @Override public void printUsage(IgniteLogger logger) { + Map<String, String> params = new LinkedHashMap<>(); + + params.put(CACHES + " cache1,...,cacheN", "specifies a comma-separated list of cache names."); + + usage(logger, "Forcefully resend all cache data to CDC. 
Iterates over caches and writes primary copies " + + "of data entries to the WAL to get captured by CDC:", CDC, params, RESEND, CACHES, "cache1,...,cacheN"); + } + + /** {@inheritDoc} */ + @Override public Object arg() { + return arg; + } + + /** {@inheritDoc} */ + @Override public String name() { + return RESEND; + } + + /** {@inheritDoc} */ + @Override public boolean experimental() { + return true; + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java index b06ac82fb4b..3a24c7a59ce 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java @@ -79,8 +79,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.FIND import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES; import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST; import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH; -import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; -import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; +import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java 
b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index ce8571c4ef4..409a75db2a8 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest; import org.apache.ignite.util.CacheMetricsCommandTest; import org.apache.ignite.util.CdcCommandTest; +import org.apache.ignite.util.CdcResendCommandTest; import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest; import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest; import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest; @@ -67,7 +68,8 @@ import org.junit.runners.Suite; IgniteIndexReaderTest.class, - CdcCommandTest.class + CdcCommandTest.class, + CdcResendCommandTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index b04090e485b..4604fccd878 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -23,40 +23,66 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; -import org.apache.ignite.cdc.AbstractCdcTest; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cdc.AbstractCdcTest.UserCdcConsumer; import org.apache.ignite.cdc.CdcConfiguration; import 
org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridJobExecuteRequest; +import org.apache.ignite.internal.GridJobExecuteResponse; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.commandline.CommandList; +import org.apache.ignite.internal.commandline.cdc.CdcSubcommands; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; import org.junit.Test; +import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE; import static 
org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT; import static org.apache.ignite.cdc.CdcSelfTest.addData; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; -import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; -import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; +import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID; +import static org.apache.ignite.internal.commandline.cdc.ResendCommand.CACHES; +import static org.apache.ignite.internal.commandline.cdc.ResendCommand.RESEND; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.testframework.GridTestUtils.stopThreads; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * CDC command tests. 
*/ public class CdcCommandTest extends GridCommandHandlerAbstractTest { + /** */ + private static final String CDC_DISABLED_DATA_REGION = "cdc_disabled_data_region"; + /** */ private IgniteEx srv0; @@ -66,6 +92,9 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { /** */ private DistributedChangeableProperty<Serializable> cdcDisabled; + /** */ + private volatile IgniteThrowableConsumer<WALRecord> onLogLsnr; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -75,11 +104,34 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setWalForceArchiveTimeout(1000) + .setDataRegionConfigurations(new DataRegionConfiguration() + .setName(CDC_DISABLED_DATA_REGION) + .setCdcEnabled(false)) .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setCdcEnabled(true))); cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED); + cfg.setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "Test WAL provider"; + } + + @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) { + if (!IgniteWriteAheadLogManager.class.equals(cls)) + return null; + + return (T)new FileWriteAheadLogManager(((IgniteEx)ctx.grid()).context()) { + @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException { + if (rec instanceof CdcDataRecord && onLogLsnr != null) + onLogLsnr.accept(rec); + + return super.log(rec); + } + }; + } + }); + return cfg; } @@ -92,6 +144,8 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { srv0 = startGrid(0); srv1 = startGrid(1); + awaitPartitionMapExchange(); + cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED); } @@ -99,6 +153,8 @@ public class CdcCommandTest extends 
GridCommandHandlerAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); + stopThreads(log); + stopAllGrids(); cleanPersistenceDir(); @@ -111,7 +167,7 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CommandList.CDC.text(), "unexpected_command"), - "Unexpected command: unexpected_command"); + "Invalid argument: unexpected_command. One of " + F.asList(CdcSubcommands.values()) + " is expected."); assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID), @@ -131,7 +187,7 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { CdcConfiguration cfg = new CdcConfiguration(); - cfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { + cfg.setConsumer(new UserCdcConsumer() { @Override public void start(MetricRegistry mreg) { appStarted.countDown(); } @@ -148,8 +204,6 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { "Failed to delete lost segment CDC links. Unable to acquire lock to lock CDC folder."); assertFalse(fut.isDone()); - - fut.cancel(); } /** */ @@ -222,4 +276,257 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); } + + /** */ + @Test + public void testParseResend() { + injectTestSystemOut(); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), "unexpected_command"), + "Invalid argument: unexpected_command. 
One of " + F.asList(CdcSubcommands.values()) + " is expected."); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), RESEND), + "At least one cache name should be specified."); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), RESEND, CACHES), + "At least one cache name should be specified."); + } + + /** */ + @Test + public void testResendCacheData() throws Exception { + UserCdcConsumer cnsmr0 = runCdc(srv0); + UserCdcConsumer cnsmr1 = runCdc(srv1); + + addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + + cnsmr0.clear(); + cnsmr1.clear(); + + executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME); + + waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + } + + /** */ + @Test + public void testResendCachesNotExist() { + injectTestSystemOut(); + + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), RESEND, CACHES, "unknown_cache"), + "Cache does not exist"); + + String cdcDisabledCacheName = "cdcDisabledCache"; + + srv0.getOrCreateCache(new CacheConfiguration<>() + .setName(cdcDisabledCacheName) + .setDataRegionName(CDC_DISABLED_DATA_REGION)); + + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), RESEND, CACHES, cdcDisabledCacheName), + "CDC is not enabled for given cache"); + } + + /** */ + @Test + public void testResendCancelOnNodeLeft() { + injectTestSystemOut(); + + addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + for (Ignite srv : G.allGrids()) { + TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> { + if (msg instanceof GridJobExecuteResponse) 
{ + GridTestUtils.runAsync(srv::close); + + return true; + } + + return false; + }); + } + + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME), + "CDC cache data resend cancelled. Failed to resend cache data on the node"); + } + + /** */ + @Test + public void testResendCancelOnRebalanceInProgress() throws Exception { + injectTestSystemOut(); + + addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + CountDownLatch rebalanceStarted = new CountDownLatch(1); + + for (Ignite srv : G.allGrids()) { + TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> { + if (msg instanceof GridDhtPartitionSupplyMessage) { + rebalanceStarted.countDown(); + + return true; + } + + return false; + }); + } + + GridTestUtils.runAsync(() -> startGrid(3)); + + rebalanceStarted.await(); + + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME), + "CDC cache data resend cancelled. Rebalance sheduled"); + } + + /** */ + @Test + public void testResendCancelOnTopologyChangeBeforeStart() throws Exception { + injectTestSystemOut(); + + addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + CountDownLatch blocked = new CountDownLatch(1); + + for (Ignite srv : G.allGrids()) { + TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> { + if (msg instanceof GridJobExecuteRequest) { + blocked.countDown(); + + return true; + } + + return false; + }); + } + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> { + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME), + "CDC cache data resend cancelled. 
Topology changed"); + }); + + blocked.await(); + + startGrid(3); + awaitPartitionMapExchange(); + + for (Ignite srv : G.allGrids()) + TestRecordingCommunicationSpi.spi(srv).stopBlock(); + + fut.get(); + } + + /** */ + @Test + public void testResendCancelOnTopologyChange() throws Exception { + injectTestSystemOut(); + + addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + CountDownLatch preload = new CountDownLatch(1); + CountDownLatch topologyChanged = new CountDownLatch(1); + + AtomicInteger cnt = new AtomicInteger(); + + onLogLsnr = rec -> { + if (cnt.incrementAndGet() < KEYS_CNT / 2) + return; + + preload.countDown(); + + U.await(topologyChanged); + }; + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> { + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME), + "CDC cache data resend cancelled. Topology changed"); + }); + + preload.await(); + + startGrid(3); + + topologyChanged.countDown(); + + fut.get(); + } + + /** */ + @Test + public void testResendOnClientJoin() throws Exception { + UserCdcConsumer cnsmr0 = runCdc(srv0); + UserCdcConsumer cnsmr1 = runCdc(srv1); + + addData(srv0.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + + cnsmr0.clear(); + cnsmr1.clear(); + + CountDownLatch blocked = new CountDownLatch(1); + + for (Ignite srv : G.allGrids()) { + TestRecordingCommunicationSpi.spi(srv).blockMessages((node, msg) -> { + if (msg instanceof GridJobExecuteRequest) { + blocked.countDown(); + + return true; + } + + return false; + }); + } + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(() -> { + executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME); + }); + + blocked.await(); + + startClientGrid("client"); + + for (Ignite srv : G.allGrids()) + 
TestRecordingCommunicationSpi.spi(srv).stopBlock(); + + fut.get(); + + waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + } + + /** */ + public static UserCdcConsumer runCdc(Ignite ign) { + UserCdcConsumer cnsmr = new UserCdcConsumer(); + + CdcConfiguration cfg = new CdcConfiguration(); + + cfg.setConsumer(cnsmr); + cfg.setKeepBinary(false); + + CdcMain cdc = new CdcMain(ign.configuration(), null, cfg); + + GridTestUtils.runAsync(cdc); + + return cnsmr; + } + + /** */ + public static void waitForSize(UserCdcConsumer cnsmr, int expSize) throws Exception { + assertTrue(waitForCondition(() -> expSize == cnsmr.data(UPDATE, CU.cacheId(DEFAULT_CACHE_NAME)).size(), + 60_000)); + } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java new file mode 100644 index 00000000000..55416388c89 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.util; + +import org.apache.ignite.cdc.AbstractCdcTest; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.commandline.CommandList; +import org.junit.Test; + +import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT; +import static org.apache.ignite.cdc.CdcSelfTest.addData; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.commandline.cdc.ResendCommand.CACHES; +import static org.apache.ignite.internal.commandline.cdc.ResendCommand.RESEND; +import static org.apache.ignite.testframework.GridTestUtils.stopThreads; +import static org.apache.ignite.util.CdcCommandTest.runCdc; +import static org.apache.ignite.util.CdcCommandTest.waitForSize; + +/** + * CDC resend command tests. 
+ */ +public class CdcResendCommandTest extends GridCommandHandlerAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalForceArchiveTimeout(1000) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setCdcEnabled(true) + .setPersistenceEnabled(true))); + + cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setBackups(1)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopThreads(log); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testResendCacheDataRestoreFromWal() throws Exception { + IgniteEx ign = startGrid(0); + + ign.cluster().state(ACTIVE); + + enableCheckpoints(ign, false); + + addData(ign.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + AbstractCdcTest.UserCdcConsumer cnsmr = runCdc(ign); + + waitForSize(cnsmr, KEYS_CNT); + + cnsmr.clear(); + + executeCommand(EXIT_CODE_OK, CommandList.CDC.text(), RESEND, CACHES, DEFAULT_CACHE_NAME); + + waitForSize(cnsmr, KEYS_CNT); + + stopAllGrids(); + + ign = startGrid(0); + + assertEquals(KEYS_CNT, ign.cache(DEFAULT_CACHE_NAME).size()); + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java index 339247850f2..a3cf746a826 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java @@ -119,8 +119,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheDestroy.DESTROY_ import static 
org.apache.ignite.internal.commandline.cache.CacheSubcommands.CLEAR; import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.DESTROY; import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.HELP; -import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; -import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; +import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.DeleteLostSegmentLinksCommand.NODE_ID; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.CACHE; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.PARTITIONS; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.STRATEGY; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 9ff1fc7adb3..3b187a0593d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -79,6 +79,7 @@ import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer; import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; @@ -489,7 +490,7 
@@ public class CdcMain implements Runnable { .marshallerMappingFileStoreDir(marshaller) .keepBinary(cdcCfg.isKeepBinary()) .filesOrDirs(segment.toFile()) - .addFilter((type, ptr) -> type == DATA_RECORD_V2); + .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD); if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0) builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java new file mode 100644 index 00000000000..25e0defe66b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDataRecord.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * The record to forcefully resend cache data to the CDC application. 
+ */ +public class CdcDataRecord extends DataRecord { + /** */ + public CdcDataRecord(DataEntry writeEntry) { + super(writeEntry); + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.CDC_DATA_RECORD; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CdcDataRecord.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index baa51c35286..3457251e0f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -288,7 +288,10 @@ public abstract class WALRecord { INCREMENTAL_SNAPSHOT_START_RECORD(76, LOGICAL), /** Incremental snapshot finish record. */ - INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL); + INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL), + + /** CDC data record. */ + CDC_DATA_RECORD(78, CUSTOM); /** Index for serialization. Should be consistent throughout all versions. 
*/ private final int idx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index edbc38d5dc3..f68b366fbb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -152,6 +152,7 @@ import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; @@ -990,7 +991,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Only data records handled by CDC. // No need to forcefully rollover for other record types. 
- if (walForceArchiveTimeout > 0 && rec.type() == DATA_RECORD_V2) + if (walForceArchiveTimeout > 0 && (rec.type() == DATA_RECORD_V2 || rec.type() == CDC_DATA_RECORD)) lastDataRecordLoggedMs.set(millis); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 051be3616a4..df6ca772291 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -360,6 +360,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { if (processor != null && (rec.type() == RecordType.DATA_RECORD || rec.type() == RecordType.DATA_RECORD_V2 + || rec.type() == RecordType.CDC_DATA_RECORD || rec.type() == RecordType.MVCC_DATA_RECORD)) { try { return postProcessDataRecord((DataRecord)rec, kernalCtx, processor); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index f902295dcc9..3907bf2c98f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -126,6 +126,7 @@ import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; 
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.ENCRYPTED_DATA_RECORD_V2; @@ -2171,7 +2172,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { int partId = in.readInt(); long partCntr = in.readLong(); long expireTime = in.readLong(); - byte flags = type == DATA_RECORD_V2 ? in.readByte() : (byte)0; + byte flags = type == DATA_RECORD_V2 || type == CDC_DATA_RECORD ? in.readByte() : (byte)0; GridCacheContext cacheCtx = cctx.cacheContext(cacheId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java index 62fc50ef73f..77cc5eb0cfe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java @@ -91,6 +91,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer { return 18 + cacheStatesSize + (walPtr == null ? 
0 : 16); case MVCC_DATA_RECORD: + case CDC_DATA_RECORD: return 4/*entry count*/ + 8/*timestamp*/ + dataSize((DataRecord)rec); case DATA_RECORD_V2: @@ -162,6 +163,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer { case DATA_RECORD: case DATA_RECORD_V2: + case CDC_DATA_RECORD: int entryCnt = in.readInt(); long timeStamp = in.readLong(); @@ -272,6 +274,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer { case MVCC_DATA_RECORD: case DATA_RECORD_V2: + case CDC_DATA_RECORD: DataRecord dataRec = (DataRecord)rec; int entryCnt = dataRec.entryCount(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java new file mode 100644 index 00000000000..20f5fe1c82f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTask.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.visor.cdc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.lang.GridIterator; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; + +/** + * Task to forcefully resend all cache data to CDC. + * Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC. 
+ */ +@GridInternal +public class VisorCdcCacheDataResendTask extends VisorMultiNodeTask<VisorCdcCacheDataResendTaskArg, Void, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** Topology version when task was started. */ + private AffinityTopologyVersion topVer; + + /** {@inheritDoc} */ + @Override protected VisorJob<VisorCdcCacheDataResendTaskArg, Void> job(VisorCdcCacheDataResendTaskArg arg) { + return new VisorCdcCacheDataResendJob(arg, topVer); + } + + /** {@inheritDoc} */ + @Override protected Collection<UUID> jobNodes(VisorTaskArgument<VisorCdcCacheDataResendTaskArg> arg) { + // Check there is no rebalance. + GridDhtPartitionsExchangeFuture fut = ignite.context().cache().context().exchange().lastFinishedFuture(); + + if (!fut.rebalanced()) { + throw new IgniteException("CDC cache data resend cancelled. Rebalance sheduled " + + "[topVer=" + fut.topologyVersion() + ']'); + } + + // Cancel resend if affinity will change. + topVer = ignite.context().cache().context().exchange().lastAffinityChangedTopologyVersion(fut.topologyVersion()); + + return F.nodeIds(ignite.cluster().forServers().nodes()); + } + + /** {@inheritDoc} */ + @Override protected @Nullable Void reduce0(List<ComputeJobResult> results) throws IgniteException { + for (ComputeJobResult res : results) { + if (res.getException() != null) { + throw new IgniteException("CDC cache data resend cancelled. Failed to resend cache data " + + "on the node [nodeId=" + res.getNode().id() + ']', res.getException()); + } + } + + return null; + } + + /** */ + private static class VisorCdcCacheDataResendJob extends VisorJob<VisorCdcCacheDataResendTaskArg, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected logger. */ + @LoggerResource + protected IgniteLogger log; + + /** */ + private IgniteWriteAheadLogManager wal; + + /** */ + private GridCachePartitionExchangeManager<Object, Object> exchange; + + /** Topology version when task was started. 
*/ + private final AffinityTopologyVersion topVer; + + /** */ + private GridDhtPartitionsExchangeFuture lastFut; + + /** + * @param arg Job argument. + * @param topVer Topology version when task was started. + */ + protected VisorCdcCacheDataResendJob(VisorCdcCacheDataResendTaskArg arg, AffinityTopologyVersion topVer) { + super(arg, false); + + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override protected Void run(VisorCdcCacheDataResendTaskArg arg) throws IgniteException { + if (F.isEmpty(arg.caches())) + throw new IllegalArgumentException("Caches are not specified."); + + List<IgniteInternalCache<?, ?>> caches = new ArrayList<>(); + + for (String name : arg.caches()) { + IgniteInternalCache<?, ?> cache = ignite.context().cache().cache(name); + + if (cache == null) + throw new IgniteException("Cache does not exist [cacheName=" + name + ']'); + + if (!cache.context().dataRegion().config().isCdcEnabled()) { + throw new IgniteException("CDC is not enabled for given cache [cacheName=" + name + + ", dataRegionName=" + cache.context().dataRegion().config().getName() + ']'); + } + + if (cache.context().mvccEnabled()) + throw new UnsupportedOperationException("The TRANSACTIONAL_SNAPSHOT mode is not supported."); + + caches.add(cache); + } + + if (log.isInfoEnabled()) + log.info("CDC cache data resend started [caches=" + String.join(", ", arg.caches()) + ']'); + + wal = ignite.context().cache().context().wal(true); + exchange = ignite.context().cache().context().exchange(); + + try { + Iterator<IgniteInternalCache<?, ?>> iter = caches.iterator(); + + while (iter.hasNext() && !isCancelled()) + resendCacheData(iter.next()); + + wal.flush(null, true); + + if (log.isInfoEnabled()) { + log.info("CDC cache data resend " + (isCancelled() ? "cancelled" : "finished") + + " [caches=" + String.join(", ", arg.caches()) + ']'); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + return null; + } + + /** @param cache Cache. 
*/ + private void resendCacheData(IgniteInternalCache<?, ?> cache) throws IgniteCheckedException { + if (log.isInfoEnabled()) + log.info("CDC cache data resend started [cacheName=" + cache.name() + ']'); + + GridCacheContext<?, ?> cctx = cache.context(); + + GridIterator<CacheDataRow> localRows = cctx.offheap() + .cacheIterator(cctx.cacheId(), true, false, AffinityTopologyVersion.NONE, null, null); + + long cnt = 0; + Set<Integer> parts = new TreeSet<>(); + + for (CacheDataRow row : localRows) { + if (isCancelled()) + break; + + ensureTopologyNotChanged(); + + KeyCacheObject key = row.key(); + + if (log.isTraceEnabled()) + log.trace("Resend key: " + key); + + CdcDataRecord rec = new CdcDataRecord(new DataEntry( + cctx.cacheId(), + key, + row.value(), + GridCacheOperation.CREATE, + null, + row.version(), + row.expireTime(), + key.partition(), + -1, + DataEntry.flags(true)) + ); + + wal.log(rec); + + parts.add(key.partition()); + + if ((++cnt % 1_000 == 0) && log.isDebugEnabled()) + log.debug("Resend entries count: " + cnt); + } + + if (log.isInfoEnabled()) { + if (isCancelled()) + log.info("CDC cache data resend cancelled."); + else { + log.info("CDC cache data resend finished [cacheName=" + cache.name() + + ", entriesCnt=" + cnt + ", parts=" + parts + ']'); + } + } + } + + /** */ + private void ensureTopologyNotChanged() { + GridDhtPartitionsExchangeFuture fut = exchange.lastFinishedFuture(); + + if (lastFut != fut) { + AffinityTopologyVersion lastChanged = exchange.lastAffinityChangedTopologyVersion(fut.topologyVersion()); + + if (!topVer.equals(lastChanged)) { + throw new IgniteException("CDC cache data resend cancelled. 
Topology changed during resend " + + "[startTopVer=" + topVer + ", currentTopVer=" + fut.topologyVersion() + ']'); + } + + lastFut = fut; + } + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java new file mode 100644 index 00000000000..9f64f42955d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcCacheDataResendTaskArg.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.cdc; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Set; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class VisorCdcCacheDataResendTaskArg extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache names. */ + private Set<String> caches; + + /** */ + public VisorCdcCacheDataResendTaskArg() { + // No-op. 
+ } + + /** */ + public VisorCdcCacheDataResendTaskArg(Set<String> caches) { + this.caches = caches; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeCollection(out, caches); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + caches = U.readSet(in); + } + + /** @return Cache names. */ + public Set<String> caches() { + return caches; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 6222fb2a68f..3c8c32c7a07 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -344,7 +344,12 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest { /** @return Read keys. */ public List<T> data(ChangeEventType op, int cacheId) { - return data.get(F.t(op, cacheId)); + return data.computeIfAbsent(F.t(op, cacheId), k -> new ArrayList<>()); + } + + /** */ + public void clear() { + data.clear(); } /** */ @@ -405,7 +410,8 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest { String typeName = m.typeName(); assertFalse(typeName.isEmpty()); - assertEquals(mapper.typeId(typeName), m.typeId()); + // Can also be registered by OptimizedMarshaller. 
+ assertTrue(m.typeId() == mapper.typeId(typeName) || m.typeId() == typeName.hashCode()); }); } } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java index 63ee1a405ad..60ffcee8686 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java @@ -112,6 +112,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_RECYCLE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT; @@ -191,6 +192,7 @@ public class RecordUtils { put(PAGE_RECORD, RecordUtils::buildPageSnapshot); put(DATA_RECORD, RecordUtils::buildDataRecord); put(DATA_RECORD_V2, RecordUtils::buildDataRecord); + put(CDC_DATA_RECORD, RecordUtils::buildDataRecord); put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord); put(HEADER_RECORD, buildUpsupportedWalRecord(HEADER_RECORD)); put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord); diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index ef8ff50fb03..6987c0be58c 
100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -351,7 +351,14 @@ If the file name isn't specified the output file name is: '<typeId>.bin' control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: - node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. + --node-id node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. + + [EXPERIMENTAL] + Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC: + control.(sh|bat) --cdc resend --caches cache1,...,cacheN + + Parameters: + --caches cache1,...,cacheN - specifies a comma-separated list of cache names. By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index ef8ff50fb03..6987c0be58c 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -351,7 +351,14 @@ If the file name isn't specified the output file name is: '<typeId>.bin' control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: - node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. + --node-id node_id - ID of the node to delete lost segment links from. 
If not set, the command will affect all server nodes. + + [EXPERIMENTAL] + Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC: + control.(sh|bat) --cdc resend --caches cache1,...,cacheN + + Parameters: + --caches cache1,...,cacheN - specifies a comma-separated list of cache names. By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it.