This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 0de72a9 IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449. 0de72a9 is described below commit 0de72a908eac64d274aafdb59dd2f5598d0c470e Author: ibessonov <bessonov...@gmail.com> AuthorDate: Mon Dec 7 10:48:07 2020 +0300 IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../ignite/internal/commandline/CommandList.java | 5 +- .../commandline/DefragmentationCommand.java | 241 +++++++++++ .../defragmentation/DefragmentationArguments.java | 63 +++ .../DefragmentationSubcommands.java | 68 ++++ .../commandline/CommandHandlerParsingTest.java | 3 +- .../testsuites/IgniteControlUtilityTestSuite.java | 3 + .../GridCommandHandlerDefragmentationTest.java | 331 +++++++++++++++ .../internal/maintenance/MaintenanceProcessor.java | 27 +- .../GridCacheDatabaseSharedManager.java | 28 +- .../CachePartitionDefragmentationManager.java | 444 +++++++++++++-------- .../defragmentation/DefragmentationFileUtils.java | 29 +- .../maintenance/DefragmentationParameters.java | 34 +- .../DefragmentationWorkflowCallback.java | 13 +- .../maintenance/ExecuteDefragmentationAction.java | 33 +- ...nAction.java => StopDefragmentationAction.java} | 39 +- .../processors/query/GridQueryIndexing.java | 4 +- .../apache/ignite/internal/util/IgniteUtils.java | 16 +- .../VisorDefragmentationOperation.java | 28 ++ .../defragmentation/VisorDefragmentationTask.java | 211 ++++++++++ .../VisorDefragmentationTaskArg.java | 91 +++++ .../VisorDefragmentationTaskResult.java | 72 ++++ .../ignite/maintenance/MaintenanceRegistry.java | 3 +- .../main/resources/META-INF/classnames.properties | 5 + .../cache/WalModeChangeAdvancedSelfTest.java | 13 +- .../persistence/IgnitePdsDefragmentationTest.java | 108 +++-- .../processors/query/DummyQueryIndexing.java | 3 +- ...ridCommandHandlerClusterByClassTest_help.output | 9 + ...andHandlerClusterByClassWithSSLTest_help.output | 9 + .../processors/query/h2/IgniteH2Indexing.java | 5 +- .../defragmentation/IndexingDefragmentation.java | 6 + .../IgnitePdsIndexingDefragmentationTest.java | 2 + 31 files changed, 1659 insertions(+), 287 deletions(-) diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java index e16acaa..e9f8cd3 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java @@ -92,7 +92,10 @@ public enum CommandList { METRIC("--metric", new MetricCommand()), /** */ - PERSISTENCE("--persistence", new PersistenceCommand()); + PERSISTENCE("--persistence", new PersistenceCommand()), + + /** Command to manage PDS defragmentation. */ + DEFRAGMENTATION("--defragmentation", new DefragmentationCommand()); /** Private values copy so there's no need in cloning it every time. */ private static final CommandList[] VALUES = CommandList.values(); diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java new file mode 100644 index 0000000..c2fa8e9 --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientNode; +import org.apache.ignite.internal.commandline.defragmentation.DefragmentationArguments; +import org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation; +import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask; +import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskArg; +import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskResult; + +import static org.apache.ignite.internal.commandline.Command.usage; +import static org.apache.ignite.internal.commandline.CommandList.DEFRAGMENTATION; +import static org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands.CANCEL; +import static org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands.SCHEDULE; + +/** */ +public class DefragmentationCommand implements Command<DefragmentationArguments> { + /** */ + private static final String NODES_ARG = "--nodes"; + + /** */ + private static final String CACHES_ARG = "--caches"; + + /** */ + private DefragmentationArguments args; + + /** {@inheritDoc} */ + @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception { + try (GridClient client = Command.startClient(clientCfg)) { + Optional<GridClientNode> firstNodeOpt = client.compute().nodes().stream().filter(GridClientNode::connectable).findFirst(); + + if (firstNodeOpt.isPresent()) { + VisorDefragmentationTaskResult res; + + if (args.nodeIds() == null) { + res = TaskExecutor.executeTaskByNameOnNode( + client, + VisorDefragmentationTask.class.getName(), + convertArguments(), + null, // Use node from clientCfg. + clientCfg + ); + } + else { + VisorTaskArgument<?> visorArg = new VisorTaskArgument<>( + client.compute().nodes().stream().filter( + node -> args.nodeIds().contains(node.consistentId().toString()) + ).map(GridClientNode::nodeId).collect(Collectors.toList()), + convertArguments(), + false + ); + + res = client.compute() + .projection(firstNodeOpt.get()) + .execute( + VisorDefragmentationTask.class.getName(), + visorArg + ); + } + + printResult(res, log); + } + else + log.warning("No nodes found in topology, command won't be executed."); + } + catch (Throwable t) { + log.severe("Failed to execute defragmentation command='" + args.subcommand().text() + "'"); + log.severe(CommandLogger.errorMessage(t)); + + throw t; + } + + return null; + } + + /** */ + private void printResult(VisorDefragmentationTaskResult res, Logger log) { + assert res != null; + + log.info(res.getMessage()); + } + + /** {@inheritDoc} */ + @Override public void parseArguments(CommandArgIterator argIter) { + DefragmentationSubcommands cmd = DefragmentationSubcommands.of(argIter.nextArg("Expected defragmentation subcommand.")); + + if (cmd == null || cmd == DefragmentationSubcommands.STATUS) // Status subcommand is not yet completed. + throw new IllegalArgumentException("Expected correct defragmentation subcommand."); + + args = new DefragmentationArguments(cmd); + + switch (cmd) { + case SCHEDULE: + List<String> consistentIds = null; + List<String> cacheNames = null; + + String subarg; + + do { + subarg = argIter.nextArg("Expected one of subcommand arguments.").toLowerCase(Locale.ENGLISH); + + switch (subarg) { + case NODES_ARG: { + Set<String> ids = argIter.nextStringSet(NODES_ARG); + + if (ids.isEmpty()) + throw new IllegalArgumentException("Consistent ids list is empty."); + + consistentIds = new ArrayList<>(ids); + + break; + } + + case CACHES_ARG: { + Set<String> ids = argIter.nextStringSet(CACHES_ARG); + + if (ids.isEmpty()) + throw new IllegalArgumentException("Caches list is empty."); + + cacheNames = new ArrayList<>(ids); + + break; + } + + default: + subarg = null; + } + } + while (subarg != null); + + if (consistentIds == null) + throw new IllegalArgumentException("--nodes argument is missing."); + + args.setNodeIds(consistentIds); + args.setCacheNames(cacheNames); + + break; + + case STATUS: + case CANCEL: + // No-op. + } + } + + /** {@inheritDoc} */ + @Override public DefragmentationArguments arg() { + return args; + } + + /** {@inheritDoc} */ + @Override public void printUsage(Logger log) { + String consistentIds = "consistentId0,consistentId1"; + + String cacheNames = "cache1,cache2,cache3"; + + usage( + log, + "Schedule PDS defragmentation on given nodes for all caches:", + DEFRAGMENTATION, + SCHEDULE.text(), + NODES_ARG, + consistentIds + ); + + usage( + log, + "Schedule PDS defragmentation on given nodes but only for given caches:", + DEFRAGMENTATION, + SCHEDULE.text(), + NODES_ARG, + consistentIds, + CACHES_ARG, + cacheNames + ); + + usage( + log, + "Cancel scheduled or active PDS defragmentation on underlying node:", + DEFRAGMENTATION, + CANCEL.text() + ); + } + + /** {@inheritDoc} */ + @Override public String name() { + return DEFRAGMENTATION.toCommandName(); + } + + /** */ + private VisorDefragmentationTaskArg convertArguments() { + return new VisorDefragmentationTaskArg( + convertSubcommand(args.subcommand()), + args.nodeIds(), + args.cacheNames() + ); + } + + /** */ + private static VisorDefragmentationOperation convertSubcommand(DefragmentationSubcommands subcmd) { + switch (subcmd) { + case SCHEDULE: + return VisorDefragmentationOperation.SCHEDULE; + + case STATUS: + return VisorDefragmentationOperation.STATUS; + + case CANCEL: + return VisorDefragmentationOperation.CANCEL; + + default: + throw new IllegalArgumentException(subcmd.name()); + } + } +} diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java new file mode 100644 index 0000000..e82e578 --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline.defragmentation; + +import java.util.List; + +/** */ +@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") +public class DefragmentationArguments { + /** */ + private final DefragmentationSubcommands subcmd; + + /** */ + private List<String> nodeIds; + + /** */ + private List<String> cacheNames; + + /** */ + public DefragmentationArguments(DefragmentationSubcommands subcmd) { + this.subcmd = subcmd; + } + + /** */ + public DefragmentationSubcommands subcommand() { + return subcmd; + } + + /** */ + public void setNodeIds(List<String> nodeIds) { + this.nodeIds = nodeIds; + } + + /** */ + public List<String> nodeIds() { + return nodeIds; + } + + /** */ + public void setCacheNames(List<String> cacheNames) { + this.cacheNames = cacheNames; + } + + /** */ + public List<String> cacheNames() { + return cacheNames; + } +} diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java new file mode 100644 index 0000000..86ec775 --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline.defragmentation; + +import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation; +import org.jetbrains.annotations.Nullable; + +/** */ +public enum DefragmentationSubcommands { + /** */ + SCHEDULE("schedule", VisorDefragmentationOperation.SCHEDULE), + + /** */ + STATUS("status", VisorDefragmentationOperation.STATUS), + + /** */ + CANCEL("cancel", VisorDefragmentationOperation.CANCEL); + + /** */ + private final String name; + + /** */ + private final VisorDefragmentationOperation visorOperation; + + /** */ + DefragmentationSubcommands(String name, VisorDefragmentationOperation visorOperation) { + this.name = name; + this.visorOperation = visorOperation; + } + + /** + * @param strRep String representation of subcommand. + * @return Subcommand for its string representation. + */ + public static @Nullable DefragmentationSubcommands of(String strRep) { + for (DefragmentationSubcommands cmd : values()) { + if (cmd.text().equalsIgnoreCase(strRep)) + return cmd; + } + + return null; + } + + /** */ + public String text() { + return name; + } + + /** */ + public VisorDefragmentationOperation operation() { + return visorOperation; + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java index b3e51cd..b1ab5f8 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java @@ -1033,6 +1033,7 @@ public class CommandHandlerParsingTest { cmd == CommandList.WARM_UP || cmd == CommandList.PROPERTY || cmd == CommandList.SYSTEM_VIEW || - cmd == CommandList.METRIC; + cmd == CommandList.METRIC || + cmd == CommandList.DEFRAGMENTATION; } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java index dbccfa5..cac12d4 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.util.GridCommandHandlerBrokenIndexTest; import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest; import org.apache.ignite.util.GridCommandHandlerClusterByClassTest; import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest; +import org.apache.ignite.util.GridCommandHandlerDefragmentationTest; import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest; import org.apache.ignite.util.GridCommandHandlerIndexListTest; import org.apache.ignite.util.GridCommandHandlerIndexRebuildStatusTest; @@ -82,6 +83,8 @@ import org.junit.runners.Suite; GridCommandHandlerPropertiesTest.class, + GridCommandHandlerDefragmentationTest.class, + SystemViewCommandTest.class, MetricCommandTest.class }) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java new file mode 100644 index 0000000..adce9e2 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.function.UnaryOperator; +import java.util.logging.Formatter; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import java.util.logging.StreamHandler; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.commandline.CommandHandler; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.maintenance.MaintenanceTask; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; + +/** */ +public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClusterPerMethodAbstractTest { + /** */ + private static CountDownLatch blockCdl; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getDataStorageConfiguration().setWalSegmentSize(512 * 1024).setWalSegments(3); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDefragmentationSchedule() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().state(ACTIVE); + + assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute("--defragmentation", "schedule")); + + String grid0ConsId = grid(0).configuration().getConsistentId().toString(); + String grid1ConsId = grid(1).configuration().getConsistentId().toString(); + + ListeningTestLogger testLog = new ListeningTestLogger(); + + CommandHandler cmd = createCommandHandler(testLog); + + LogListener logLsnr = LogListener.matches("Scheduling completed successfully.").build(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--defragmentation", + "schedule", + "--nodes", + grid0ConsId + )); + + assertTrue(logLsnr.check()); + + MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList()); + + assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask)); + assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask)); + + stopGrid(0); + startGrid(0); + + logLsnr = LogListener.matches("Node is already in Maintenance Mode").build(); + + testLog.clearListeners(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--defragmentation", + "schedule", + "--nodes", + grid0ConsId + )); + + assertTrue(logLsnr.check()); + + stopGrid(0); + startGrid(0); + + stopGrid(1); + startGrid(1); + + stopAllGrids(); + + startGrids(2); + + logLsnr = LogListener.matches("Scheduling completed successfully.").times(2).build(); + + testLog.clearListeners(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--defragmentation", + "schedule", + "--nodes", + String.join(",", grid0ConsId, grid1ConsId) + )); + + assertTrue(logLsnr.check()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDefragmentationCancel() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().state(ACTIVE); + + String grid0ConsId = grid(0).configuration().getConsistentId().toString(); + + ListeningTestLogger testLog = new ListeningTestLogger(); + + CommandHandler cmd = createCommandHandler(testLog); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--defragmentation", + "schedule", + "--nodes", + grid0ConsId + )); + + LogListener logLsnr = LogListener.matches("Scheduled defragmentation task cancelled successfully.").atLeast(1).build(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--port", + grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(), + "--defragmentation", + "cancel" + )); + + assertTrue(logLsnr.check()); + + testLog.clearListeners(); + + logLsnr = LogListener.matches("Scheduled defragmentation task is not found.").build(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--port", + grid(1).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(), + "--defragmentation", + "cancel" + )); + + assertTrue(logLsnr.check()); + } + + /** */ + @Test + public void testDefragmentationCancelInProgress() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 1024; i++) + cache.put(i, i); + + forceCheckpoint(ig); + + String grid0ConsId = ig.configuration().getConsistentId().toString(); + + ListeningTestLogger testLog = new ListeningTestLogger(); + + CommandHandler cmd = createCommandHandler(testLog); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--defragmentation", + "schedule", + "--nodes", + grid0ConsId + )); + + String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(); + + stopGrid(0); + + blockCdl = new CountDownLatch(128); + + UnaryOperator<IgniteConfiguration> cfgOp = cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + FileIOFactory delegate = dsCfg.getFileIOFactory(); + + dsCfg.setFileIOFactory((file, modes) -> { + if (file.getName().contains("dfrg")) { + if (blockCdl.getCount() == 0) { + try { + // Slow down defragmentation process. + // This'll be enough for the test since we have, like, 900 partitions left. + Thread.sleep(100); + } + catch (InterruptedException ignore) { + // No-op. + } + } + else + blockCdl.countDown(); + } + + return delegate.create(file, modes); + }); + + return cfg; + }; + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> { + try { + startGrid(0, cfgOp); + } + catch (Exception e) { + // No-op. + throw new RuntimeException(e); + } + }); + + blockCdl.await(); + + LogListener logLsnr = LogListener.matches("Defragmentation cancelled successfully.").build(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--port", + port, + "--defragmentation", + "cancel" + )); + + assertTrue(logLsnr.check()); + + fut.get(); + + testLog.clearListeners(); + + logLsnr = LogListener.matches("Defragmentation is already completed or has been cancelled previously.").build(); + + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--port", + port, + "--defragmentation", + "cancel" + )); + + assertTrue(logLsnr.check()); + } + + /** */ + private CommandHandler createCommandHandler(ListeningTestLogger testLog) { + Logger log = CommandHandler.initLogger(null); + + log.addHandler(new StreamHandler(System.out, new Formatter() { + /** {@inheritDoc} */ + @Override public String format(LogRecord record) { + String msg = record.getMessage(); + + testLog.info(msg); + + return msg + "\n"; + } + })); + + return new CommandHandler(log); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java index 063bd47..5328d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java @@ -61,6 +61,9 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte /** */ private final boolean inMemoryMode; + /** */ + private volatile boolean maintenanceMode; + /** * @param ctx Kernal context. */ @@ -99,7 +102,7 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte log.info( "Maintenance Task with name " + task.name() + " is already registered" + - oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : "" + "." + + (oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : ".") + " It will be replaced with new task" + task.parameters() != null ? " with parameters " + task.parameters() : "" + "." ); @@ -134,6 +137,8 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte fileStorage.init(); activeTasks.putAll(fileStorage.getAllTasks()); + + maintenanceMode = !activeTasks.isEmpty(); } catch (Throwable t) { log.warning("Caught exception when starting MaintenanceProcessor," + @@ -213,18 +218,20 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte /** {@inheritDoc} */ @Override public boolean isMaintenanceMode() { - return !activeTasks.isEmpty(); + return maintenanceMode; } /** {@inheritDoc} */ - @Override public void unregisterMaintenanceTask(String maintenanceTaskName) { + @Override public boolean unregisterMaintenanceTask(String maintenanceTaskName) { if (inMemoryMode) - return; + return false; + + boolean deleted; if (isMaintenanceMode()) - activeTasks.remove(maintenanceTaskName); + deleted = activeTasks.remove(maintenanceTaskName) != null; else - requestedTasks.remove(maintenanceTaskName); + deleted = requestedTasks.remove(maintenanceTaskName) != null; try { fileStorage.deleteMaintenanceTask(maintenanceTaskName); @@ -237,6 +244,8 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte fileStorage.clear(); } + + return deleted; } /** {@inheritDoc} */ @@ -247,14 +256,14 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte List<MaintenanceAction<?>> actions = cb.allActions(); if (actions == null || actions.isEmpty()) - throw new IgniteException("Maintenance workflow callback should provide at least one mainetance action"); + throw new IgniteException("Maintenance workflow callback should provide at least one maintenance action"); int size = actions.size(); - long distinctSize = actions.stream().map(a -> a.name()).distinct().count(); + long distinctSize = actions.stream().map(MaintenanceAction::name).distinct().count(); if (distinctSize < size) throw new IgniteException("All actions of a single workflow should have unique names: " + - actions.stream().map(a -> a.name()).collect(Collectors.joining(", "))); + actions.stream().map(MaintenanceAction::name).collect(Collectors.joining(", "))); Optional<String> wrongActionName = actions .stream() diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 2c366eb..1484e30 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -748,7 +748,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** */ - private void prepareCacheDefragmentation(List<Integer> cacheGroupIds) throws IgniteCheckedException { + private void prepareCacheDefragmentation(List<String> cacheNames) throws IgniteCheckedException { GridKernalContext kernalCtx = cctx.kernalContext(); DataStorageConfiguration dsCfg = kernalCtx.config().getDataStorageConfiguration(); @@ -778,7 +778,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan lightCheckpointMgr.start(); defrgMgr = new CachePartitionDefragmentationManager( - cacheGroupIds, + cacheNames, cctx, this, (FilePageStoreManager)cctx.pageStore(), @@ -788,6 +788,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan ); } + /** */ + public CachePartitionDefragmentationManager defragmentationManager() { + return defrgMgr; + } + /** {@inheritDoc} */ @Override public DataRegion addDataRegion(DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg, boolean trackable, PageReadWriteManager pmPageMgr) throws IgniteCheckedException { @@ -826,9 +831,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan .registerWorkflowCallbackIfTaskExists( DEFRAGMENTATION_MNTC_TASK_NAME, task -> { - prepareCacheDefragmentation(fromStore(task).cacheGroupIds()); + prepareCacheDefragmentation(fromStore(task).cacheNames()); - return new DefragmentationWorkflowCallback(cctx.kernalContext()::log, defrgMgr); + return new DefragmentationWorkflowCallback( + cctx.kernalContext()::log, + defrgMgr, + cctx.kernalContext().failure() + ); } ); } @@ -1097,6 +1106,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { + if (defrgMgr != null) + defrgMgr.cancel(); + checkpointManager.stop(cancel); super.onKernalStop0(cancel); @@ -1395,6 +1407,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture exchangeFut) { + if (defrgMgr != null) + return; + rebuildIndexes(cctx.cacheContexts(), (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion())); } @@ -1887,6 +1902,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cctx.wal().resumeLogging(walTail); } + /** */ + public void preserveWalTailPointer() throws IgniteCheckedException { + walTail = cctx.wal().flush(null, true); + } + /** * @param grpId Cache group id. * @param partId Partition ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java index 006fa8e..41999fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; @@ -102,7 +103,7 @@ public class CachePartitionDefragmentationManager { public static final String DEFRAGMENTATION_MNTC_TASK_NAME = "defragmentationMaintenanceTask"; /** */ - private final Set<Integer> cacheGroupsForDefragmentation; + private final Set<String> cachesForDefragmentation; /** Cache shared context. */ private final GridCacheSharedContext<?, ?> sharedCtx; @@ -137,8 +138,14 @@ public class CachePartitionDefragmentationManager { /** */ private final DataRegion mappingDataRegion; + /** */ + private final AtomicBoolean cancel = new AtomicBoolean(); + + /** */ + private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>(); + /** - * @param cacheGrpIds + * @param cacheNames Names of caches to be defragmented. Empty means "all". * @param sharedCtx Cache shared context. * @param dbMgr Database manager. * @param filePageStoreMgr File page store manager. @@ -147,7 +154,7 @@ public class CachePartitionDefragmentationManager { * @param pageSize Page size. */ public CachePartitionDefragmentationManager( - List<Integer> cacheGrpIds, + List<String> cacheNames, GridCacheSharedContext<?, ?> sharedCtx, GridCacheDatabaseSharedManager dbMgr, FilePageStoreManager filePageStoreMgr, @@ -155,7 +162,7 @@ public class CachePartitionDefragmentationManager { LightweightCheckpointManager defragmentationCheckpoint, int pageSize ) throws IgniteCheckedException { - cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds); + cachesForDefragmentation = new HashSet<>(cacheNames); this.dbMgr = dbMgr; this.filePageStoreMgr = filePageStoreMgr; @@ -172,19 +179,24 @@ public class CachePartitionDefragmentationManager { } /** */ - public void executeDefragmentation() throws IgniteCheckedException { - log.info("Defragmentation started."); + public void beforeDefragmentation() throws IgniteCheckedException { + // Checkpointer must be enabled so all pages on disk are in their latest valid state. + dbMgr.resumeWalLogging(); - try { - // Checkpointer must be enabled so all pages on disk are in their latest valid state. - dbMgr.resumeWalLogging(); + dbMgr.onStateRestored(null); - dbMgr.onStateRestored(null); + nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get(); - nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get(); + dbMgr.preserveWalTailPointer(); - sharedCtx.wal().onDeActivate(sharedCtx.kernalContext()); + sharedCtx.wal().onDeActivate(sharedCtx.kernalContext()); + } + + /** */ + public void executeDefragmentation() throws IgniteCheckedException { + log.info("Defragmentation started."); + try { // Now the actual process starts. TreeIterator treeIter = new TreeIterator(pageSize); @@ -197,241 +209,272 @@ public class CachePartitionDefragmentationManager { int grpId = oldGrpCtx.groupId(); - if (!cacheGroupsForDefragmentation.isEmpty() && !cacheGroupsForDefragmentation.contains(grpId)) - continue; + if (!cachesForDefragmentation.isEmpty()) { + if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name()))) + continue; + } File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName()); - if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) - continue; + try { + if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) + continue; - GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap(); + GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap(); - List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) - .filter(store -> { - try { - return filePageStoreMgr.exists(grpId, store.partId()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) + .filter(store -> { + try { + return filePageStoreMgr.exists(grpId, store.partId()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }) + .collect(Collectors.toList()); + + if (workDir != null && !oldCacheDataStores.isEmpty()) { + // We can't start defragmentation of new group on the region that has wrong eviction mode. + // So waiting of the previous cache group defragmentation is inevitable. + DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode(); + + if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) { + prevPageEvictionMode = curPageEvictionMode; + + partDataRegion.config().setPageEvictionMode(curPageEvictionMode); + + if (idxDfrgFut != null) + idxDfrgFut.get(); } - }) - .collect(Collectors.toList()); - if (workDir != null && !oldCacheDataStores.isEmpty()) { - // We can't start defragmentation of new group on the region that has wrong eviction mode. - // So waiting of the previous cache group defragmentation is inevitable. - DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode(); + IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>(); - if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) { - prevPageEvictionMode = curPageEvictionMode; + for (CacheDataStore store : offheap.cacheDataStores()) { + // Tree can be null for not yet initialized partitions. + // This would mean that these partitions are empty. + assert store.tree() == null || store.tree().groupId() == grpId; - partDataRegion.config().setPageEvictionMode(curPageEvictionMode); + if (store.tree() != null) + cacheDataStores.put(store.partId(), store); + } - if (idxDfrgFut != null) - idxDfrgFut.get(); - } + dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion()); - IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>(); + // Another cheat. Ttl cleanup manager knows too much shit. + oldGrpCtx.caches().stream() + .filter(cacheCtx -> cacheCtx.groupId() == grpId) + .forEach(cacheCtx -> cacheCtx.ttl().unregister()); - for (CacheDataStore store : offheap.cacheDataStores()) { - // Tree can be null for not yet initialized partitions. - // This would mean that these partitions are empty. - assert store.tree() == null || store.tree().groupId() == grpId; + // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care + // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for + // cache group explicitly. + oldGrpCtx.localWalEnabled(false, false); - if (store.tree() != null) - cacheDataStores.put(store.partId(), store); - } + boolean encrypted = oldGrpCtx.config().isEncryptionEnabled(); - dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion()); + FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted); - // Another cheat. Ttl cleanup manager knows too much shit. - oldGrpCtx.caches().stream() - .filter(cacheCtx -> cacheCtx.groupId() == grpId) - .forEach(cacheCtx -> cacheCtx.ttl().unregister()); + createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> { + }); //TODO Allocated tracker. - // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care and - // WAL records will be allocated anyway just to be ignored later if we don't disable WAL for - // cache group explicitly. - oldGrpCtx.localWalEnabled(false, false); + checkCancellation(); - boolean encrypted = oldGrpCtx.config().isEncryptionEnabled(); + GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>(); - FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted); + PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory(); - createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> { - }); //TODO Allocated tracker. + CacheGroupContext newGrpCtx = new CacheGroupContext( + sharedCtx, + grpId, + oldGrpCtx.receivedFrom(), + CacheType.USER, + oldGrpCtx.config(), + oldGrpCtx.affinityNode(), + partDataRegion, + oldGrpCtx.cacheObjectContext(), + null, + null, + oldGrpCtx.localStartVersion(), + true, + false, + true + ); - GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>(); + defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock(); - PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory(); + try { + // This will initialize partition meta in index partition - meta tree and reuse list. + newGrpCtx.start(); + } + finally { + defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); + } - CacheGroupContext newGrpCtx = new CacheGroupContext( - sharedCtx, - grpId, - oldGrpCtx.receivedFrom(), - CacheType.USER, - oldGrpCtx.config(), - oldGrpCtx.affinityNode(), - partDataRegion, - oldGrpCtx.cacheObjectContext(), - null, - null, - oldGrpCtx.localStartVersion(), - true, - false, - true - ); + IntMap<LinkMap> linkMapByPart = new IntHashMap<>(); - defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock(); + for (CacheDataStore oldCacheDataStore : oldCacheDataStores) { + checkCancellation(); - try { - // This will initialize partition meta in index partition - meta tree and reuse list. - newGrpCtx.start(); - } - finally { - defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); - } + int partId = oldCacheDataStore.partId(); - IntMap<LinkMap> linkMapByPart = new IntHashMap<>(); + PartitionContext partCtx = new PartitionContext( + workDir, + grpId, + partId, + partDataRegion, + mappingDataRegion, + oldGrpCtx, + newGrpCtx, + cacheDataStores.get(partId), + pageStoreFactory + ); - for (CacheDataStore oldCacheDataStore : oldCacheDataStores) { - int partId = oldCacheDataStore.partId(); + if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) { + partCtx.createPageStore( + () -> defragmentedPartMappingFile(workDir, partId).toPath(), + partCtx.mappingPagesAllocated, + partCtx.mappingPageMemory + ); - PartitionContext partCtx = new PartitionContext( - workDir, - grpId, - partId, - partDataRegion, - mappingDataRegion, - oldGrpCtx, - newGrpCtx, - cacheDataStores.get(partId), - pageStoreFactory - ); + linkMapByPart.put(partId, partCtx.createLinkMapTree(false)); + + continue; + } - if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) { partCtx.createPageStore( () -> defragmentedPartMappingFile(workDir, partId).toPath(), partCtx.mappingPagesAllocated, partCtx.mappingPageMemory ); - linkMapByPart.put(partId, partCtx.createLinkMapTree(false)); + linkMapByPart.put(partId, partCtx.createLinkMapTree(true)); - continue; - } - - partCtx.createPageStore( - () -> defragmentedPartMappingFile(workDir, partId).toPath(), - partCtx.mappingPagesAllocated, - partCtx.mappingPageMemory - ); + checkCancellation(); - linkMapByPart.put(partId, partCtx.createLinkMapTree(true)); + partCtx.createPageStore( + () -> defragmentedPartTmpFile(workDir, partId).toPath(), + partCtx.partPagesAllocated, + partCtx.partPageMemory + ); - partCtx.createPageStore( - () -> defragmentedPartTmpFile(workDir, partId).toPath(), - partCtx.partPagesAllocated, - partCtx.partPageMemory - ); + partCtx.createNewCacheDataStore(offheap); - partCtx.createNewCacheDataStore(offheap); + copyPartitionData(partCtx, treeIter); - copyPartitionData(partCtx, treeIter); + IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> { + if (fut.error() != null) + return; - IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> { - if (fut.error() != null) - return; + PageStore oldPageStore = null; - PageStore oldPageStore = null; + try { + oldPageStore = filePageStoreMgr.getStore(grpId, partId); + } + catch (IgniteCheckedException ignore) { + } - try { - oldPageStore = filePageStoreMgr.getStore(grpId, partId); - } - catch (IgniteCheckedException ignore) { - } + assert oldPageStore != null; - if (log.isDebugEnabled()) { - log.debug(S.toString( - "Partition defragmented", - "grpId", grpId, false, - "partId", partId, false, - "oldPages", oldPageStore.pages(), false, - "newPages", partCtx.partPagesAllocated.get() + 1, false, - "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false, - "pageSize", pageSize, false, - "partFile", defragmentedPartFile(workDir, partId).getName(), false, - "workDir", workDir, false - )); - } + if (log.isDebugEnabled()) { + log.debug(S.toString( + "Partition defragmented", + "grpId", grpId, false, + "partId", partId, false, + "oldPages", oldPageStore.pages(), false, + "newPages", partCtx.partPagesAllocated.get() + 1, false, + "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false, + "pageSize", pageSize, false, + "partFile", defragmentedPartFile(workDir, partId).getName(), false, + "workDir", workDir, false + )); + } - oldPageMem.invalidate(grpId, partId); + oldPageMem.invalidate(grpId, partId); - partCtx.partPageMemory.invalidate(grpId, partId); + partCtx.partPageMemory.invalidate(grpId, partId); - DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager(); + DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager(); - pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second. + pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second. - renameTempPartitionFile(workDir, partId); - }; + renameTempPartitionFile(workDir, partId); + }; - GridFutureAdapter<?> cpFut = defragmentationCheckpoint - .forceCheckpoint("partition defragmented", null) - .futureFor(CheckpointState.FINISHED); + GridFutureAdapter<?> cpFut = defragmentationCheckpoint + .forceCheckpoint("partition defragmented", null) + .futureFor(CheckpointState.FINISHED); - cpFut.listen(cpLsnr); + cpFut.listen(cpLsnr); - cmpFut.add((IgniteInternalFuture<Object>)cpFut); - } + cmpFut.add((IgniteInternalFuture<Object>)cpFut); + } - // A bit too general for now, but I like it more then saving only the last checkpoint future. - cmpFut.markInitialized().get(); + // A bit too general for now, but I like it more then saving only the last checkpoint future. + cmpFut.markInitialized().get(); - idxDfrgFut = new GridFinishedFuture<>(); + idxDfrgFut = new GridFinishedFuture<>(); - if (filePageStoreMgr.hasIndexStore(grpId)) { - defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart); + if (filePageStoreMgr.hasIndexStore(grpId)) { + defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart); - idxDfrgFut = defragmentationCheckpoint - .forceCheckpoint("index defragmented", null) - .futureFor(CheckpointState.FINISHED); - } + idxDfrgFut = defragmentationCheckpoint + .forceCheckpoint("index defragmented", null) + .futureFor(CheckpointState.FINISHED); + } - idxDfrgFut.listen(fut -> { - oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); + idxDfrgFut = idxDfrgFut.chain(fut -> { + oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); - PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory(); + PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory(); - partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); + partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); - DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager(); + DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager(); - pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION); + pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION); - PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory(); + PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory(); - pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager(); + pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager(); - pageMgr.pageStoreMap().clear(grpId); + pageMgr.pageStoreMap().clear(grpId); - renameTempIndexFile(workDir); + renameTempIndexFile(workDir); - writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log); + writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log); - batchRenameDefragmentedCacheGroupPartitions(workDir, log); - }); + batchRenameDefragmentedCacheGroupPartitions(workDir, log); + return null; + }); + } } + catch (DefragmentationCancelledException e) { + DefragmentationFileUtils.deleteLeftovers(workDir); - // I guess we should wait for it? - if (idxDfrgFut != null) - idxDfrgFut.get(); + throw e; + } } + if (idxDfrgFut != null) + idxDfrgFut.get(); + mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); log.info("Defragmentation completed. All partitions are defragmented."); + + completionFut.onDone(); + } + catch (DefragmentationCancelledException e) { + mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + + log.info("Defragmentation has been cancelled."); + + completionFut.onDone(); + } + catch (Throwable t) { + completionFut.onDone(t); + + throw t; } finally { defragmentationCheckpoint.stop(true); @@ -439,6 +482,11 @@ public class CachePartitionDefragmentationManager { } /** */ + public IgniteInternalFuture<?> completionFuture() { + return completionFut.chain(future -> null); + } + + /** */ public void createIndexPageStore( int grpId, File workDir, @@ -476,6 +524,39 @@ public class CachePartitionDefragmentationManager { } /** + * Cancel the process of defragmentation. + * + * @return {@code true} if process was cancelled by this method. + */ + public boolean cancel() { + if (completionFut.isDone()) + return false; + + if (cancel.compareAndSet(false, true)) { + try { + completionFut.get(); + } + catch (Throwable ignore) { + } + + return true; + } + + return false; + } + + /** */ + private void checkCancellation() throws DefragmentationCancelledException { + if (cancel.get()) + throw new DefragmentationCancelledException(); + } + + /** */ + public String status() { + throw new UnsupportedOperationException("Not implemented yet."); + } + + /** * Defragmentate partition. * * @param partCtx @@ -501,6 +582,8 @@ public class CachePartitionDefragmentationManager { AtomicInteger entriesProcessed = new AtomicInteger(); treeIter.iterate(tree, partCtx.cachePageMemory, (tree0, io, pageAddr, idx) -> { + checkCancellation(); + if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) { defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); @@ -541,6 +624,8 @@ public class CachePartitionDefragmentationManager { return true; }); + checkCancellation(); + defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock(); @@ -673,12 +758,15 @@ public class CachePartitionDefragmentationManager { CheckpointTimeoutLock cpLock = defragmentationCheckpoint.checkpointTimeoutLock(); + Runnable cancellationChecker = this::checkCancellation; + idx.defragment( grpCtx, newCtx, (PageMemoryEx)partDataRegion.pageMemory(), mappingByPartition, - cpLock + cpLock, + cancellationChecker ); } @@ -824,4 +912,10 @@ public class CachePartitionDefragmentationManager { this.newCacheDataStore = newCacheDataStore; } } + + /** */ + private static class DefragmentationCancelledException extends RuntimeException { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java index b4273cd..214e17d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java @@ -81,16 +81,7 @@ public class DefragmentationFileUtils { U.delete(defragmentationCompletionMarkerFile(workDir)); - for (File file : workDir.listFiles()) { - String fileName = file.getName(); - - if ( - fileName.startsWith(DFRG_PARTITION_FILE_PREFIX) - || fileName.startsWith(DFRG_INDEX_FILE_NAME) - || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX) - ) - U.delete(file); - } + deleteLeftovers(workDir); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -98,6 +89,24 @@ public class DefragmentationFileUtils { } /** + * Deletes all defragmentation related file from work directory, except for completion marker. + * + * @param workDir Cache group working directory. + */ + public static void deleteLeftovers(File workDir) { + for (File file : workDir.listFiles()) { + String fileName = file.getName(); + + if ( + fileName.startsWith(DFRG_PARTITION_FILE_PREFIX) + || fileName.startsWith(DFRG_INDEX_FILE_NAME) + || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX) + ) + U.delete(file); + } + } + + /** * Checks whether cache group defragmentation completed or not. Completes it if all that's left is renaming. * * @param workDir Cache group working directory. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java index 6bc3ddc..499e247 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.ignite.maintenance.MaintenanceTask; @@ -29,31 +30,31 @@ import static org.apache.ignite.internal.processors.cache.persistence.defragment */ public class DefragmentationParameters { /** */ - public static final String CACHE_GROUP_ID_SEPARATOR = ","; + public static final String SEPARATOR = "/"; /** */ - private final List<Integer> cacheGrpIds; + private final List<String> cacheNames; /** - * @param cacheGrpIds Id of cache group for defragmentations. + * @param cacheNames Names of caches for defragmentations. */ - private DefragmentationParameters(List<Integer> cacheGrpIds) { - this.cacheGrpIds = cacheGrpIds; + private DefragmentationParameters(List<String> cacheNames) { + this.cacheNames = cacheNames; } /** * Convert parameter to maintenance storage. * - * @param cacheGroupIds Cache group ids for defragmentation. + * @param cacheNames Names of caches for defragmentations. * @return Maintenance task. */ - public static MaintenanceTask toStore(List<Integer> cacheGroupIds) { + public static MaintenanceTask toStore(List<String> cacheNames) { return new MaintenanceTask( DEFRAGMENTATION_MNTC_TASK_NAME, - "Cache group defragmentation", - cacheGroupIds.stream() + "Caches defragmentation", + cacheNames.stream() .map(String::valueOf) - .collect(Collectors.joining(CACHE_GROUP_ID_SEPARATOR)) + .collect(Collectors.joining(SEPARATOR)) ); } @@ -62,17 +63,20 @@ public class DefragmentationParameters { * @return Defragmentation parameters. */ public static DefragmentationParameters fromStore(MaintenanceTask rawTask) { + if (rawTask.parameters() == null) + return new DefragmentationParameters(Collections.emptyList()); + return new DefragmentationParameters(Arrays.stream(rawTask.parameters() - .split(CACHE_GROUP_ID_SEPARATOR)) - .map(Integer::valueOf) + .split(SEPARATOR)) .collect(Collectors.toList()) ); } /** - * @return Cache groups ids. + * @return Cache names. */ - public List<Integer> cacheGroupIds() { - return cacheGrpIds; + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + public List<String> cacheNames() { + return cacheNames; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java index a809579..b3bbe51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Function; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager; +import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.maintenance.MaintenanceAction; import org.apache.ignite.maintenance.MaintenanceWorkflowCallback; import org.jetbrains.annotations.NotNull; @@ -37,16 +38,22 @@ public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallb /** Logger provider. */ private final Function<Class<?>, IgniteLogger> logProvider; + /** Failure processor. */ + private final FailureProcessor failureProc; + /** * @param logProvider Logger provider. * @param defrgMgr Defragmentation manager. + * @param failureProc Failure processor. */ public DefragmentationWorkflowCallback( Function<Class<?>, IgniteLogger> logProvider, - CachePartitionDefragmentationManager defrgMgr + CachePartitionDefragmentationManager defrgMgr, + FailureProcessor failureProc ) { this.defrgMgr = defrgMgr; this.logProvider = logProvider; + this.failureProc = failureProc; } /** {@inheritDoc} */ @@ -56,11 +63,11 @@ public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallb /** {@inheritDoc} */ @Override public @NotNull List<MaintenanceAction<?>> allActions() { - return Collections.singletonList(automaticAction()); + return Collections.singletonList(new StopDefragmentationAction(defrgMgr)); } /** {@inheritDoc} */ @Override public @Nullable MaintenanceAction<Boolean> automaticAction() { - return new ExecuteDefragmentationAction(logProvider, defrgMgr); + return new ExecuteDefragmentationAction(logProvider, defrgMgr, failureProc); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java index 42b2de7..0758772 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java @@ -21,7 +21,10 @@ import java.util.function.Function; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager; +import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.maintenance.MaintenanceAction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -36,29 +39,53 @@ class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> { /** Defragmentation manager. */ private final CachePartitionDefragmentationManager defrgMgr; + /** Failure processor. */ + private final FailureProcessor failureProc; + /** * @param logFunction Logger provider. * @param defrgMgr Defragmentation manager. + * @param failureProc Failure processor. */ public ExecuteDefragmentationAction( Function<Class<?>, IgniteLogger> logFunction, - CachePartitionDefragmentationManager defrgMgr + CachePartitionDefragmentationManager defrgMgr, + FailureProcessor failureProc ) { this.log = logFunction.apply(ExecuteDefragmentationAction.class); this.defrgMgr = defrgMgr; + this.failureProc = failureProc; } /** {@inheritDoc} */ @Override public Boolean execute() { try { - defrgMgr.executeDefragmentation(); + defrgMgr.beforeDefragmentation(); } catch (IgniteCheckedException | IgniteException e) { - log.error("Defragmentation is failed", e); + log.error("Checkpoint before defragmentation failed", e); return false; } + Thread defrgThread = new Thread(() -> { + try { + defrgMgr.executeDefragmentation(); + } + catch (Throwable e) { + log.error("Defragmentation failed", e); + + // TODO Check other options. + failureProc.process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } + }); + + defrgThread.setName("defragmentation-thread"); + + defrgThread.setDaemon(true); + + defrgThread.start(); + return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java similarity index 56% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java index 42b2de7..4b40b9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java @@ -17,58 +17,37 @@ package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance; -import java.util.function.Function; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager; import org.apache.ignite.maintenance.MaintenanceAction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * Action which allows to start the defragmentation process. + * Action which allows to stop the defragmentation at any time from maintenance mode processor. */ -class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> { - /** Logger. */ - private final IgniteLogger log; - +class StopDefragmentationAction implements MaintenanceAction<Boolean> { /** Defragmentation manager. */ - private final CachePartitionDefragmentationManager defrgMgr; + private final CachePartitionDefragmentationManager defragmentationMgr; /** - * @param logFunction Logger provider. - * @param defrgMgr Defragmentation manager. + * @param defragmentationMgr Defragmentation manager. */ - public ExecuteDefragmentationAction( - Function<Class<?>, IgniteLogger> logFunction, - CachePartitionDefragmentationManager defrgMgr - ) { - this.log = logFunction.apply(ExecuteDefragmentationAction.class); - this.defrgMgr = defrgMgr; + public StopDefragmentationAction(CachePartitionDefragmentationManager defragmentationMgr) { + this.defragmentationMgr = defragmentationMgr; } /** {@inheritDoc} */ @Override public Boolean execute() { - try { - defrgMgr.executeDefragmentation(); - } - catch (IgniteCheckedException | IgniteException e) { - log.error("Defragmentation is failed", e); - - return false; - } - - return true; + return defragmentationMgr.cancel(); } /** {@inheritDoc} */ @Override public @NotNull String name() { - return "execute"; + return "stop"; } /** {@inheritDoc} */ @Override public @Nullable String description() { - return "Starting the process of defragmentation."; + return "Stopping the defragmentation process immediately"; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 236de0c..117fdeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -501,6 +501,7 @@ public interface GridQueryIndexing { * @param partPageMem Partition page memory. * @param mappingByPart Mapping page memory. * @param cpLock Defragmentation checkpoint read lock. + * @param cancellationChecker Cancellation checker. * * @throws IgniteCheckedException If failed. */ @@ -509,6 +510,7 @@ public interface GridQueryIndexing { CacheGroupContext newCtx, PageMemoryEx partPageMem, IntMap<LinkMap> mappingByPart, - CheckpointTimeoutLock cpLock + CheckpointTimeoutLock cpLock, + Runnable cancellationChecker ) throws IgniteCheckedException; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index b3b644d..bda05be 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -5803,10 +5803,24 @@ public abstract class IgniteUtils { * @param e Enum value to write, possibly {@code null}. * @throws IOException If write failed. */ - public static <E extends Enum> void writeEnum(DataOutput out, E e) throws IOException { + public static <E extends Enum<E>> void writeEnum(DataOutput out, E e) throws IOException { out.writeByte(e == null ? -1 : e.ordinal()); } + /** */ + public static <E extends Enum<E>> E readEnum(DataInput in, Class<E> enumCls) throws IOException { + byte ordinal = in.readByte(); + + if (ordinal == (byte)-1) + return null; + + int idx = ordinal & 0xFF; + + E[] values = enumCls.getEnumConstants(); + + return idx < values.length ? values[idx] : null; + } + /** * Gets collection value by index. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java new file mode 100644 index 0000000..9cc5eab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.defragmentation; + +/** */ +public enum VisorDefragmentationOperation { + /** */ + SCHEDULE, + /** */ + STATUS, + /** */ + CANCEL +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java new file mode 100644 index 0000000..14cea62 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.defragmentation; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.processors.task.GridVisorManagementTask; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.apache.ignite.maintenance.MaintenanceAction; +import org.apache.ignite.maintenance.MaintenanceRegistry; +import org.apache.ignite.maintenance.MaintenanceTask; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME; +import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore; + +/** */ +@GridInternal +@GridVisorManagementTask +public class VisorDefragmentationTask extends VisorMultiNodeTask + <VisorDefragmentationTaskArg, VisorDefragmentationTaskResult, VisorDefragmentationTaskResult> +{ + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob<VisorDefragmentationTaskArg, VisorDefragmentationTaskResult> job( + VisorDefragmentationTaskArg arg + ) { + return new VisorDefragmentationJob(arg, debug); + } + + /** {@inheritDoc} */ + @Nullable @Override protected VisorDefragmentationTaskResult reduce0(List<ComputeJobResult> results) { + if (taskArg.operation() == VisorDefragmentationOperation.SCHEDULE) { + StringBuilder msg = new StringBuilder(); + + for (ComputeJobResult res : results) { + msg.append(res.getNode().consistentId()).append(":\n"); + + if (res.getData() == null) + msg.append(" err=").append(res.getException()).append('\n'); + else { + VisorDefragmentationTaskResult data = res.getData(); + + msg.append(" success=").append(data.isSuccess()).append('\n'); + msg.append(" msg=").append(data.getMessage()).append('\n'); + } + } + + return new VisorDefragmentationTaskResult(true, msg.toString()); + } + + assert results.size() == 1; + + ComputeJobResult res = results.get(0); + + if (res.getException() == null) + return res.getData(); + + throw res.getException(); + } + + /** */ + private static class VisorDefragmentationJob extends VisorJob<VisorDefragmentationTaskArg, VisorDefragmentationTaskResult> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** + * Create job with specified argument. + * + * @param arg Job argument. + * @param debug Flag indicating whether debug information should be printed into node log. + */ + protected VisorDefragmentationJob(@Nullable VisorDefragmentationTaskArg arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected VisorDefragmentationTaskResult run( + @Nullable VisorDefragmentationTaskArg arg + ) throws IgniteException { + switch (arg.operation()) { + case SCHEDULE: + return runSchedule(arg); + + case STATUS: + return runStatus(arg); + + case CANCEL: + return runCancel(arg); + } + + throw new IllegalArgumentException("Operation: " + arg.operation()); + } + + /** */ + private VisorDefragmentationTaskResult runSchedule(VisorDefragmentationTaskArg arg) { + MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); + + MaintenanceTask oldTask; + + try { + List<String> cacheNames = arg.cacheNames(); + + oldTask = mntcReg.registerMaintenanceTask(toStore(cacheNames == null ? Collections.emptyList() : cacheNames)); + } + catch (IgniteCheckedException e) { + return new VisorDefragmentationTaskResult(false, "Scheduling failed: " + e.getMessage()); + } + + return new VisorDefragmentationTaskResult( + true, + "Scheduling completed successfully." + + (oldTask == null ? "" : " Previously scheduled task has been removed.") + ); + } + + /** */ + private VisorDefragmentationTaskResult runStatus(VisorDefragmentationTaskArg arg) { + MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); + + if (!mntcReg.isMaintenanceMode()) + return new VisorDefragmentationTaskResult(false, "Node is not in maintenance node."); + + IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + + assert dbMgr instanceof GridCacheDatabaseSharedManager; + + CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr) + .defragmentationManager(); + + if (defrgMgr == null) + return new VisorDefragmentationTaskResult(true, "There's no active defragmentation process on the node."); + + return new VisorDefragmentationTaskResult(true, defrgMgr.status()); + } + + /** */ + private VisorDefragmentationTaskResult runCancel(VisorDefragmentationTaskArg arg) { + assert arg.cacheNames() == null : "Cancelling specific caches is not yet implemented"; + + MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); + + if (!mntcReg.isMaintenanceMode()) { + boolean deleted = mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + + String msg = deleted + ? "Scheduled defragmentation task cancelled successfully." + : "Scheduled defragmentation task is not found."; + + return new VisorDefragmentationTaskResult(true, msg); + } + else { + List<MaintenanceAction<?>> actions; + + try { + actions = mntcReg.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + } + catch (IgniteException e) { + return new VisorDefragmentationTaskResult(true, "Defragmentation is already completed or has been cancelled previously."); + } + + Optional<MaintenanceAction<?>> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny(); + + assert stopAct.isPresent(); + + try { + Object res = stopAct.get().execute(); + + assert res instanceof Boolean; + + boolean cancelled = (Boolean)res; + + String msg = cancelled + ? "Defragmentation cancelled successfully." + : "Defragmentation is already completed or has been cancelled previously."; + + return new VisorDefragmentationTaskResult(true, msg); + } + catch (Exception e) { + return new VisorDefragmentationTaskResult(false, "Exception occurred: " + e.getMessage()); + } + } + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java new file mode 100644 index 0000000..1b1c8b1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.defragmentation; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") +public class VisorDefragmentationTaskArg extends IgniteDataTransferObject { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** */ + private VisorDefragmentationOperation operation; + + /** */ + private List<String> nodeIds; + + /** */ + private List<String> cacheNames; + + /** Empty constructor for serialization. */ + public VisorDefragmentationTaskArg() { + // No-op. + } + + /** */ + public VisorDefragmentationTaskArg( + VisorDefragmentationOperation operation, + List<String> nodeIds, + List<String> cacheNames + ) { + + this.operation = operation; + this.nodeIds = nodeIds; + this.cacheNames = cacheNames; + } + + /** */ + public VisorDefragmentationOperation operation() { + return operation; + } + + /** */ + public List<String> nodeIds() { + return nodeIds; + } + + /** */ + public List<String> cacheNames() { + return cacheNames; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeEnum(out, operation); + + U.writeCollection(out, nodeIds); + + U.writeCollection(out, cacheNames); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + operation = U.readEnum(in, VisorDefragmentationOperation.class); + + nodeIds = U.readList(in); + + cacheNames = U.readList(in); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java new file mode 100644 index 0000000..4575ac2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.defragmentation; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class VisorDefragmentationTaskResult extends IgniteDataTransferObject { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** */ + private boolean success; + + /** */ + private String msg; + + /** Empty constructor for serialization. */ + public VisorDefragmentationTaskResult() { + // No-op. + } + + /** */ + public VisorDefragmentationTaskResult(boolean success, String msg) { + this.success = success; + + this.msg = msg; + } + + /** */ + public boolean isSuccess() { + return success; + } + + /** */ + public String getMessage() { + return msg; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + out.writeBoolean(success); + + U.writeString(out, msg); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException { + success = in.readBoolean(); + + msg = U.readString(in); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java index 9cebef0..a2cabde 100644 --- a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java @@ -104,8 +104,9 @@ public interface MaintenanceRegistry { * Deletes {@link MaintenanceTask} of given ID from maintenance registry. * * @param maintenanceTaskName name of {@link MaintenanceTask} to be deleted. + * @return {@code true} if existing task has been deleted. */ - public void unregisterMaintenanceTask(String maintenanceTaskName); + public boolean unregisterMaintenanceTask(String maintenanceTaskName); /** * Returns active {@link MaintenanceTask} by its name. diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 92d01dd..f480d0f 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -2124,6 +2124,11 @@ org.apache.ignite.internal.visor.debug.VisorThreadDumpTaskResult org.apache.ignite.internal.visor.debug.VisorThreadInfo org.apache.ignite.internal.visor.debug.VisorThreadLockInfo org.apache.ignite.internal.visor.debug.VisorThreadMonitorInfo +org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask +org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask$VisorDefragmentationJob +org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation +org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskArg +org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskResult org.apache.ignite.internal.visor.event.VisorGridDeploymentEvent org.apache.ignite.internal.visor.event.VisorGridDiscoveryEvent org.apache.ignite.internal.visor.event.VisorGridEvent diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java index 10e1db1..9b1067a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java @@ -43,6 +43,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cluster.ClusterState.ACTIVE; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME; /** * Concurrent and advanced tests for WAL state change. @@ -190,10 +191,18 @@ public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSe cleanCacheDir(cacheToClean); - // Node should start successfully and not enter maintenance mode as MaintenanceRecord will be cleaned + // Node should start successfully and enter maintenance mode. MaintenanceRecord will be cleaned // automatically because corrupted PDS was deleted during downtime srv = startGrid(config(SRV_1, false, false)); - assertFalse(srv.context().maintenanceRegistry().isMaintenanceMode()); + assertTrue(srv.context().maintenanceRegistry().isMaintenanceMode()); + + try { + srv.context().maintenanceRegistry().actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME); + + fail("Maintenance task is not completed yet for some reason."); + } + catch (Exception ignore) { + } stopAllGrids(false); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java index 8f06a48..4ce9bb4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java @@ -39,6 +39,9 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteState; +import org.apache.ignite.Ignition; +import org.apache.ignite.IgnitionListener; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; @@ -48,12 +51,12 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.maintenance.MaintenanceFileStore; import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; -import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.maintenance.MaintenanceRegistry; import org.apache.ignite.testframework.GridTestUtils; @@ -200,10 +203,25 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest { startGrid(0); + waitForDefragmentation(0); + + assertEquals(ClusterState.INACTIVE, grid(0).context().state().clusterState().state()); + + GridTestUtils.assertThrowsAnyCause( + log, + () -> { + grid(0).cluster().state(ClusterState.ACTIVE); + + return null; + }, + IgniteCheckedException.class, + "Failed to activate cluster (node is in maintenance mode)" + ); + long[] newPartLen = partitionSizes(workDir); for (int p = 0; p < PARTS; p++) - assertTrue(newPartLen[p] < oldPartLen[p]); //TODO Fails. + assertTrue(newPartLen[p] < oldPartLen[p]); long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length(); @@ -252,11 +270,22 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest { } /** */ + protected void waitForDefragmentation(int idx) throws IgniteCheckedException { + IgniteEx ig = grid(idx); + + ((GridCacheDatabaseSharedManager)ig.context().cache().context().database()) + .defragmentationManager() + .completionFuture() + .get(); + } + + /** */ protected void createMaintenanceRecord() throws IgniteCheckedException { IgniteEx grid = grid(0); + MaintenanceRegistry mntcReg = grid.context().maintenanceRegistry(); - mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(groupIdForCache(grid, DEFAULT_CACHE_NAME)))); + mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(DEFAULT_CACHE_NAME))); } /** @@ -383,6 +412,29 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest { File workDir = resolveCacheWorkDir(ig); + //Defragmentation should fail when node starts. + startAndAwaitNodeFail(workDir); + + c.accept(workDir); + + startGrid(0); // Fails here VERY rarely. WTF? + + waitForDefragmentation(0); + + stopGrid(0); + + // Everything must be completed. + startGrid(0).cluster().state(ClusterState.ACTIVE); + + validateCache(grid(0).cache(DEFAULT_CACHE_NAME)); + + validateLeftovers(workDir); + } + + /** + * @throws IgniteInterruptedCheckedException If fail. + */ + private void startAndAwaitNodeFail(File workDir) throws IgniteInterruptedCheckedException { String errMsg = "Failed to create defragmentation completion marker."; AtomicBoolean errOccurred = new AtomicBoolean(); @@ -405,34 +457,32 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest { return cfg; }; - try { - startGrid(0, cfgOp); - } - catch (Exception ignore) { - // No-op. - } - - // Failed node can leave interrupted status of the thread that needs to be cleared, - // otherwise following "wait" wouldn't work. - // This call can't be moved inside of "catch" block because interruption can actually be silent. - Thread.interrupted(); - - assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 10_000L)); - - assertTrue(GridTestUtils.waitForCondition(() -> G.allGrids().isEmpty(), 10_000L)); - - c.accept(workDir); - - startGrid(0); - - stopGrid(0); + AtomicBoolean nodeStopped = new AtomicBoolean(); + IgnitionListener nodeStopListener = (name, state) -> { + if (name.equals(getTestIgniteInstanceName(0)) && state == IgniteState.STOPPED_ON_FAILURE) + nodeStopped.set(true); + }; - // Everything must be completed. - startGrid(0).cluster().state(ClusterState.ACTIVE); + Ignition.addListener(nodeStopListener); + try { + try { + startGrid(0, cfgOp); + } + catch (Exception ignore) { + // No-op. + } - validateCache(grid(0).cache(DEFAULT_CACHE_NAME)); + // Failed node can leave interrupted status of the thread that needs to be cleared, + // otherwise following "wait" wouldn't work. + // This call can't be moved inside of "catch" block because interruption can actually be silent. + Thread.interrupted(); - validateLeftovers(workDir); + assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 3_000L)); + assertTrue(GridTestUtils.waitForCondition(nodeStopped::get, 3_000L)); + } + finally { + Ignition.removeListener(nodeStopListener); + } } /** */ @@ -463,6 +513,8 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest { startGrid(0); + waitForDefragmentation(0); + File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false); AtomicReference<File> cachePartFile = new AtomicReference<>(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java index 5f0b04f..732a48a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java @@ -345,7 +345,8 @@ public class DummyQueryIndexing implements GridQueryIndexing { CacheGroupContext newCtx, PageMemoryEx partPageMem, IntMap<LinkMap> mappingByPart, - CheckpointTimeoutLock cpLock + CheckpointTimeoutLock cpLock, + Runnable cancellationChecker ) throws IgniteCheckedException { // No-op. } diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index 5950ed1..a4c2ec4 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -221,6 +221,15 @@ If the file name isn't specified the output file name is: '<typeId>.bin' Backup data files of only given caches: control.(sh|bat) --persistence backup caches cache1,cache2,cache3 + Schedule PDS defragmentation on given nodes for all caches: + control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 + + Schedule PDS defragmentation on given nodes but only for given caches: + control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 --caches cache1,cache2,cache3 + + Cancel scheduled or active PDS defragmentation on underlying node: + control.(sh|bat) --defragmentation cancel + By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index 5950ed1..a4c2ec4 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -221,6 +221,15 @@ If the file name isn't specified the output file name is: '<typeId>.bin' Backup data files of only given caches: control.(sh|bat) --persistence backup caches cache1,cache2,cache3 + Schedule PDS defragmentation on given nodes for all caches: + control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 + + Schedule PDS defragmentation on given nodes but only for given caches: + control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 --caches cache1,cache2,cache3 + + Cancel scheduled or active PDS defragmentation on underlying node: + control.(sh|bat) --defragmentation cancel + By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 517bc69..6d4b4e3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -3201,8 +3201,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { CacheGroupContext newCtx, PageMemoryEx partPageMem, IntMap<LinkMap> mappingByPart, - CheckpointTimeoutLock cpLock + CheckpointTimeoutLock cpLock, + Runnable cancellationChecker ) throws IgniteCheckedException { - defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, log); + defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, cancellationChecker, log); } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java index c41f587..19d15c4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java @@ -80,6 +80,7 @@ public class IndexingDefragmentation { * @param partPageMem Partition page memory. * @param mappingByPartition Mapping page memory. * @param cpLock Defragmentation checkpoint read lock. + * @param cancellationChecker Cancellation checker. * @param log Log. * * @throws IgniteCheckedException If failed. @@ -90,6 +91,7 @@ public class IndexingDefragmentation { PageMemoryEx partPageMem, IntMap<LinkMap> mappingByPartition, CheckpointTimeoutLock cpLock, + Runnable cancellationChecker, IgniteLogger log ) throws IgniteCheckedException { int pageSize = grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize(); @@ -115,6 +117,8 @@ public class IndexingDefragmentation { if (cctx.groupId() != grpCtx.groupId()) continue; // Not our index. + cancellationChecker.run(); + GridH2RowDescriptor rowDesc = table.rowDescriptor(); List<Index> indexes = table.getIndexes(); @@ -155,6 +159,8 @@ public class IndexingDefragmentation { H2Tree tree = oldH2Idx.treeForRead(i); treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> { + cancellationChecker.run(); + if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) { cpLock.checkpointReadUnlock(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java index bbb69ae..cf6b422 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java @@ -136,6 +136,8 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati startGrid(0); + waitForDefragmentation(0); + long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length(); assertTrue(newIdxFileLen <= oldIdxFileLen);