Repository: cassandra
Updated Branches:
  refs/heads/trunk f42e235b1 -> c1a9a47df
Make randompartitioner work with new vnode allocation patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12647 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1a9a47d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1a9a47d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1a9a47d Branch: refs/heads/trunk Commit: c1a9a47df292dbbde3c675c10d68043e7b212c28 Parents: f42e235 Author: Dikang Gu <dikan...@gmail.com> Authored: Wed Sep 14 23:04:14 2016 -0700 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Sep 19 16:36:34 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/dht/RandomPartitioner.java | 13 +++ .../ReplicationAwareTokenAllocatorTest.java | 84 +++++++++++++++----- 3 files changed, 76 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8e39d95..b625a58 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) * Fix cassandra-stress graphing (CASSANDRA-12237) * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index c063be3..7c8f6ac 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ 
b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -177,6 +177,19 @@ public class RandomPartitioner implements IPartitioner { return HEAP_SIZE; } + + public Token increaseSlightly() + { + return new BigIntegerToken(token.add(BigInteger.ONE)); + } + + public double size(Token next) + { + BigIntegerToken n = (BigIntegerToken) next; + BigInteger v = n.token.subtract(token); // Overflow acceptable and desired. + double d = Math.scalb(v.doubleValue(), -127); // Scale so that the full range is 1. + return d > 0.0 ? d : (d + 1.0); // Adjust for signed long, also making sure t.size(t) == 1. + } } public BigIntegerToken getToken(ByteBuffer key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java index 1b36c55..482e2ac 100644 --- a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java +++ b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java @@ -30,7 +30,9 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.junit.Test; import org.apache.cassandra.Util; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Token; public class ReplicationAwareTokenAllocatorTest @@ -489,10 +491,10 @@ public class ReplicationAwareTokenAllocatorTest } }; - Murmur3Partitioner partitioner = new Murmur3Partitioner(); Random seededRand = new Random(2); - private void random(Map<Token, Unit> map, TestReplicationStrategy rs, int unitCount, TokenCount tc, int perUnitCount) + private void 
random(Map<Token, Unit> map, TestReplicationStrategy rs, + int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner) { System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount); Random rand = seededRand; @@ -509,49 +511,82 @@ public class ReplicationAwareTokenAllocatorTest } @Test - public void testExistingCluster() + public void testExistingClusterWithRandomPartitioner() + { + testExistingCluster(new RandomPartitioner()); + } + + @Test + public void testExistingClusterWithMurmur3Partitioner() + { + testExistingCluster(new Murmur3Partitioner()); + } + + public void testExistingCluster(IPartitioner partitioner) { for (int rf = 1; rf <= 5; ++rf) { for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4) { - testExistingCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf)); - testExistingCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf)); + testExistingCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner); + testExistingCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner); if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1. 
for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 < TARGET_CLUSTER_SIZE; groupSize *= 4) { - testExistingCluster(perUnitCount, fixedTokenCount, new BalancedGroupReplicationStrategy(rf, groupSize)); - testExistingCluster(perUnitCount, varyingTokenCount, new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand)); + testExistingCluster(perUnitCount, fixedTokenCount, + new BalancedGroupReplicationStrategy(rf, groupSize), partitioner); + testExistingCluster(perUnitCount, varyingTokenCount, + new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand), + partitioner); } - testExistingCluster(perUnitCount, fixedTokenCount, new FixedGroupCountReplicationStrategy(rf, rf * 2)); + testExistingCluster(perUnitCount, fixedTokenCount, + new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner); } } } - public void testExistingCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs) + public void testExistingCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner) { System.out.println("Testing existing cluster, target " + perUnitCount + " vnodes, replication " + rs); final int targetClusterSize = TARGET_CLUSTER_SIZE; NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap(); - random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount); + random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, partitioner); ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner); grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false); grow(t, targetClusterSize, tc, perUnitCount, true); - loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount); + loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, partitioner); System.out.println(); } @Test - public void testNewCluster() + public void testNewClusterWithRandomPartitioner() { - Util.flakyTest(this::flakyTestNewCluster, + 
Util.flakyTest(this::flakyTestNewClusterWithRandomPartitioner, 5, "It tends to fail sometimes due to the random selection of the tokens in the first few nodes."); } - public void flakyTestNewCluster() + @Test + public void testNewClusterWithMurmur3Partitioner() + { + Util.flakyTest(this::flakyTestNewClusterWithMurmur3Partitioner, + 5, + "It tends to fail sometimes due to the random selection of the tokens in the first few nodes."); + } + + public void flakyTestNewClusterWithRandomPartitioner() + { + flakyTestNewCluster(new RandomPartitioner()); + } + + public void flakyTestNewClusterWithMurmur3Partitioner() + { + flakyTestNewCluster(new Murmur3Partitioner()); + } + + public void flakyTestNewCluster(IPartitioner partitioner) { // This test is flaky because the selection of the tokens for the first RF nodes (which is random, with an // uncontrolled seed) can sometimes cause a pathological situation where the algorithm will find a (close to) @@ -564,20 +599,24 @@ public class ReplicationAwareTokenAllocatorTest { for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4) { - testNewCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf)); - testNewCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf)); + testNewCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner); + testNewCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner); if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1. 
for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 < TARGET_CLUSTER_SIZE; groupSize *= 4) { - testNewCluster(perUnitCount, fixedTokenCount, new BalancedGroupReplicationStrategy(rf, groupSize)); - testNewCluster(perUnitCount, varyingTokenCount, new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand)); + testNewCluster(perUnitCount, fixedTokenCount, + new BalancedGroupReplicationStrategy(rf, groupSize), partitioner); + testNewCluster(perUnitCount, varyingTokenCount, + new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand), + partitioner); } - testNewCluster(perUnitCount, fixedTokenCount, new FixedGroupCountReplicationStrategy(rf, rf * 2)); + testNewCluster(perUnitCount, fixedTokenCount, + new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner); } } } - public void testNewCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs) + public void testNewCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner) { System.out.println("Testing new cluster, target " + perUnitCount + " vnodes, replication " + rs); final int targetClusterSize = TARGET_CLUSTER_SIZE; @@ -586,11 +625,12 @@ public class ReplicationAwareTokenAllocatorTest ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner); grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false); grow(t, targetClusterSize, tc, perUnitCount, true); - loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount); + loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, partitioner); System.out.println(); } - private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int howMany, TokenCount tc, int perUnitCount) + private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int howMany, + TokenCount tc, int perUnitCount, IPartitioner partitioner) { int fullCount = t.unitCount(); System.out.format("Losing %d units. ", howMany);