IGNITE-4756 Print info about partition distribution to log Signed-off-by: Anton Vinogradov <a...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3eb1f5d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3eb1f5d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3eb1f5d Branch: refs/heads/ignite-8201 Commit: a3eb1f5d753a38c4019440e1bf39d00bc6136455 Parents: 0e73fa2 Author: Vyacheslav Daradur <daradu...@gmail.com> Authored: Wed Apr 11 14:41:29 2018 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Wed Apr 11 14:41:29 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 7 + .../affinity/GridAffinityAssignmentCache.java | 50 +++- .../AffinityDistributionLoggingTest.java | 268 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite5.java | 9 +- 4 files changed, 327 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 9da123e..04eb425 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -857,6 +857,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_BPLUS_TREE_LOCK_RETRIES = "IGNITE_BPLUS_TREE_LOCK_RETRIES"; /** + * The threshold of uneven distribution above which partition distribution will be logged. + * + * The default is '50', that means: warn about nodes with 50+% difference. + */ + public static final String IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD = "IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 18edd02..b1899e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -34,13 +34,14 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.AffinityCentralizedFunction; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.cluster.NodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.NodeOrderComparator; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cluster.BaselineTopology; @@ -53,7 +54,10 @@ import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD; +import static org.apache.ignite.IgniteSystemProperties.getFloat; import static org.apache.ignite.IgniteSystemProperties.getInteger; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; /** @@ -63,6 +67,9 @@ public class GridAffinityAssignmentCache { /** Cleanup history size. */ private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500); + /** Partition distribution. */ + private final float partDistribution = getFloat(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, 50f); + /** Group name if specified or cache name. */ private final String cacheOrGrpName; @@ -367,6 +374,9 @@ public class GridAffinityAssignmentCache { idealAssignment = assignment; + if (ctx.cache().cacheMode(cacheOrGrpName) == PARTITIONED) + printDistributionIfThresholdExceeded(assignment, sorted.size()); + if (hasBaseline) { baselineTopology = discoCache.state().baselineTopology(); assert baselineAssignment != null; @@ -418,6 +428,44 @@ public class GridAffinityAssignmentCache { } /** + * Calculates and logs partitions distribution if threshold of uneven distribution {@link #partDistribution} is exceeded. + * + * @param assignments Assignments to calculate partitions distribution. + * @param nodes Affinity nodes number. + * @see IgniteSystemProperties#IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD + */ + private void printDistributionIfThresholdExceeded(List<List<ClusterNode>> assignments, int nodes) { + int locPrimaryCnt = 0; + int locBackupCnt = 0; + + for (List<ClusterNode> assignment : assignments) { + for (int i = 0; i < assignment.size(); i++) { + ClusterNode node = assignment.get(i); + + if (node.isLocal()) { + if (i == 0) + locPrimaryCnt++; + else + locBackupCnt++; + } + } + } + + float expCnt = (float)partsCnt / nodes; + + float deltaPrimary = Math.abs(1 - (float)locPrimaryCnt / expCnt) * 100; + float deltaBackup = Math.abs(1 - (float)locBackupCnt / (expCnt * backups)) * 100; + + if (deltaPrimary > partDistribution || deltaBackup > partDistribution) { + log.info(String.format("Local node affinity assignment distribution is not ideal " + + "[cache=%s, expectedPrimary=%.2f, actualPrimary=%d, " + + "expectedBackups=%.2f, actualBackups=%d, warningThreshold=%.2f%%]", + cacheOrGrpName, expCnt, locPrimaryCnt, + expCnt * backups, locBackupCnt, partDistribution)); + } + } + + /** * Copies previous affinity assignment when discovery event does not cause affinity assignment changes * (e.g. client node joins on leaves). * http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java new file mode 100644 index 0000000..813c830 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.testframework.GridStringLogger; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; + +/** + * Tests of partitions distribution logging. + * + * Tests based on using of affinity function which provides an even distribution of partitions between nodes. + * + * @see EvenDistributionAffinityFunction + */ +public class AffinityDistributionLoggingTest extends GridCommonAbstractTest { + /** Pattern to test. */ + private static final String LOG_MESSAGE_PREFIX = "Local node affinity assignment distribution is not ideal "; + + /** Partitions number. */ + private int parts = 0; + + /** Nodes number. */ + private int nodes = 0; + + /** Backups number. */ + private int backups = 0; + + /** For storing original value of system property. */ + private String tempProp; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + tempProp = System.getProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + if (tempProp != null) + System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, tempProp); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + System.clearProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(backups); + cacheCfg.setAffinity(new EvenDistributionAffinityFunction(parts)); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * @throws Exception In case of an error. + */ + public void test2PartitionsIdealDistributionIsNotLogged() throws Exception { + System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "0"); + + nodes = 2; + parts = 2; + backups = 1; + + String testsLog = runAndGetExchangeLog(); + + assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX)); + } + + /** + * @throws Exception In case of an error. + */ + public void test120PartitionsIdeadDistributionIsNotLogged() throws Exception { + System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "0.0"); + + nodes = 3; + parts = 120; + backups = 2; + + String testsLog = runAndGetExchangeLog(); + + assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX)); + } + + /** + * @throws Exception In case of an error. + */ + public void test5PartitionsNotIdealDistributionIsLogged() throws Exception { + System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "50.0"); + + nodes = 4; + parts = 5; + backups = 3; + + String testsLog = runAndGetExchangeLog(); + + assertTrue(testsLog.contains(LOG_MESSAGE_PREFIX)); + } + + /** + * @throws Exception In case of an error. + */ + public void test7PartitionsNotIdealDistributionSuppressedLogging() throws Exception { + System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "50.0"); + + nodes = 3; + parts = 7; + backups = 0; + + String testsLog = runAndGetExchangeLog(); + + assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX)); + } + + /** + * @throws Exception In case of an error. + */ + public void test5PartitionsNotIdealDistributionSuppressedLogging() throws Exception { + System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "65"); + + nodes = 4; + parts = 5; + backups = 3; + + String testsLog = runAndGetExchangeLog(); + + assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX)); + } + + /** + * Starts a specified number of Ignite nodes and log partition node exchange during a last node's startup. + * + * @return Log of latest partition map exchange. + * @throws Exception In case of an error. + */ + private String runAndGetExchangeLog() throws Exception { + assert nodes > 1; + + IgniteEx ignite = (IgniteEx)startGrids(nodes - 1); + + awaitPartitionMapExchange(); + + GridCacheProcessor proc = ignite.context().cache(); + + GridCacheContext cctx = proc.context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)); + + final GridStringLogger log = new GridStringLogger(false, this.log); + + GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff"); + + GridTestUtils.setFieldValue(aff, "log", log); + + startGrid(nodes); + + awaitPartitionMapExchange(); + + return log.toString(); + } + + /** + * Affinity function for a partitioned cache which provides even distribution partitions between nodes in cluster. + */ + private static class EvenDistributionAffinityFunction implements AffinityFunction { + /** */ + private static final long serialVersionUID = 0L; + + /** Partitions number. */ + private int parts; + + /** + * @param parts Number of partitions for one cache. + */ + private EvenDistributionAffinityFunction(int parts) { + this.parts = parts; + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return key.hashCode() % parts; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + List<ClusterNode> nodes = new ArrayList<>(affCtx.currentTopologySnapshot()); + + nodes.sort(Comparator.comparing(o -> o.<String>attribute(ATTR_IGNITE_INSTANCE_NAME))); + + List<List<ClusterNode>> res = new ArrayList<>(parts); + + for (int i = 0; i < parts; i++) { + Set<ClusterNode> n0 = new LinkedHashSet<>(); + + n0.add(nodes.get(i % nodes.size())); + + for (int j = 1; j <= affCtx.backups(); j++) + n0.add(nodes.get((i + j) % nodes.size())); + + res.add(new ArrayList<>(n0)); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 7c41e49..945a76c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.GridCacheAffinityBackupsSelfTest; import org.apache.ignite.IgniteCacheAffinitySelfTest; import org.apache.ignite.cache.affinity.AffinityClientNodeSelfTest; +import org.apache.ignite.cache.affinity.AffinityDistributionLoggingTest; import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest; import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest; import org.apache.ignite.internal.GridCachePartitionExchangeManagerHistSizeTest; @@ -35,13 +36,7 @@ import org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadTh import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughEvictionsVariationsSuite; import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest; -import org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest; -import org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest; -import org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscoveryHistoryOverflowTest; -import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest; -import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest; -import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest; @@ -95,6 +90,8 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(LocalAffinityFunctionTest.class); suite.addTestSuite(AffinityHistoryCleanupTest.class); + suite.addTestSuite(AffinityDistributionLoggingTest.class); + suite.addTestSuite(IgniteCacheAtomicProtocolTest.class); suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class);