This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6b0637e IGNITE-16499 Сonsistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814) 6b0637e is described below commit 6b0637eb52c57a899274b6f6c0976520711100a3 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Thu Mar 10 16:08:46 2022 +0300 IGNITE-16499 Сonsistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814) --- .../testsuites/IgniteControlUtilityTestSuite.java | 2 + ...GridCommandHandlerConsistencySensitiveTest.java | 28 ++++++ .../util/GridCommandHandlerConsistencyTest.java | 53 +++++++++++ .../events/CacheConsistencyViolationEvent.java | 21 ++++- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../GridNearReadRepairAbstractFuture.java | 103 +++++++++++++++++++-- .../near/consistency/GridNearReadRepairFuture.java | 20 ---- .../consistency/VisorConsistencyRepairTask.java | 57 ++++++++---- .../consistency/VisorConsistencyRepairTaskArg.java | 19 ---- .../consistency/VisorConsistencyStatusTask.java | 12 +-- .../cache/consistency/AbstractReadRepairTest.java | 15 ++- 11 files changed, 251 insertions(+), 81 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java index bfefe21..02153ae 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java @@ -27,6 +27,7 @@ import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest; import org.apache.ignite.util.GridCommandHandlerClusterByClassTest; import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest; import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest; +import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest; import org.apache.ignite.util.GridCommandHandlerConsistencyTest; import org.apache.ignite.util.GridCommandHandlerDefragmentationTest; import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest; @@ -93,6 +94,7 @@ import org.junit.runners.Suite; GridCommandHandlerConsistencyTest.class, GridCommandHandlerConsistencyBinaryTest.class, + GridCommandHandlerConsistencySensitiveTest.class, SystemViewCommandTest.class, MetricCommandTest.class, diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java new file mode 100644 index 0000000..64e212c --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.testframework.junits.WithSystemProperty; + +/** + * + */ +@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "false") +public class GridCommandHandlerConsistencySensitiveTest extends GridCommandHandlerConsistencyTest { +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java index f3c8020..7465c33 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java @@ -37,7 +37,11 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.consistency.VisorConsistencyStatusTask; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -48,6 +52,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND; import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.testframework.LogListener.matches; /** * @@ -67,6 +72,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster private static final int PARTITIONS = 32; /** */ + protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log); + + /** */ @Parameterized.Parameters(name = "strategy={0}") public static Iterable<Object[]> data() { List<Object[]> res = new ArrayList<>(); @@ -103,6 +111,8 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster cfg.setDataStorageConfiguration(null); + cfg.setGridLogger(listeningLog); + return cfg; } @@ -138,6 +148,27 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster injectTestSystemOut(); + LogListener lsnrUnmaskedKey = matches("Key: 0 (cache: ").build(); // Unmasked key. + LogListener lsnrMaskedKey = matches("Key: [HIDDEN_KEY#").build(); // Masked key. + LogListener lsnrMaskedVal = matches("Value: [HIDDEN_VALUE#").build(); // Masked value. + + listeningLog.registerListener(lsnrUnmaskedKey); + listeningLog.registerListener(lsnrMaskedKey); + listeningLog.registerListener(lsnrMaskedVal); + + List<LogListener> listeners = new ArrayList<>(); + + // It's unable to check just "Key:" count while https://issues.apache.org/jira/browse/IGNITE-15316 not fixed + if (S.includeSensitive()) { + for (int i = 0; i < PARTITIONS; i++) { + LogListener keyListener = matches("Key: " + i + " (cache: ").build(); + + listeningLog.registerListener(keyListener); + + listeners.add(keyListener); + } + } + assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify")); assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts.get()); @@ -146,13 +177,30 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster readRepair(brokenParts, txCacheName, fixesPerEntry); + if (S.includeSensitive()) { + for (LogListener listener : listeners) { + assertTrue(listener.check()); + + listener.reset(); + } + } + if (fixesPerEntry != null && fixesPerEntry > 0) assertEquals(PARTITIONS, brokenParts.get()); // Half fixed. readRepair(brokenParts, atomicCacheName, fixesPerEntry != null ? 0 : null); + if (S.includeSensitive()) { + for (LogListener listener : listeners) + assertTrue(listener.check()); + } + if (fixesPerEntry != null && fixesPerEntry > 0) assertEquals(PARTITIONS, brokenParts.get()); // Atomics still broken. + + assertEquals(S.includeSensitive(), lsnrUnmaskedKey.check()); + assertEquals(S.includeSensitive(), !lsnrMaskedKey.check()); + assertEquals(S.includeSensitive(), !lsnrMaskedVal.check()); } /** @@ -212,6 +260,8 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster ConsistencyCommand.PARTITION, String.valueOf(i), ConsistencyCommand.STRATEGY, strategy.toString())); + assertTrue(VisorConsistencyStatusTask.MAP.isEmpty()); + assertContains(log, testOut.toString(), "Cache not found"); } } @@ -225,6 +275,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster ConsistencyCommand.CACHE, cacheName, ConsistencyCommand.PARTITION, String.valueOf(i), ConsistencyCommand.STRATEGY, strategy.toString())); + + assertTrue(VisorConsistencyStatusTask.MAP.isEmpty()); + assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND); assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : "")); diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java index d02bf6e..bf348c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java @@ -68,7 +68,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter { private static final long serialVersionUID = 0L; /** Represents original values of entries.*/ - private final Map<Object, Map<ClusterNode, EntryInfo>> entries; + private final Map<Object, EntriesInfo> entries; /** Fixed entries. */ private final Map<Object, Object> fixed; @@ -92,7 +92,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter { String cacheName, ClusterNode node, String msg, - Map<Object, Map<ClusterNode, EntryInfo>> entries, + Map<Object, EntriesInfo> entries, Map<Object, Object> fixed, ReadRepairStrategy strategy) { super(node, msg, EVT_CONSISTENCY_VIOLATION); @@ -108,7 +108,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter { * * @return Collection of original entries. */ - public Map<Object, Map<ClusterNode, EntryInfo>> getEntries() { + public Map<Object, EntriesInfo> getEntries() { return entries; } @@ -140,6 +140,21 @@ public class CacheConsistencyViolationEvent extends EventAdapter { } /** + * Inconsistent entries mapping. + */ + public interface EntriesInfo { + /** + * @return Entry's mapping. + */ + public Map<ClusterNode, EntryInfo> getMapping(); + + /** + * @return Entry's partition. + */ + public int partition(); + } + + /** * Inconsistent entry info. */ public interface EntryInfo { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index cbea2b4..3639a04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -2500,7 +2500,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou keepCacheObjects, deserializeBinary, false, - getRes, + null, getRes.version(), 0, 0, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java index c0cdb3d..fef2f04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java @@ -25,9 +25,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.cache.ReadRepairStrategy; import org.apache.ignite.cluster.ClusterNode; @@ -45,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.S; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -335,7 +338,12 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter if (!evtMgr.isRecordable(EVT_CONSISTENCY_VIOLATION)) return; - Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entries = new HashMap<>(); + boolean includeSensitive = S.includeSensitive(); + + Map<KeyCacheObject, Object> sensitiveKeyMap = new HashMap<>(); + Map<ByteArrayWrapper, Object> sensitiveValMap = new HashMap<>(); + + Map<Object, CacheConsistencyViolationEvent.EntriesInfo> entries = new HashMap<>(); for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : futs.entrySet()) { ClusterNode node = pair.getKey(); @@ -344,21 +352,24 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter for (KeyCacheObject key : fut.keys()) { if (inconsistentKeys.contains(key)) { - Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map = - entries.computeIfAbsent( - ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false, null), k -> new HashMap<>()); + sensitiveKeyMap.computeIfAbsent(key, k -> includeSensitive + ? ctx.unwrapBinaryIfNeeded(k, !deserializeBinary, false, null) + : "[HIDDEN_KEY#" + UUID.randomUUID() + "]"); + + CacheConsistencyViolationEvent.EntriesInfo entriesInfo = + entries.computeIfAbsent(sensitiveKeyMap.get(key), k -> new EventEntriesInfo(key.partition())); EntryGetResult res = fut.result().get(key); CacheEntryVersion ver = res != null ? res.version() : null; - Object val = res != null ? ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null) : null; + Object val = sensitiveValue(includeSensitive, res, sensitiveValMap); boolean primary = primaries.get(key).equals(fut.affNode()); boolean correct = fixedEntries != null && ((fixedEntries.get(key) != null && fixedEntries.get(key).equals(res)) || (fixedEntries.get(key) == null && res == null)); - map.put(node, new EventEntryInfo(val, ver, primary, correct)); + entriesInfo.getMapping().put(node, new EventEntryInfo(val, ver, primary, correct)); } } } @@ -371,9 +382,8 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter fixed = new HashMap<>(); for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fixedEntries.entrySet()) { - Object key = ctx.unwrapBinaryIfNeeded(entry.getKey(), !deserializeBinary, false, null); - Object val = entry.getValue() != null ? - ctx.unwrapBinaryIfNeeded(entry.getValue().value(), !deserializeBinary, false, null) : null; + Object key = sensitiveKeyMap.get(entry.getKey()); + Object val = sensitiveValue(includeSensitive, entry.getValue(), sensitiveValMap); fixed.put(key, val); } @@ -391,6 +401,58 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter /** * */ + private Object sensitiveValue(boolean includeSensitive, EntryGetResult res, + Map<ByteArrayWrapper, Object> sensitiveValMap) { + if (res != null) { + CacheObject val = res.value(); + + try { + ByteArrayWrapper wrapped = new ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext())); + + return sensitiveValMap.computeIfAbsent(wrapped, w -> + includeSensitive ? + ctx.unwrapBinaryIfNeeded(val, !deserializeBinary, false, null) : + "[HIDDEN_VALUE#" + UUID.randomUUID() + "]"); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshall object.", e); + } + } + else + return null; + } + + /** + * + */ + private static final class EventEntriesInfo implements CacheConsistencyViolationEvent.EntriesInfo { + /** Mapping. */ + final Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping = new HashMap<>(); + + /** Partition. */ + final int partition; + + /** + * @param partition Partition. + */ + public EventEntriesInfo(int partition) { + this.partition = partition; + } + + /** {@inheritDoc} */ + @Override public Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> getMapping() { + return mapping; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partition; + } + } + + /** + * + */ private static final class EventEntryInfo implements CacheConsistencyViolationEvent.EntryInfo { /** Value. */ final Object val; @@ -440,4 +502,27 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter return correct; } } + + /** + * + */ + protected static final class ByteArrayWrapper { + /** Array. */ + final byte[] arr; + + /** */ + public ByteArrayWrapper(byte[] arr) { + this.arr = arr; + } + + /** */ + @Override public boolean equals(Object o) { + return Arrays.equals(arr, ((ByteArrayWrapper)o).arr); + } + + /** */ + @Override public int hashCode() { + return Arrays.hashCode(arr); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java index 2f6c15b..f87e4c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java @@ -240,26 +240,6 @@ public class GridNearReadRepairFuture extends GridNearReadRepairAbstractFuture { */ public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys) throws IgniteCheckedException { - /** */ - class ByteArrayWrapper { - final byte[] arr; - - /** */ - public ByteArrayWrapper(byte[] arr) { - this.arr = arr; - } - - /** */ - @Override public boolean equals(Object o) { - return Arrays.equals(arr, ((ByteArrayWrapper)o).arr); - } - - /** */ - @Override public int hashCode() { - return Arrays.hashCode(arr); - } - } - Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size()); Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java index 0948c30..7914da1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java @@ -117,7 +117,17 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon log.info("Consistency check started " + "[grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + ", strategy=" + strategy + "]"); - VisorConsistencyStatusTask.MAP.put(arg, "0/" + part.fullSize()); + StringBuilder sb = new StringBuilder(); + + sb.append("[node=").append(ignite.localNode()); + sb.append(", cacheGroup=").append(grpCtx.cacheOrGroupName()); + sb.append(", part=").append(p).append("]"); + + String statusKey = sb.toString(); + + if (VisorConsistencyStatusTask.MAP.putIfAbsent(statusKey, "0/" + part.fullSize()) != null) + throw new IllegalStateException("Consistency check already started " + + "[grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + "]"); long cnt = 0; long statusTs = 0; @@ -125,7 +135,7 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon part.reserve(); try { - IgnitePredicate<CacheConsistencyViolationEvent> lsnr = new CacheConsistencyViolationEventListener(); + IgnitePredicate<CacheConsistencyViolationEvent> lsnr = new CacheConsistencyViolationEventListener(cacheName); ignite.events().localListen(lsnr, EVT_CONSISTENCY_VIOLATION); @@ -169,7 +179,7 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon log.info("Consistency check progress [grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + ", checked=" + cnt + "/" + part.fullSize() + "]"); - VisorConsistencyStatusTask.MAP.put(arg, cnt + "/" + part.fullSize()); + VisorConsistencyStatusTask.MAP.put(statusKey, cnt + "/" + part.fullSize()); } } @@ -185,11 +195,11 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon finally { part.release(); - VisorConsistencyStatusTask.MAP.remove(arg); + VisorConsistencyStatusTask.MAP.remove(statusKey); } if (!evts.isEmpty()) - return processEvents(cctx, p, cnt); + return processEvents(p, cnt); else return NOTHING_FOUND + " [processed=" + cnt + "]\n"; } @@ -197,41 +207,43 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon /** * */ - private String processEvents(GridCacheContext<Object, Object> cctx, int part, long cnt) { + private String processEvents(int part, long cnt) { int found = 0; int fixed = 0; StringBuilder sb = new StringBuilder(); for (CacheConsistencyViolationEvent evt : evts) { - for (Map.Entry<?, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entry : evt.getEntries().entrySet()) { + for (Map.Entry<?, CacheConsistencyViolationEvent.EntriesInfo> entry : evt.getEntries().entrySet()) { Object key = entry.getKey(); - if (cctx.affinity().partition(key) != part) + if (entry.getValue().partition() != part) continue; // Skipping other partitions results, which are generated by concurrent executions. found++; sb.append("Key: ").append(key) .append(" (cache: ").append(evt.getCacheName()) - .append(", strategy: ").append(evt.getStrategy()).append(")").append("\n"); + .append(", partition: ").append(entry.getValue().partition()) + .append(", strategy: ").append(evt.getStrategy()) + .append(")").append("\n"); if (evt.getFixedEntries().containsKey(key)) sb.append(" Fixed: ").append(evt.getFixedEntries().get(key)).append("\n"); - for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping : entry.getValue().entrySet()) { + for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping : + entry.getValue().getMapping().entrySet()) { ClusterNode node = mapping.getKey(); CacheConsistencyViolationEvent.EntryInfo info = mapping.getValue(); sb.append(" Node: ").append(node).append("\n") .append(" Value: ").append(info.getValue()).append("\n") - .append(" Version: ").append(info.getVersion()).append("\n"); + .append(" Version: ").append(info.getVersion()).append("\n") + .append(" On primary: ").append(info.isPrimary()).append("\n"); if (info.getVersion() != null) sb.append(" Other cluster version: ").append(info.getVersion().otherClusterVersion()).append("\n"); - sb.append(" On primary: ").append(info.isPrimary()).append("\n"); - if (info.isCorrect()) sb.append(" Considered as a CORRECT value!").append("\n"); } @@ -259,13 +271,26 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon /** Serial version uid. */ private static final long serialVersionUID = 0L; + /** Cache name. */ + private final String cacheName; + + /** + * @param name Name. + */ + private CacheConsistencyViolationEventListener(String name) { + cacheName = name; + } + /** * {@inheritDoc} */ - @Override public boolean apply(CacheConsistencyViolationEvent e) { - assert e instanceof CacheConsistencyViolationEvent; + @Override public boolean apply(CacheConsistencyViolationEvent evt) { + assert evt instanceof CacheConsistencyViolationEvent; + + if (!evt.getCacheName().equals(cacheName)) + return true; // Skipping other caches results, which are generated by concurrent executions. - evts.add(e); + evts.add(evt); return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java index 50c8b3f..6008d3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.consistency; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.Objects; import org.apache.ignite.cache.ReadRepairStrategy; import org.apache.ignite.internal.dto.IgniteDataTransferObject; import org.apache.ignite.internal.util.typedef.internal.U; @@ -92,22 +91,4 @@ public class VisorConsistencyRepairTaskArg extends IgniteDataTransferObject { public ReadRepairStrategy strategy() { return strategy; } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - VisorConsistencyRepairTaskArg arg = (VisorConsistencyRepairTaskArg)o; - - return part == arg.part && Objects.equals(cacheName, arg.cacheName); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return Objects.hash(cacheName, part); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java index af001e6..60f43d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java @@ -37,7 +37,7 @@ public class VisorConsistencyStatusTask extends AbstractConsistencyTask<Void, St public static final String NOTHING_FOUND = "Consistency check/repair operations were NOT found."; /** Status map. */ - public static final ConcurrentHashMap<VisorConsistencyRepairTaskArg, String> MAP = new ConcurrentHashMap<>(); + public static final ConcurrentHashMap<String, String> MAP = new ConcurrentHashMap<>(); /** {@inheritDoc} */ @Override protected VisorJob<Void, String> job(Void arg) { @@ -80,13 +80,9 @@ public class VisorConsistencyStatusTask extends AbstractConsistencyTask<Void, St StringBuilder sb = new StringBuilder(); - for (Map.Entry<VisorConsistencyRepairTaskArg, String> entry : MAP.entrySet()) { - VisorConsistencyRepairTaskArg args = entry.getKey(); - String status = entry.getValue(); - - sb.append("\n Cache: ").append(args.cacheName()).append("\n") - .append(" Partition: ").append(args.part()).append("\n") - .append(" Status: ").append(status).append("\n"); + for (Map.Entry<String, String> entry : MAP.entrySet()) { + sb.append("\n Job: ").append(entry.getKey()).append("\n") + .append(" Status: ").append(entry.getValue()).append("\n"); } return sb.toString(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java index 72358b4..4e1dcec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.ignite.Ignite; @@ -77,7 +78,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest { private static final ConcurrentLinkedDeque<CacheConsistencyViolationEvent> evtDeq = new ConcurrentLinkedDeque<>(); /** Key. */ - protected static int iterableKey; + private static final AtomicInteger iterableKey = new AtomicInteger(); /** Backups count. */ protected Integer backupsCount() { @@ -140,7 +141,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest { @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); - log.info("Checked " + iterableKey + " keys"); + log.info("Checked " + iterableKey.get() + " keys"); stopAllGrids(); @@ -204,7 +205,9 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest { assertEquals(atomicityMode() == TRANSACTIONAL ? data.strategy : ReadRepairStrategy.CHECK_ONLY, evt.getStrategy()); // Optimistic and read committed transactions produce per key fixes. - evtEntries.putAll(evt.getEntries()); + for (Map.Entry<Object, CacheConsistencyViolationEvent.EntriesInfo> entries : evt.getEntries().entrySet()) + evtEntries.put(entries.getKey(), entries.getValue().getMapping()); + evtFixed.putAll(evt.getFixedEntries()); } } @@ -277,9 +280,11 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest { try { for (int j = 0; j < cnt; j++) { - InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey, misses, nulls, strategy); + int curKey = iterableKey.incrementAndGet(); + + InconsistentMapping res = setDifferentValuesForSameKey(curKey, misses, nulls, strategy); - results.put(iterableKey, res); + results.put(curKey, res); } for (Ignite node : G.allGrids()) { // Check that cache filled properly.