Updated Branches: refs/heads/cassandra-2.0 bb09d3c1b -> bdff106aa
Improve batchlog write performance with vnodes patch by Jonathan Ellis and Rick Branson; reviewed by Aleksey Yeschenko for CASSANDRA-6488 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4be9e672 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4be9e672 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4be9e672 Branch: refs/heads/cassandra-2.0 Commit: 4be9e6720d9f94a83aa42153c3e71ae1e557d2d9 Parents: a3d91dc Author: Aleksey Yeschenko <alek...@apache.org> Authored: Sun Dec 15 13:29:56 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Sun Dec 15 13:29:56 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../locator/AbstractReplicationStrategy.java | 35 ++++-------------- .../apache/cassandra/locator/TokenMetadata.java | 35 +++++++++++++----- .../apache/cassandra/service/StorageProxy.java | 39 +++++++++----------- 4 files changed, 53 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e586592..b55393b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ (CASSANDRA-6413) * (Hadoop) add describe_local_ring (CASSANDRA-6268) * Fix handling of concurrent directory creation failure (CASSANDRA-6459) + * Improve batchlog write performance with vnodes (CASSANDRA-6488) 1.2.12 http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index c36fde4..85e229c 100644 --- 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -20,12 +20,10 @@ package org.apache.cassandra.locator; import java.lang.reflect.Constructor; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.locks.Lock; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.Striped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +54,8 @@ public abstract class AbstractReplicationStrategy public final Map<String, String> configOptions; private final TokenMetadata tokenMetadata; - // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance, - // so that our ownership calculations never block Gossip from processing an ownership change. - // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345), - // so we keep our own copy here. - // - // Writes to tokenMetadataClone should be synchronized. 
- private volatile TokenMetadata tokenMetadataClone = null; - private volatile long clonedTokenMetadataVersion = 0; + // track when the token range changes, signaling we need to invalidate our endpoint cache + private volatile long lastInvalidatedVersion = 0; public IEndpointSnitch snitch; @@ -85,16 +77,15 @@ public abstract class AbstractReplicationStrategy { long lastVersion = tokenMetadata.getRingVersion(); - if (lastVersion > clonedTokenMetadataVersion) + if (lastVersion > lastInvalidatedVersion) { synchronized (this) { - if (lastVersion > clonedTokenMetadataVersion) + if (lastVersion > lastInvalidatedVersion) { logger.debug("clearing cached endpoints"); - tokenMetadataClone = null; cachedEndpoints.clear(); - clonedTokenMetadataVersion = lastVersion; + lastInvalidatedVersion = lastVersion; } } } @@ -116,19 +107,9 @@ public abstract class AbstractReplicationStrategy ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken); if (endpoints == null) { - TokenMetadata tm; // local reference in case another thread nulls tMC out from under us - if ((tm = tokenMetadataClone) == null) - { - // synchronize to prevent thundering herd post-invalidation - synchronized (this) - { - if ((tm = tokenMetadataClone) == null) - tm = tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap(); - } - // if our clone got invalidated, it's possible there is a new token to account for too - keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); - } - + TokenMetadata tm = tokenMetadata.cloneOnlyTokenMap(); + // if our cache got invalidated, it's possible there is a new token to account for too + keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm)); cachedEndpoints.put(keyToken, endpoints); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/locator/TokenMetadata.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index b724894..be0f7c7 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -585,22 +586,37 @@ public class TokenMetadata } } + private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>(); + /** * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges, * bootstrap tokens and leaving endpoints are not included in the copy. + * + * This uses a cached copy that is invalidated when the ring changes, so in the common case + * no extra locking is required.
*/ public TokenMetadata cloneOnlyTokenMap() { - lock.readLock().lock(); - try - { - return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), - HashBiMap.create(endpointToHostIdMap), - new Topology(topology)); - } - finally + TokenMetadata tm = cachedTokenMap.get(); + if (tm != null) + return tm; + + // synchronize is to prevent thundering herd (CASSANDRA-6345); lock.readLock is for correctness vs updates to our internals + synchronized (this) { - lock.readLock().unlock(); + lock.readLock().lock(); + try + { + tm = new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), + HashBiMap.create(endpointToHostIdMap), + new Topology(topology)); + cachedTokenMap.set(tm); + return tm; + } + finally + { + lock.readLock().unlock(); + } } } @@ -1057,6 +1073,7 @@ public class TokenMetadata public void invalidateCachedRings() { ringVersion++; + cachedTokenMap.set(null); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 3e9f2cb..376edb6 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -414,30 +415,32 @@ public class StorageProxy implements StorageProxyMBean * - replicas should be alive according to the failure detector * - replicas should be in the local datacenter * - choose min(2, number of qualifying candiates above) - * - allow the 
local node to be the only replica only if it's a single-node cluster + * - allow the local node to be the only replica only if it's a single-node DC */ - private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException + private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) + throws UnavailableException { - // will include every known node in the DC, including localhost. TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap().getTopology(); - Collection<InetAddress> localMembers = topology.getDatacenterEndpoints().get(localDataCenter); + List<InetAddress> localEndpoints = new ArrayList<InetAddress>(topology.getDatacenterEndpoints().get(localDataCenter)); // special case for single-node datacenters - if (localMembers.size() == 1) - return localMembers; + if (localEndpoints.size() == 1) + return localEndpoints; - // not a single-node cluster - don't count the local node. - localMembers.remove(FBUtilities.getBroadcastAddress()); + List<InetAddress> chosenEndpoints = new ArrayList<InetAddress>(2); + int startOffset = new Random().nextInt(localEndpoints.size()); - // include only alive nodes - List<InetAddress> candidates = new ArrayList<InetAddress>(localMembers.size()); - for (InetAddress member : localMembers) + // starts at some random point in the list, advances forward until the end, then loops + // around to the beginning, advancing again until it is back at the starting point again. 
+ for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++) { - if (FailureDetector.instance.isAlive(member)) - candidates.add(member); + InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size()); + // skip localhost and non-alive nodes + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint)) + chosenEndpoints.add(endpoint); } - if (candidates.isEmpty()) + if (chosenEndpoints.isEmpty()) { if (consistencyLevel == ConsistencyLevel.ANY) return Collections.singleton(FBUtilities.getBroadcastAddress()); @@ -445,13 +448,7 @@ public class StorageProxy implements StorageProxyMBean throw new UnavailableException(ConsistencyLevel.ONE, 1, 0); } - if (candidates.size() > 2) - { - Collections.shuffle(candidates); - candidates = candidates.subList(0, 2); - } - - return candidates; + return chosenEndpoints; } /**