Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/cassandra-3.X
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s.podkowin...@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -649,12 +649,7 @@ public class TokenMetadata lock.readLock().lock(); try { - TokenMetadata allLeftMetadata = cloneOnlyTokenMap(); - - for (InetAddress endpoint : leavingEndpoints) - allLeftMetadata.removeEndpoint(endpoint); - - return allLeftMetadata; + return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints); } finally { @@ -662,6 +657,14 @@ public class TokenMetadata } } + private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints) + { + for (InetAddress endpoint : leavingEndpoints) + allLeftMetadata.removeEndpoint(endpoint); + + return allLeftMetadata; + } + /** * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all * current leave, and move operations have finished. @@ -787,118 +790,154 @@ public class TokenMetadata */ public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName) { - lock.readLock().lock(); - try + // avoid race between both branches - do not use a lock here as this will block any other unrelated operations! + synchronized (pendingRanges) { - PendingRangeMaps newPendingRanges = new PendingRangeMaps(); - if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty()) { if (logger.isTraceEnabled()) logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName); - pendingRanges.put(keyspaceName, newPendingRanges); - return; + pendingRanges.put(keyspaceName, new PendingRangeMaps()); } + else + { + if (logger.isDebugEnabled()) + logger.debug("Starting pending range calculation for {}", keyspaceName); - Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(); - - // Copy of metadata reflecting the situation after all leave operations are finished. 
- TokenMetadata allLeftMetadata = cloneAfterAllLeft(); + long startedAt = System.currentTimeMillis(); - // get all ranges that will be affected by leaving nodes - Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); - for (InetAddress endpoint : leavingEndpoints) - affectedRanges.addAll(addressRanges.get(endpoint)); + // create clone of current state + BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>(); + Set<InetAddress> leavingEndpoints = new HashSet<>(); + Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>(); + TokenMetadata metadata; - // for each of those ranges, find what new nodes will be responsible for the range when - // all leaving nodes are gone. - TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758 - for (Range<Token> range : affectedRanges) - { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints)) + lock.readLock().lock(); + try + { + bootstrapTokens.putAll(this.bootstrapTokens); + leavingEndpoints.addAll(this.leavingEndpoints); + movingEndpoints.addAll(this.movingEndpoints); + metadata = this.cloneOnlyTokenMap(); + } + finally { - newPendingRanges.addPendingRange(range, address); + lock.readLock().unlock(); } + + pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens, + leavingEndpoints, movingEndpoints)); + long took = System.currentTimeMillis() - startedAt; + + if (logger.isDebugEnabled()) + logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took); + if (logger.isTraceEnabled()) + logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? 
"<empty>" : printPendingRanges())); } + } + } + + /** + * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String) + */ + private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy, + TokenMetadata metadata, + BiMultiValMap<Token, InetAddress> bootstrapTokens, + Set<InetAddress> leavingEndpoints, + Set<Pair<Token, InetAddress>> movingEndpoints) + { + PendingRangeMaps newPendingRanges = new PendingRangeMaps(); + + Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata); - // At this stage newPendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. + // Copy of metadata reflecting the situation after all leave operations are finished. + TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints); - // For each of the bootstrapping nodes, simply add and remove them one by one to - // allLeftMetadata and check in between what their ranges would be. - Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse(); - for (InetAddress endpoint : bootstrapAddresses.keySet()) + // get all ranges that will be affected by leaving nodes + Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); + for (InetAddress endpoint : leavingEndpoints) + affectedRanges.addAll(addressRanges.get(endpoint)); + + // for each of those ranges, find what new nodes will be responsible for the range when + // all leaving nodes are gone. 
+ for (Range<Token> range : affectedRanges) + { + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints)) { - Collection<Token> tokens = bootstrapAddresses.get(endpoint); + newPendingRanges.addPendingRange(range, address); + } + } - allLeftMetadata.updateNormalTokens(tokens, endpoint); - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - newPendingRanges.addPendingRange(range, endpoint); - } - allLeftMetadata.removeEndpoint(endpoint); + // At this stage newPendingRanges has been updated according to leave operations. We can + // now continue the calculation by checking bootstrapping nodes. + + // For each of the bootstrapping nodes, simply add and remove them one by one to + // allLeftMetadata and check in between what their ranges would be. + Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse(); + for (InetAddress endpoint : bootstrapAddresses.keySet()) + { + Collection<Token> tokens = bootstrapAddresses.get(endpoint); + + allLeftMetadata.updateNormalTokens(tokens, endpoint); + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + { + newPendingRanges.addPendingRange(range, endpoint); } + allLeftMetadata.removeEndpoint(endpoint); + } - // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. + // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. + // We can now finish the calculation by checking moving nodes. 
- // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair<Token, InetAddress> moving : movingEndpoints) + // For each of the moving nodes, we do the same thing we did for bootstrapping: + // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. + for (Pair<Token, InetAddress> moving : movingEndpoints) + { + //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. + Set<Range<Token>> moveAffectedRanges = new HashSet<>(); + InetAddress endpoint = moving.right; // address of the moving node + //Add ranges before the move + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) { - //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. - Set<Range<Token>> moveAffectedRanges = new HashSet<>(); - InetAddress endpoint = moving.right; // address of the moving node - //Add ranges before the move - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - moveAffectedRanges.add(range); - } + moveAffectedRanges.add(range); + } - allLeftMetadata.updateNormalToken(moving.left, endpoint); - //Add ranges after the move - for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) - { - moveAffectedRanges.add(range); - } + allLeftMetadata.updateNormalToken(moving.left, endpoint); + //Add ranges after the move + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + { + moveAffectedRanges.add(range); + } - for(Range<Token> range : moveAffectedRanges) + for(Range<Token> range : moveAffectedRanges) + { + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); + Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints); + for(final InetAddress address : difference) { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints); - for(final InetAddress address : difference) - { - Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address); - Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address); - //We want to get rid of any ranges which the node is currently getting. - newRanges.removeAll(oldRanges); + Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address); + Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address); + //We want to get rid of any ranges which the node is currently getting. + newRanges.removeAll(oldRanges); - for(Range<Token> newRange : newRanges) + for(Range<Token> newRange : newRanges) + { + for(Range<Token> pendingRange : newRange.subtractAll(oldRanges)) { - for(Range<Token> pendingRange : newRange.subtractAll(oldRanges)) - { - newPendingRanges.addPendingRange(pendingRange, address); - } + newPendingRanges.addPendingRange(pendingRange, address); } } } - - allLeftMetadata.removeEndpoint(endpoint); } - pendingRanges.put(keyspaceName, newPendingRanges); - - if (logger.isTraceEnabled()) - logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? 
"<empty>" : printPendingRanges())); - } - finally - { - lock.readLock().unlock(); + allLeftMetadata.removeEndpoint(endpoint); } + + return newPendingRanges; } public Token getPredecessor(Token token) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 806f6a5..116cede 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -34,6 +35,9 @@ public class PendingRangeCalculatorService public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService(); private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class); + + // the executor will only run a single range calculation at a time while keeping at most one task queued in order + // to trigger an update only after the most recent state change and not for each update individually private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal"); @@ -58,9 +62,11 @@ public class PendingRangeCalculatorService try { long start = System.currentTimeMillis(); - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) + List<String> keyspaces = Schema.instance.getNonSystemKeyspaces(); + for (String keyspaceName : keyspaces) 
calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName); - logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start); + if (logger.isTraceEnabled()) + logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java new file mode 100644 index 0000000..507948c --- /dev/null +++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.gms; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.StorageService; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +/** + * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281). + */ +@RunWith(BMUnitRunner.class) +public class PendingRangeCalculatorServiceTest +{ + static ReentrantLock calculationLock = new ReentrantLock(); + + @BeforeClass + public static void setUp() throws ConfigurationException + { + SchemaLoader.prepareServer(); + StorageService.instance.initServer(); + } + + @Test + @BMRule(name = "Block pending range calculation", + targetClass = "TokenMetadata", + targetMethod = "calculatePendingRanges", + targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges", + action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()") + public void testDelayedResponse() throws UnknownHostException, InterruptedException + { + final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2"); + final UUID otherHostId = UUID.randomUUID(); + + // introduce node for first major state change + Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false)); + + // acquire lock 
to block pending range calculation via byteman + calculationLock.lock(); + try + { + // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger + // the pending range calculation that will be blocked by our lock + Thread t1 = new Thread() + { + public void run() + { + Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true)); + } + }; + t1.start(); + + // busy-spin until t1 is blocked by lock + while (!calculationLock.hasQueuedThreads()) ; + + // trigger further state changes in case we don't want the blocked thread from the + // expensive range calculation to block us here as well + Thread t2 = new Thread() + { + public void run() + { + Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false)); + Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false)); + Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false)); + } + }; + t2.start(); + t2.join(2000); + assertFalse("Thread still blocked by pending range calculation", t2.isAlive()); + assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion()); + } + finally + { + calculationLock.unlock(); + } + } + + private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping) + { + HeartBeatState hb = new HeartBeatState(1, ver); + EndpointState state = new EndpointState(hb); + Collection<Token> tokens = new ArrayList<>(); + + tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3})); + state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens)); + state.addApplicationState(ApplicationState.STATUS, bootstrapping ? 
+ StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens)); + state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId)); + state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); + + Map<InetAddress, EndpointState> states = new HashMap<>(); + states.put(otherNodeAddr, state); + return states; + } +}