IGNITE-8973 Calculate partition hash and print into standard output Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8309cef7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8309cef7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8309cef7 Branch: refs/heads/ignite-8446 Commit: 8309cef795e78f9a1a988eb02326278c6f16f1f5 Parents: 5e8669a Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Tue Jul 31 16:13:27 2018 +0300 Committer: Andrey Gura <ag...@apache.org> Committed: Tue Jul 31 16:13:56 2018 +0300 ---------------------------------------------------------------------- .../internal/commandline/CommandHandler.java | 162 ++++++------- .../commandline/cache/CacheArguments.java | 34 +++ .../cache/verify/IdleVerifyDumpResult.java | 73 ++++++ .../cache/verify/IdleVerifyResultV2.java | 85 +++++-- .../verify/VerifyBackupPartitionsDumpTask.java | 230 +++++++++++++++++++ .../visor/verify/VisorIdleVerifyDumpTask.java | 37 +++ .../verify/VisorIdleVerifyDumpTaskArg.java | 73 ++++++ .../visor/verify/VisorIdleVerifyJob.java | 83 +++++++ .../resources/META-INF/classnames.properties | 2 + .../ignite/util/GridCommandHandlerTest.java | 178 +++++++++++++- 10 files changed, 843 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java index ca14e08..7b5ce44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java @@ -34,6 +34,7 @@ import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridClientAuthenticationException; @@ -54,9 +55,7 @@ import org.apache.ignite.internal.processors.cache.verify.CacheInfo; import org.apache.ignite.internal.processors.cache.verify.ContentionInfo; import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord; -import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; import org.apache.ignite.internal.processors.cache.verify.PartitionKey; -import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -84,6 +83,8 @@ import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult; import org.apache.ignite.internal.visor.verify.VisorContentionTask; import org.apache.ignite.internal.visor.verify.VisorContentionTaskArg; import org.apache.ignite.internal.visor.verify.VisorContentionTaskResult; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTask; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg; import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask; import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg; import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskResult; @@ -154,8 +155,15 @@ public class CommandHandler { /** */ protected static final String CMD_PING_TIMEOUT = "--ping-timeout"; + /** */ + private static final String CMD_DUMP = "--dump"; + + /** */ + private static final String CMD_SKIP_ZEROS = "--skipZeros"; + /** List of optional auxiliary commands. */ private static final Set<String> AUX_COMMANDS = new HashSet<>(); + static { AUX_COMMANDS.add(CMD_HELP); AUX_COMMANDS.add(CMD_HOST); @@ -237,7 +245,7 @@ public class CommandHandler { private static final String TX_ORDER = "order"; /** */ - public static final String CMD_TX_ORDER_START_TIME="START_TIME"; + public static final String CMD_TX_ORDER_START_TIME = "START_TIME"; /** */ private static final String TX_SERVERS = "servers"; @@ -463,14 +471,14 @@ public class CommandHandler { } /** - * * @param client Client. * @param taskCls Task class. * @param taskArgs Task arguments. * @return Task result. * @throws GridClientException If failed to execute task. */ - private <R> R executeTask(GridClient client, Class<?> taskCls, Object taskArgs) throws GridClientException { + private <R> R executeTask(GridClient client, Class<? extends ComputeTask<?, R>> taskCls, + Object taskArgs) throws GridClientException { return executeTaskByNameOnNode(client, taskCls.getName(), taskArgs, null); } @@ -595,7 +603,7 @@ public class CommandHandler { usage(" Show information about caches, groups or sequences that match a regex:", CACHE, " list regexPattern [groups|seq] [nodeId]"); usage(" Show hot keys that are point of contention for multiple transactions:", CACHE, " contention minQueueSize [nodeId] [maxPrint]"); - usage(" Verify partition counters and hashes between primary and backups on idle cluster:", CACHE, " idle_verify [cache1,...,cacheN]"); + usage(" Verify partition counters and hashes between primary and backups on idle cluster:", CACHE, " idle_verify [--dump] [--skipZeros] [cache1,...,cacheN]"); usage(" Validate custom indexes on idle cluster:", CACHE, " validate_indexes [cache1,...,cacheN] [nodeId] [checkFirst|checkThrough]"); log(" If [nodeId] is not specified, contention and validate_indexes commands will be broadcasted to all server nodes."); @@ -717,7 +725,8 @@ public class CommandHandler { } /** - * Executes appropriate version of idle_verify check. Old version will be used if there are old nodes in the cluster. + * Executes appropriate version of idle_verify check. Old version will be used if there are old nodes in the + * cluster. * * @param client Client. * @param cacheArgs Cache args. @@ -739,7 +748,9 @@ public class CommandHandler { } } - if (idleVerifyV2) + if (cacheArgs.dump()) + cacheIdleVerifyDump(client, cacheArgs); + else if (idleVerifyV2) cacheIdleVerifyV2(client, cacheArgs); else legacyCacheIdleVerify(client, cacheArgs); @@ -760,7 +771,7 @@ public class CommandHandler { nl(); } else { - log ("idle_verify check has finished, found " + conflicts.size() + " conflict partitions."); + log("idle_verify check has finished, found " + conflicts.size() + " conflict partitions."); nl(); for (Map.Entry<PartitionKey, List<PartitionHashRecord>> entry : conflicts.entrySet()) { @@ -775,66 +786,33 @@ public class CommandHandler { * @param client Client. * @param cacheArgs Cache args. */ + private void cacheIdleVerifyDump(GridClient client, CacheArguments cacheArgs) throws GridClientException { + String path = executeTask( + client, + VisorIdleVerifyDumpTask.class, + new VisorIdleVerifyDumpTaskArg(cacheArgs.caches(), cacheArgs.isSkipZeros()) + ); + + log("VisorIdleVerifyDumpTask successfully written output to '" + path + "'"); + } + + /** + * @param client Client. + * @param cacheArgs Cache args. + */ private void cacheIdleVerifyV2(GridClient client, CacheArguments cacheArgs) throws GridClientException { IdleVerifyResultV2 res = executeTask( client, VisorIdleVerifyTaskV2.class, new VisorIdleVerifyTaskArg(cacheArgs.caches())); - if (!res.hasConflicts()) { - log("idle_verify check has finished, no conflicts have been found."); - nl(); - } - else { - int cntrConflictsSize = res.counterConflicts().size(); - int hashConflictsSize = res.hashConflicts().size(); - - log("idle_verify check has finished, found " + (cntrConflictsSize + hashConflictsSize) + - " conflict partitions: [counterConflicts=" + cntrConflictsSize + ", hashConflicts=" + - hashConflictsSize + "]"); - nl(); - - if (!F.isEmpty(res.counterConflicts())) { - log("Update counter conflicts:"); - - for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : res.counterConflicts().entrySet()) { - log("Conflict partition: " + entry.getKey()); - - log("Partition instances: " + entry.getValue()); - } - - nl(); - } - - if (!F.isEmpty(res.hashConflicts())) { - log("Hash conflicts:"); - - for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : res.hashConflicts().entrySet()) { - log("Conflict partition: " + entry.getKey()); - - log("Partition instances: " + entry.getValue()); - } - - nl(); - } - } - - if (!F.isEmpty(res.movingPartitions())) { - log("Verification was skipped for " + res.movingPartitions().size() + " MOVING partitions:"); - - for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : res.movingPartitions().entrySet()) { - log("Rebalancing partition: " + entry.getKey()); - - log("Partition instances: " + entry.getValue()); - } - - nl(); - } + res.print(System.out::print); } /** * Change baseline. * * @param client Client. - * @param baselineAct Baseline action to execute. @throws GridClientException If failed to execute baseline action. + * @param baselineAct Baseline action to execute. @throws GridClientException If failed to execute baseline + * action. * @param baselineArgs Baseline action arguments. * @throws Throwable If failed to execute baseline action. */ @@ -928,7 +906,7 @@ public class CommandHandler { else { log("Baseline nodes:"); - for(VisorBaselineNode node : baseline.values()) { + for (VisorBaselineNode node : baseline.values()) { log(" ConsistentID=" + node.getConsistentId() + ", STATE=" + (srvs.containsKey(node.getConsistentId()) ? "ONLINE" : "OFFLINE")); } @@ -950,7 +928,7 @@ public class CommandHandler { else { log("Other nodes:"); - for(VisorBaselineNode node : others) + for (VisorBaselineNode node : others) log(" ConsistentID=" + node.getConsistentId()); log("Number of other nodes: " + others.size()); @@ -1123,7 +1101,7 @@ public class CommandHandler { * @throws Throwable If failed to execute wal action. */ private void wal(GridClient client, String walAct, String walArgs) throws Throwable { - switch (walAct){ + switch (walAct) { case WAL_DELETE: deleteUnusedWalSegments(client, walArgs); @@ -1145,7 +1123,7 @@ public class CommandHandler { */ private void deleteUnusedWalSegments(GridClient client, String walArgs) throws Throwable { VisorWalTaskResult res = executeTask(client, VisorWalTask.class, - walArg(VisorWalTaskOperation.DELETE_UNUSED_WAL_SEGMENTS, walArgs)); + walArg(VisorWalTaskOperation.DELETE_UNUSED_WAL_SEGMENTS, walArgs)); printDeleteWalSegments0(res); } @@ -1157,7 +1135,7 @@ public class CommandHandler { */ private void printUnusedWalSegments(GridClient client, String walArgs) throws Throwable { VisorWalTaskResult res = executeTask(client, VisorWalTask.class, - walArg(VisorWalTaskOperation.PRINT_UNUSED_WAL_SEGMENTS, walArgs)); + walArg(VisorWalTaskOperation.PRINT_UNUSED_WAL_SEGMENTS, walArgs)); printUnusedWalSegments0(res); } @@ -1168,7 +1146,7 @@ public class CommandHandler { * @param s Argument from command line. * @return Task argument. */ - private VisorWalTaskArg walArg(VisorWalTaskOperation op, String s){ + private VisorWalTaskArg walArg(VisorWalTaskOperation op, String s) { List<String> consistentIds = null; if (!F.isEmpty(s)) { @@ -1202,23 +1180,23 @@ public class CommandHandler { Map<String, Exception> failRes = taskRes.exceptions(); Map<String, VisorClusterNode> nodesInfo = taskRes.getNodesInfo(); - for(Map.Entry<String, Collection<String>> entry: res.entrySet()) { + for (Map.Entry<String, Collection<String>> entry : res.entrySet()) { VisorClusterNode node = nodesInfo.get(entry.getKey()); log("Node=" + node.getConsistentId()); - log(" addresses " + U.addressesAsString(node.getAddresses(),node.getHostNames())); + log(" addresses " + U.addressesAsString(node.getAddresses(), node.getHostNames())); - for(String fileName: entry.getValue()) + for (String fileName : entry.getValue()) log(" " + fileName); nl(); } - for(Map.Entry<String, Exception> entry: failRes.entrySet()) { + for (Map.Entry<String, Exception> entry : failRes.entrySet()) { VisorClusterNode node = nodesInfo.get(entry.getKey()); log("Node=" + node.getConsistentId()); - log(" addresses " + U.addressesAsString(node.getAddresses(),node.getHostNames())); + log(" addresses " + U.addressesAsString(node.getAddresses(), node.getHostNames())); log(" failed with error: " + entry.getValue().getMessage()); nl(); } @@ -1237,19 +1215,19 @@ public class CommandHandler { Map<String, Exception> errors = taskRes.exceptions(); Map<String, VisorClusterNode> nodesInfo = taskRes.getNodesInfo(); - for(Map.Entry<String, Collection<String>> entry: res.entrySet()) { + for (Map.Entry<String, Collection<String>> entry : res.entrySet()) { VisorClusterNode node = nodesInfo.get(entry.getKey()); log("Node=" + node.getConsistentId()); - log(" addresses " + U.addressesAsString(node.getAddresses(),node.getHostNames())); + log(" addresses " + U.addressesAsString(node.getAddresses(), node.getHostNames())); nl(); } - for(Map.Entry<String, Exception> entry: errors.entrySet()) { + for (Map.Entry<String, Exception> entry : errors.entrySet()) { VisorClusterNode node = nodesInfo.get(entry.getKey()); log("Node=" + node.getConsistentId()); - log(" addresses " + U.addressesAsString(node.getAddresses(),node.getHostNames())); + log(" addresses " + U.addressesAsString(node.getAddresses(), node.getHostNames())); log(" failed with error: " + entry.getValue().getMessage()); nl(); } @@ -1284,7 +1262,7 @@ public class CommandHandler { private void usage(String desc, Command cmd, String... args) { log(desc); log(" control.sh [--host HOST_OR_IP] [--port PORT] [--user USER] [--password PASSWORD] " + - " [--ping-interval PING_INTERVAL] [--ping-timeout PING_TIMEOUT] " + cmd.text() + String.join("", args)); + " [--ping-interval PING_INTERVAL] [--ping-timeout PING_TIMEOUT] " + cmd.text() + String.join("", args)); nl(); } @@ -1419,8 +1397,8 @@ public class CommandHandler { if (WAL_PRINT.equals(walAct) || WAL_DELETE.equals(walAct)) walArgs = (str = peekNextArg()) != null && !isCommandOrOption(str) - ? nextArg("Unexpected argument for " + WAL.text() + ": " + walAct) - : ""; + ? nextArg("Unexpected argument for " + WAL.text() + ": " + walAct) + : ""; else throw new IllegalArgumentException("Unexpected action " + walAct + " for " + WAL.text()); @@ -1499,7 +1477,7 @@ public class CommandHandler { throw new IllegalArgumentException("Both user and password should be specified"); return new Arguments(cmd, host, port, user, pwd, baselineAct, baselineArgs, txArgs, cacheArgs, walAct, walArgs, - pingTimeout, pingInterval, autoConfirmation); + pingTimeout, pingInterval, autoConfirmation); } /** @@ -1529,9 +1507,18 @@ public class CommandHandler { break; case IDLE_VERIFY: - if (hasNextCacheArg()) - parseCacheNames(nextArg(""), cacheArgs); + int idleVerifyArgsCnt = 3; + + while (hasNextCacheArg() && idleVerifyArgsCnt-- > 0) { + String nextArg = nextArg(""); + if (CMD_DUMP.equals(nextArg)) + cacheArgs.dump(true); + else if (CMD_SKIP_ZEROS.equals(nextArg)) + cacheArgs.skipZeros(true); + else + parseCacheNames(nextArg, cacheArgs); + } break; case CONTENTION: @@ -1709,7 +1696,7 @@ public class CommandHandler { case TX_LIMIT: nextArg(""); - limit = (int) nextLongArg(TX_LIMIT); + limit = (int)nextLongArg(TX_LIMIT); break; case TX_ORDER: @@ -1746,7 +1733,7 @@ public class CommandHandler { case TX_SIZE: nextArg(""); - size = (int) nextLongArg(TX_SIZE); + size = (int)nextLongArg(TX_SIZE); break; case TX_LABEL: @@ -1807,9 +1794,9 @@ public class CommandHandler { } /** - * Check if raw arg is command or option. + * Check if raw arg is command or option. * - * @return {@code true} If raw arg is command, overwise {@code false}. + * @return {@code true} If raw arg is command, overwise {@code false}. */ private boolean isCommandOrOption(String raw) { return raw != null && raw.contains("--"); @@ -1843,11 +1830,11 @@ public class CommandHandler { "[minSize SIZE] [label PATTERN_REGEX] [servers|clients] " + "[nodes consistentId1[,consistentId2,....,consistentIdN] [limit NUMBER] [order DURATION|SIZE|", CMD_TX_ORDER_START_TIME, "] [kill] [" + CMD_AUTO_CONFIRMATION + "]"); - if(enableExperimental) { + if (enableExperimental) { usage(" Print absolute paths of unused archived wal segments on each node:", WAL, - " print [consistentId1,consistentId2,....,consistentIdN]"); + " print [consistentId1,consistentId2,....,consistentIdN]"); usage(" Delete unused archived wal segments on each node:", WAL, - " delete [consistentId1,consistentId2,....,consistentIdN] [" + CMD_AUTO_CONFIRMATION + "]"); + " delete [consistentId1,consistentId2,....,consistentIdN] [" + CMD_AUTO_CONFIRMATION + "]"); } log(" View caches information in a cluster. For more details type:"); @@ -1962,6 +1949,7 @@ public class CommandHandler { /** * Used for tests. + * * @return Last operation result; */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java index 1411b2a..353a63c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java @@ -55,6 +55,12 @@ public class CacheArguments { /** Cache view command. */ private @Nullable VisorViewCacheCmd cacheCmd; + /** Calculate partition hash and print into standard output. */ + private boolean dump; + + /** Skip zeros partitions. */ + private boolean skipZeros; + /** * @return Command. */ @@ -194,4 +200,32 @@ public class CacheArguments { public void checkThrough(int checkThrough) { this.checkThrough = checkThrough; } + + /** + * @return Calculate partition hash and print into standard output. + */ + public boolean dump() { + return dump; + } + + /** + * @param dump Calculate partition hash and print into standard output. + */ + public void dump(boolean dump) { + this.dump = dump; + } + + /** + * @return Skip zeros partitions(size == 0) in result. + */ + public boolean isSkipZeros() { + return skipZeros; + } + + /** + * @param skipZeros Skip zeros partitions. + */ + public void skipZeros(boolean skipZeros) { + this.skipZeros = skipZeros; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyDumpResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyDumpResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyDumpResult.java new file mode 100644 index 0000000..6a74405 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyDumpResult.java @@ -0,0 +1,73 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.verify; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorDataTransferObject; + +/** + * Encapsulates result of {@link VerifyBackupPartitionsDumpTask}. + */ +public class IdleVerifyDumpResult extends VisorDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Cluster hashes. */ + private Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes; + + /** + * @param clusterHashes Cluster hashes. + */ + public IdleVerifyDumpResult(Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes) { + this.clusterHashes = clusterHashes; + } + + /** + * Default constructor for Externalizable. + */ + public IdleVerifyDumpResult() { + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeMap(out, clusterHashes); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, + ObjectInput in) throws IOException, ClassNotFoundException { + clusterHashes = U.readLinkedMap(in); + } + + /** + * @return Cluster hashes. + */ + public Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes() { + return clusterHashes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IdleVerifyDumpResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java index d5815cd..c31d662 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.ignite.internal.processors.cache.verify; import java.io.IOException; @@ -21,6 +21,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -106,6 +107,60 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject { return !F.isEmpty(hashConflicts()) || !F.isEmpty(counterConflicts()); } + /** + * Print formatted result to given printer. + * + * @param printer Consumer for handle formatted result. + */ + public void print(Consumer<String> printer) { + if (!hasConflicts()) + printer.accept("idle_verify check has finished, no conflicts have been found.\n"); + else { + int cntrConflictsSize = counterConflicts().size(); + int hashConflictsSize = hashConflicts().size(); + + printer.accept("idle_verify check has finished, found " + (cntrConflictsSize + hashConflictsSize) + + " conflict partitions: [counterConflicts=" + cntrConflictsSize + ", hashConflicts=" + + hashConflictsSize + "]\n"); + + if (!F.isEmpty(counterConflicts())) { + printer.accept("Update counter conflicts:\n"); + + for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : counterConflicts().entrySet()) { + printer.accept("Conflict partition: " + entry.getKey() + "\n"); + + printer.accept("Partition instances: " + entry.getValue() + "\n"); + } + + printer.accept("\n"); + } + + if (!F.isEmpty(hashConflicts())) { + printer.accept("Hash conflicts:\n"); + + for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : hashConflicts().entrySet()) { + printer.accept("Conflict partition: " + entry.getKey() + "\n"); + + printer.accept("Partition instances: " + entry.getValue() + "\n"); + } + + printer.accept("\n"); + } + } + + if (!F.isEmpty(movingPartitions())) { + printer.accept("Verification was skipped for " + movingPartitions().size() + " MOVING partitions:\n"); + + for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : movingPartitions().entrySet()) { + printer.accept("Rebalancing partition: " + entry.getKey() + "\n"); + + printer.accept("Partition instances: " + entry.getValue() + "\n"); + } + + printer.accept("\n"); + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IdleVerifyResultV2.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java new file mode 100644 index 0000000..7bbd9f1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.verify; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Task for collection checksums primary and backup partitions of specified caches. <br> Argument: Set of cache names, + * 'null' will trigger verification for all caches. <br> Result: {@link IdleVerifyDumpResult} with all found partitions. + * <br> Works properly only on idle cluster - there may be false positive conflict reports if data in cluster is being + * concurrently updated. + */ +@GridInternal +public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdleVerifyTaskArg, String> { + /** */ + private static final long serialVersionUID = 0L; + + /** Time formatter for dump file name. */ + private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss_SSS"); + + /** Visible for testing. */ + public static final String IDLE_DUMP_FILE_PREMIX = "idle-dump-"; + + /** Delegate for map execution */ + private final VerifyBackupPartitionsTaskV2 delegate = new VerifyBackupPartitionsTaskV2(); + + /** */ + private VisorIdleVerifyDumpTaskArg taskArg; + + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map( + List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws IgniteException { + if (arg instanceof VisorIdleVerifyDumpTaskArg) + taskArg = (VisorIdleVerifyDumpTaskArg)arg; + + return delegate.map(subgrid, arg); + } + + /** {@inheritDoc} */ + @Nullable @Override public String reduce(List<ComputeJobResult> results) + throws IgniteException { + Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new TreeMap<>(buildPartitionKeyComparator()); + + for (ComputeJobResult res : results) { + Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData(); + + for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) { + clusterHashes + .computeIfAbsent(e.getKey(), k -> new ArrayList<>()) + .add(e.getValue()); + } + } + + Comparator<PartitionHashRecordV2> recordComp = buildRecordComparator().reversed(); + + Map<PartitionKeyV2, List<PartitionHashRecordV2>> partitions = new LinkedHashMap<>(); + + int skippedRecords = 0; + + for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : clusterHashes.entrySet()) { + if (needToAdd(entry.getValue())) { + entry.getValue().sort(recordComp); + + partitions.put(entry.getKey(), entry.getValue()); + } + else + skippedRecords++; + } + + return writeHashes(partitions, delegate.reduce(results), skippedRecords); + } + + /** + * Checking conditions for adding given record to result. + * + * @param records records to check. + * @return {@code true} if this records should be add to result and {@code false} otherwise. + */ + private boolean needToAdd(List<PartitionHashRecordV2> records) { + if (records.isEmpty() || (taskArg != null && !taskArg.isSkipZeros())) + return true; + + PartitionHashRecordV2 record = records.get(0); + + if (record.updateCounter() != 0 || record.size() != 0) + return true; + + int firstHash = record.partitionHash(); + + for (int i = 1; i < records.size(); i++) { + record = records.get(i); + + if (record.partitionHash() != firstHash || record.updateCounter() != 0 || record.size() != 0) + return true; + } + + return false; + } + + /** + * @param partitions Dump result. + * @return Path where results are written. + * @throws IgniteException If failed to write the file. + */ + private String writeHashes( + Map<PartitionKeyV2, List<PartitionHashRecordV2>> partitions, + IdleVerifyResultV2 conflictRes, + int skippedRecords + ) throws IgniteException { + File workDir = ignite.configuration().getWorkDirectory() == null + ? new File("/tmp") + : new File(ignite.configuration().getWorkDirectory()); + + File out = new File(workDir, IDLE_DUMP_FILE_PREMIX + LocalDateTime.now().format(TIME_FORMATTER) + ".txt"); + + ignite.log().info("IdleVerifyDumpTask will write output to " + out.getAbsolutePath()); + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(out))) { + try { + + writer.write("idle_verify check has finished, found " + partitions.size() + " partitions\n"); + + if (skippedRecords > 0) + writer.write(skippedRecords + " partitions was skipped\n"); + + if (!F.isEmpty(partitions)) { + writer.write("Cluster partitions:\n"); + + for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : partitions.entrySet()) { + writer.write("Partition: " + entry.getKey() + "\n"); + + writer.write("Partition instances: " + entry.getValue() + "\n"); + } + + writer.write("\n\n-----------------------------------\n\n"); + + conflictRes.print(str -> { + try { + writer.write(str); + } + catch (IOException e) { + throw new IgniteException("Failed to write partitions conflict.", e); + } + }); + } + } + finally { + writer.flush(); + } + + ignite.log().info("IdleVerifyDumpTask successfully written dump to '" + out.getAbsolutePath() + "'"); + } + catch (IOException | IgniteException e) { + ignite.log().error("Failed to write dump file: " + out.getAbsolutePath(), e); + + throw new IgniteException(e); + } + + return out.getAbsolutePath(); + } + + /** + * @return Comparator for {@link PartitionHashRecordV2}. + */ + @NotNull private Comparator<PartitionHashRecordV2> buildRecordComparator() { + return (o1, o2) -> { + int compare = Boolean.compare(o1.isPrimary(), o2.isPrimary()); + + if (compare != 0) + return compare; + + return o1.consistentId().toString().compareTo(o2.consistentId().toString()); + }; + } + + /** + * @return Comparator for {@link PartitionKeyV2}. + */ + @NotNull private Comparator<PartitionKeyV2> buildPartitionKeyComparator() { + return (o1, o2) -> { + int compare = Integer.compare(o1.groupId(), o2.groupId()); + + if (compare != 0) + return compare; + + return Integer.compare(o1.partitionId(), o2.partitionId()); + }; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTask.java new file mode 100644 index 0000000..8f34c68 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.verify; + +import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsDumpTask; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; + +/** + * Task to verify checksums of backup partitions and return all collected information. + */ +@GridInternal +public class VisorIdleVerifyDumpTask extends VisorOneNodeTask<VisorIdleVerifyTaskArg, String> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob<VisorIdleVerifyTaskArg, String> job(VisorIdleVerifyTaskArg arg) { + return new VisorIdleVerifyJob<>(arg, debug, VerifyBackupPartitionsDumpTask.class); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java new file mode 100644 index 0000000..6316c24 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.verify; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Set; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Arguments for {@link VisorIdleVerifyDumpTask}. + */ +public class VisorIdleVerifyDumpTaskArg extends VisorIdleVerifyTaskArg { + /** */ + private static final long serialVersionUID = 0L; + /** */ + private boolean skipZeros; + + /** + * Default constructor. + */ + public VisorIdleVerifyDumpTaskArg() { + } + + /** + * @param caches Caches. + * @param skipZeros Skip zeros partitions. + */ + public VisorIdleVerifyDumpTaskArg(Set<String> caches, boolean skipZeros) { + super(caches); + this.skipZeros = skipZeros; + } + + /** + * @return Skip zeros partitions. + */ + public boolean isSkipZeros() { + return skipZeros; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + super.writeExternalData(out); + out.writeBoolean(skipZeros); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternalData(protoVer, in); + skipZeros = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorIdleVerifyDumpTaskArg.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java new file mode 100644 index 0000000..a8dc697 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.verify; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.compute.ComputeJobContext; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.ComputeTaskFuture; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.resources.JobContextResource; + +/** + * + */ +class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, ResultT> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private ComputeTaskFuture<ResultT> fut; + + /** Auto-inject job context. */ + @JobContextResource + protected transient ComputeJobContext jobCtx; + + /** Task class for execution */ + private final Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls; + + /** + * @param arg Argument. + * @param debug Debug. + * @param taskCls Task class for execution. + */ + VisorIdleVerifyJob(VisorIdleVerifyTaskArg arg, boolean debug, + Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls) { + super(arg, debug); + this.taskCls = taskCls; + } + + /** {@inheritDoc} */ + @Override protected ResultT run(VisorIdleVerifyTaskArg arg) throws IgniteException { + if (fut == null) { + fut = ignite.compute().executeAsync(taskCls, arg); + + if (!fut.isDone()) { + jobCtx.holdcc(); + + fut.listen(new IgniteInClosure<IgniteFuture<ResultT>>() { + @Override public void apply(IgniteFuture<ResultT> f) { + jobCtx.callcc(); + } + }); + + return null; + } + } + + return fut.get(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorIdleVerifyJob.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 02e2e78..d4bbf0b 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1138,6 +1138,7 @@ org.apache.ignite.internal.processors.cache.verify.CollectConflictPartitionKeysT org.apache.ignite.internal.processors.cache.verify.ContentionClosure org.apache.ignite.internal.processors.cache.verify.ContentionInfo org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2 +org.apache.ignite.internal.processors.cache.verify.IdleVerifyDumpResult org.apache.ignite.internal.processors.cache.verify.PartitionEntryHashRecord org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord org.apache.ignite.internal.processors.cache.verify.PartitionKey @@ -2159,6 +2160,7 @@ org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask$VisorIdleVerifyJob org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask$VisorIdleVerifyJob$1 org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg +org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskResult org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskArg http://git-wip-us.apache.org/repos/asf/ignite/blob/8309cef7/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index c37b17c..f612a96 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -17,9 +17,16 @@ package org.apache.ignite.util; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -33,9 +40,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCache; @@ -59,18 +65,17 @@ import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheFuture; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; @@ -89,12 +94,16 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionRollbackException; import org.apache.ignite.transactions.TransactionTimeoutException; +import org.jetbrains.annotations.NotNull; +import static java.nio.file.Files.delete; +import static java.nio.file.Files.newDirectoryStream; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; +import static org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsDumpTask.IDLE_DUMP_FILE_PREMIX; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -139,6 +148,15 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { cleanPersistenceDir(); + //delete idle-verify dump files. + try (DirectoryStream<Path> files = newDirectoryStream( + Paths.get(U.defaultWorkDirectory()), + entry -> entry.toFile().getName().startsWith(IDLE_DUMP_FILE_PREMIX) + )) { + for (Path path : files) + delete(path); + } + System.clearProperty(IGNITE_ENABLE_EXPERIMENTAL_COMMAND); System.setOut(sysOut); @@ -505,14 +523,14 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { validate(h, map -> { VisorTxTaskResult res = map.get(grid(0).localNode()); - assertTrue(res.getInfos().get(0).getSize() >= res.getInfos().get(1).getSize()); + assertTrue(res.getInfos().get(0).getSize() >= res.getInfos().get(1).getSize()); }, "--tx", "order", "SIZE"); // test order by duration. validate(h, map -> { VisorTxTaskResult res = map.get(grid(0).localNode()); - assertTrue(res.getInfos().get(0).getDuration() >= res.getInfos().get(1).getDuration()); + assertTrue(res.getInfos().get(0).getDuration() >= res.getInfos().get(1).getDuration()); }, "--tx", "order", "DURATION"); // test order by start_time. @@ -585,7 +603,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { GridNearTxLocal clientTx = null; - try(Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 2000, 1)) { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 2000, 1)) { clientTx = ((TransactionProxyImpl)tx).tx(); client.cache(DEFAULT_CACHE_NAME).put(0L, 0L); @@ -716,7 +734,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { primSpi.waitForBlocked(clients.length); // Unblock only finish messages from clients from 2 to 4. - primSpi.stopBlock(true, new IgnitePredicate<T2<ClusterNode,GridIoMessage>>() { + primSpi.stopBlock(true, new IgnitePredicate<T2<ClusterNode, GridIoMessage>>() { @Override public boolean apply(T2<ClusterNode, GridIoMessage> objects) { GridIoMessage iom = objects.get2(); @@ -921,6 +939,141 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { } /** + * Tests that idle verify print partitions info. + * + * @throws Exception If failed. + */ + public void testCacheIdleVerifyDump() throws Exception { + IgniteEx ignite = (IgniteEx)startGrids(3); + + ignite.cluster().active(true); + + int parts = 32; + + IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>() + .setAffinity(new RendezvousAffinityFunction(false, parts)) + .setBackups(1) + .setName(DEFAULT_CACHE_NAME)); + + ignite.createCache(new CacheConfiguration<>() + .setAffinity(new RendezvousAffinityFunction(false, parts)) + .setBackups(1) + .setName(DEFAULT_CACHE_NAME + "other")); + + injectTestSystemOut(); + + int keysCount = 20;//less than parts number for ability to check skipZeros flag. + + for (int i = 0; i < keysCount; i++) + cache.put(i, i); + + assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump", DEFAULT_CACHE_NAME)); + + assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump", "--skipZeros", DEFAULT_CACHE_NAME)); + + Matcher fileNameMatcher = dumpFileNameMatcher(); + + if (fileNameMatcher.find()) { + String dumpWithZeros = new String(Files.readAllBytes(Paths.get(fileNameMatcher.group(1)))); + + assertTrue(dumpWithZeros.contains("idle_verify check has finished, found " + parts + " partitions")); + assertTrue(dumpWithZeros.contains("Partition: PartitionKeyV2 [grpId=1544803905, grpName=default, partId=0]")); + assertTrue(dumpWithZeros.contains("updateCntr=0, size=0, partHash=0")); + assertTrue(dumpWithZeros.contains("no conflicts have been found")); + + assertSort(parts, dumpWithZeros); + } + + if (fileNameMatcher.find()) { + String dumpWithoutZeros = new String(Files.readAllBytes(Paths.get(fileNameMatcher.group(1)))); + + assertTrue(dumpWithoutZeros.contains("idle_verify check has finished, found " + keysCount + " partitions")); + assertTrue(dumpWithoutZeros.contains((parts - keysCount) + " partitions was skipped")); + assertTrue(dumpWithoutZeros.contains("Partition: PartitionKeyV2 [grpId=1544803905, grpName=default, partId=")); + + assertFalse(dumpWithoutZeros.contains("updateCntr=0, size=0, partHash=0")); + + assertTrue(dumpWithoutZeros.contains("no conflicts have been found")); + + assertSort(keysCount, dumpWithoutZeros); + } + else + fail("Should be found both files"); + } + + /** + * Checking sorting of partitions. + * + * @param expectedPartsCount Expected parts count. + * @param output Output. + */ + private void assertSort(int expectedPartsCount, String output) { + Pattern partIdPattern = Pattern.compile(".*partId=([0-9]*)"); + Pattern primaryPattern = Pattern.compile("Partition instances: \\[PartitionHashRecordV2 \\[isPrimary=true"); + + Matcher partIdMatcher = partIdPattern.matcher(output); + Matcher primaryMatcher = primaryPattern.matcher(output); + + int i = 0; + + while (partIdMatcher.find()) { + assertEquals(i++, Integer.parseInt(partIdMatcher.group(1))); + assertTrue(primaryMatcher.find());//primary node should be first in every line + } + + assertEquals(expectedPartsCount, i); + } + + /** + * Tests that idle verify print partitions info. + * + * @throws Exception If failed. + */ + public void testCacheIdleVerifyDumpForCorruptedData() throws Exception { + IgniteEx ignite = (IgniteEx)startGrids(3); + + ignite.cluster().active(true); + + int parts = 32; + + IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>() + .setAffinity(new RendezvousAffinityFunction(false, parts)) + .setBackups(1) + .setName(DEFAULT_CACHE_NAME)); + + injectTestSystemOut(); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + GridCacheContext<Object, Object> cacheCtx = ignite.cachex(DEFAULT_CACHE_NAME).context(); + + corruptDataEntry(cacheCtx, 0, true, false); + + corruptDataEntry(cacheCtx, 0 + parts / 2, false, true); + + assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump")); + + Matcher fileNameMatcher = dumpFileNameMatcher(); + + if (fileNameMatcher.find()) { + String dumpWithConflicts = new String(Files.readAllBytes(Paths.get(fileNameMatcher.group(1)))); + + assertTrue(dumpWithConflicts.contains("found 2 conflict partitions: [counterConflicts=1, hashConflicts=1]")); + }else + fail("Should be found dump with conflicts"); + } + + /** + * @return Build matcher for dump file name. + */ + @NotNull private Matcher dumpFileNameMatcher() { + Pattern fileNamePattern = Pattern.compile(".*VisorIdleVerifyDumpTask successfully written output to '(.*)'"); + + return fileNamePattern.matcher(testOut.toString()); + } + + /** * @throws Exception If failed. */ public void testCacheIdleVerifyMovingParts() throws Exception { @@ -1288,7 +1441,8 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { /** */ private static class IncrementClosure implements EntryProcessor<Long, Long, Void> { /** {@inheritDoc} */ - @Override public Void process(MutableEntry<Long, Long> entry, Object... arguments) throws EntryProcessorException { + @Override public Void process(MutableEntry<Long, Long> entry, + Object... arguments) throws EntryProcessorException { entry.setValue(entry.exists() ? entry.getValue() + 1 : 0); return null;