Repository: ignite Updated Branches: refs/heads/master 10c2b10e6 -> 33b9611e7
IGNITE-9875 Optimized GridDhtPartitionsStateValidator - Fixes #4983. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33b9611e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33b9611e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33b9611e Branch: refs/heads/master Commit: 33b9611e72f03b2f6ec8c72600e7d42558a92339 Parents: 10c2b10 Author: Evgeny Stanilovskiy <[email protected]> Authored: Wed Oct 17 16:35:32 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Oct 17 16:45:51 2018 +0300 ---------------------------------------------------------------------- modules/benchmarks/pom.xml | 12 +- ...ridDhtPartitionsStateValidatorBenchmark.java | 168 +++++++++++++++++++ .../CachePartitionFullCountersMap.java | 21 --- .../GridDhtPartitionsSingleMessage.java | 29 ++++ .../GridDhtPartitionsStateValidator.java | 84 ++++++---- 5 files changed, 257 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/benchmarks/pom.xml ---------------------------------------------------------------------- diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml index 1ea984c..06e0e50 100644 --- a/modules/benchmarks/pom.xml +++ b/modules/benchmarks/pom.xml @@ -62,6 +62,16 @@ <version>${jmh.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> </dependencies> <build> @@ -131,4 +141,4 @@ </plugins> </pluginManagement> </build> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java new file mode 100644 index 0000000..151606d --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/misc/GridDhtPartitionsStateValidatorBenchmark.java @@ -0,0 +1,168 @@ +package org.apache.ignite.internal.benchmarks.jmh.misc; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark; +import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator; +import org.apache.ignite.internal.util.typedef.T2; +import org.jetbrains.annotations.Nullable; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.openjdk.jmh.annotations.Scope.Thread; + +/** */ +@State(Scope.Benchmark) +public class GridDhtPartitionsStateValidatorBenchmark extends JmhAbstractBenchmark { + /** */ + @State(Thread) + public static class Context { + /** */ + private final UUID localNodeId = UUID.randomUUID(); + + /** */ + private GridCacheSharedContext cctxMock; + + /** */ + private GridDhtPartitionTopology topologyMock; + + /** */ + private GridDhtPartitionsStateValidator validator; + + /** */ + private Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>(); + + /** */ + private UUID ignoreNode = UUID.randomUUID(); + + /** */ + private static final int NODES = 3; + + /** */ + private static final int PARTS = 100; + + /** + * @return Partition mock with specified {@code id}, {@code updateCounter} and {@code size}. + */ + private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) { + GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class); + Mockito.when(partitionMock.id()).thenReturn(id); + Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter); + Mockito.when(partitionMock.fullSize()).thenReturn(size); + Mockito.when(partitionMock.state()).thenReturn(GridDhtPartitionState.OWNING); + return partitionMock; + } + + /** + * @param countersMap Update counters map. + * @param sizesMap Sizes map. + * @return Message with specified {@code countersMap} and {@code sizeMap}. + */ + private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) { + GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(); + if (countersMap != null) + msg.addPartitionUpdateCounters(0, countersMap); + if (sizesMap != null) + msg.addPartitionSizes(0, sizesMap); + return msg; + } + + /** */ + @Setup + public void setup() { + // Prepare mocks. + cctxMock = Mockito.mock(GridCacheSharedContext.class); + Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId); + + topologyMock = Mockito.mock(GridDhtPartitionTopology.class); + Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING); + Mockito.when(topologyMock.groupId()).thenReturn(0); + + Mockito.when(topologyMock.partitions()).thenReturn(PARTS); + + List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(); + + Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>(); + + Map<Integer, Long> cacheSizesMap = new HashMap<>(); + + IntStream.range(0, PARTS).forEach(k -> { localPartitions.add(partitionMock(k, k + 1, k + 1)); + long us = k > 20 && k <= 30 ? 0 :k + 2L; + updateCountersMap.put(k, new T2<>(k + 2L, us)); + cacheSizesMap.put(k, us); }); + + Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions); + Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions); + + // Form single messages map. + Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>(); + + for (int n = 0; n < NODES; ++n) { + UUID remoteNode = UUID.randomUUID(); + + messages.put(remoteNode, from(updateCountersMap, cacheSizesMap)); + } + + messages.put(ignoreNode, from(updateCountersMap, cacheSizesMap)); + + validator = new GridDhtPartitionsStateValidator(cctxMock); + } + } + + /** */ + @Benchmark + public void testValidatePartitionsUpdateCounters(Context context) { + context.validator.validatePartitionsUpdateCounters(context.topologyMock, + context.messages, Sets.newHashSet(context.ignoreNode)); + } + + /** */ + @Benchmark + public void testValidatePartitionsSizes(Context context) { + context.validator.validatePartitionsSizes(context.topologyMock, context + .messages, Sets.newHashSet(context.ignoreNode)); + } + + /** + * Run benchmarks. + * + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + run(1); + } + + /** + * Run benchmark. + * + * @param threads Amount of threads. + * @throws Exception If failed. + */ + private static void run(int threads) throws Exception { + JmhIdeBenchmarkRunner.create() + .forks(1) + .threads(threads) + .warmupIterations(5) + .measurementIterations(10) + .benchmarks(GridDhtPartitionsStateValidatorBenchmark.class.getSimpleName()) + .jvmArguments("-XX:+UseG1GC", "-Xms4g", "-Xmx4g") + .run(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java index 2d5eec3..008c276 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java @@ -94,27 +94,6 @@ public class CachePartitionFullCountersMap implements Serializable { } /** - * Creates submap for provided partition IDs. - * - * @param parts Partition IDs. - * @return Partial counters map. - */ - public CachePartitionPartialCountersMap subMap(Set<Integer> parts) { - CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(parts.size()); - - for (int p = 0; p < updCntrs.length; p++) { - if (!parts.contains(p)) - continue; - - res.add(p, initialUpdCntrs[p], updCntrs[p]); - } - - assert res.size() == parts.size(); - - return res; - } - - /** * Clears full counters map. */ public void clear() { http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 088fb31..7dd34f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -246,6 +246,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * @param grpId Cache group ID. + * @param partsCnt Total cache partitions. + * @return Partition update counters. + */ + @SuppressWarnings("unchecked") + public CachePartitionPartialCountersMap partitionUpdateCountersUnsorted(int grpId, int partsCnt) { + Object res = partCntrs == null ? null : partCntrs.get(grpId); + + if (res == null) + return CachePartitionPartialCountersMap.EMPTY; + + if (res instanceof CachePartitionPartialCountersMap) + return (CachePartitionPartialCountersMap)res; + + assert res instanceof Map : res; + + Map<Integer, T2<Long, Long>> map = (Map<Integer, T2<Long, Long>>)res; + + CachePartitionPartialCountersMap partCounersMap = new CachePartitionPartialCountersMap(partsCnt); + + for (Map.Entry<Integer, T2<Long, Long>> e : map.entrySet()) + partCounersMap.add(e.getKey(), e.getValue().get1(), e.getValue().get2()); + + partCounersMap.trim(); + + return partCounersMap; + } + + /** * Adds partition sizes map for specified {@code grpId} to the current message. * * @param grpId Group id. http://git-wip-us.apache.org/repos/asf/ignite/blob/33b9611e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java index 544d453..63fe926 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionsStateValidator.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.topology; +import java.util.AbstractMap; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -33,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Cac import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.Nullable; @@ -78,14 +78,16 @@ public class GridDhtPartitionsStateValidator { final Set<UUID> ignoringNodes = new HashSet<>(); // Ignore just joined nodes. - for (DiscoveryEvent evt : fut.events().events()) + for (DiscoveryEvent evt : fut.events().events()) { if (evt.type() == EVT_NODE_JOINED) ignoringNodes.add(evt.eventNode().id()); + } AffinityTopologyVersion topVer = fut.context().events().topologyVersion(); // Validate update counters. Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes); + if (!result.isEmpty()) throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result)); @@ -110,13 +112,16 @@ public class GridDhtPartitionsStateValidator { * * @param top Topology to validate. * @param nodeId Node which sent single message. - * @param singleMsg Single message. + * @param countersMap Counters map. + * @param sizesMap Sizes map. * @return Set of partition ids should be excluded from validation. */ - @Nullable private Set<Integer> shouldIgnore(GridDhtPartitionTopology top, UUID nodeId, GridDhtPartitionsSingleMessage singleMsg) { - CachePartitionPartialCountersMap countersMap = singleMsg.partitionUpdateCounters(top.groupId(), top.partitions()); - Map<Integer, Long> sizesMap = singleMsg.partitionSizes(top.groupId()); - + @Nullable private Set<Integer> shouldIgnore( + GridDhtPartitionTopology top, + UUID nodeId, + CachePartitionPartialCountersMap countersMap, + Map<Integer, Long> sizesMap + ) { Set<Integer> ignore = null; for (int i = 0; i < countersMap.size(); i++) { @@ -155,14 +160,14 @@ public class GridDhtPartitionsStateValidator { * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)). * If map is empty validation is successful. */ - public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters( - GridDhtPartitionTopology top, - Map<UUID, GridDhtPartitionsSingleMessage> messages, - Set<UUID> ignoringNodes - ) { + public Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters( + GridDhtPartitionTopology top, + Map<UUID, GridDhtPartitionsSingleMessage> messages, + Set<UUID> ignoringNodes + ) { Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>(); - Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>(); + Map<Integer, AbstractMap.Entry<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>(); // Populate counters statistics from local node partitions. for (GridDhtLocalPartition part : top.currentLocalPartitions()) { @@ -172,7 +177,7 @@ public class GridDhtPartitionsStateValidator { if (part.updateCounter() == 0 && part.fullSize() == 0) continue; - updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter())); + updateCountersAndNodesByPartitions.put(part.id(), new AbstractMap.SimpleEntry<>(cctx.localNodeId(), part.updateCounter())); } int partitions = top.partitions(); @@ -183,9 +188,13 @@ public class GridDhtPartitionsStateValidator { if (ignoringNodes.contains(nodeId)) continue; - CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions); + final GridDhtPartitionsSingleMessage message = e.getValue(); - Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue()); + CachePartitionPartialCountersMap countersMap = message.partitionUpdateCountersUnsorted(top.groupId(), partitions); + + Map<Integer, Long> sizesMap = message.partitionSizes(top.groupId()); + + Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, countersMap, sizesMap); for (int i = 0; i < countersMap.size(); i++) { int p = countersMap.partitionAt(i); @@ -211,14 +220,14 @@ public class GridDhtPartitionsStateValidator { * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)). * If map is empty validation is successful. */ - public Map<Integer, Map<UUID, Long>> validatePartitionsSizes( - GridDhtPartitionTopology top, - Map<UUID, GridDhtPartitionsSingleMessage> messages, - Set<UUID> ignoringNodes - ) { + public Map<Integer, Map<UUID, Long>> validatePartitionsSizes( + GridDhtPartitionTopology top, + Map<UUID, GridDhtPartitionsSingleMessage> messages, + Set<UUID> ignoringNodes + ) { Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>(); - Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>(); + Map<Integer, AbstractMap.Entry<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>(); // Populate sizes statistics from local node partitions. for (GridDhtLocalPartition part : top.currentLocalPartitions()) { @@ -228,7 +237,7 @@ public class GridDhtPartitionsStateValidator { if (part.updateCounter() == 0 && part.fullSize() == 0) continue; - sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize())); + sizesAndNodesByPartitions.put(part.id(), new AbstractMap.SimpleEntry<>(cctx.localNodeId(), part.fullSize())); } int partitions = top.partitions(); @@ -239,10 +248,13 @@ public class GridDhtPartitionsStateValidator { if (ignoringNodes.contains(nodeId)) continue; - CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions); - Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId()); + final GridDhtPartitionsSingleMessage message = e.getValue(); + + CachePartitionPartialCountersMap countersMap = message.partitionUpdateCountersUnsorted(top.groupId(), partitions); + + Map<Integer, Long> sizesMap = message.partitionSizes(top.groupId()); - Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue()); + Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, countersMap, sizesMap); for (int i = 0; i < countersMap.size(); i++) { int p = countersMap.partitionAt(i); @@ -269,20 +281,22 @@ public class GridDhtPartitionsStateValidator { * @param node Node id. * @param counter Counter value reported by {@code node}. */ - private void process(Map<Integer, Map<UUID, Long>> invalidPartitions, - Map<Integer, T2<UUID, Long>> countersAndNodes, - int part, - UUID node, - long counter) { - T2<UUID, Long> existingData = countersAndNodes.get(part); + private void process( + Map<Integer, Map<UUID, Long>> invalidPartitions, + Map<Integer, AbstractMap.Entry<UUID, Long>> countersAndNodes, + int part, + UUID node, + long counter + ) { + AbstractMap.Entry<UUID, Long> existingData = countersAndNodes.get(part); if (existingData == null) - countersAndNodes.put(part, new T2<>(node, counter)); + countersAndNodes.put(part, new AbstractMap.SimpleEntry<>(node, counter)); - if (existingData != null && counter != existingData.get2()) { + if (existingData != null && counter != existingData.getValue()) { if (!invalidPartitions.containsKey(part)) { Map<UUID, Long> map = new HashMap<>(); - map.put(existingData.get1(), existingData.get2()); + map.put(existingData.getKey(), existingData.getValue()); invalidPartitions.put(part, map); }
