Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 4717d2769 -> 38db6e446
Lock counter cells, not partitions patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-6880 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/38db6e44 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/38db6e44 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/38db6e44 Branch: refs/heads/cassandra-2.1 Commit: 38db6e44640982feb6397936eafecfee68fa3552 Parents: 4717d27 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Apr 3 17:59:24 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Apr 3 23:51:44 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 15 ---- .../apache/cassandra/db/CounterMutation.java | 83 ++++++++++++++------ src/java/org/apache/cassandra/db/Keyspace.java | 13 +++ .../apache/cassandra/service/CacheService.java | 34 +++----- 5 files changed, 83 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 351b317..ceeb0c1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -38,6 +38,7 @@ (CASSANDRA-6931) * Optimize CounterColumn#reconcile() (CASSANDRA-6953) * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869) + * Lock counter cells, not partitions (CASSANDRA-6880) Merged from 2.0: * Allow compaction of system tables during startup (CASSANDRA-6913) * Restrict Windows to parallel repairs (CASSANDRA-6907) http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b9cab4d..43ecdc1 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import java.util.regex.Pattern; import javax.management.*; @@ -34,7 +33,6 @@ import com.google.common.base.Function; import com.google.common.collect.*; import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.Striped; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,8 +133,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final ColumnFamilyMetrics metric; public volatile long sampleLatencyNanos; - private final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128); - public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. @@ -379,17 +375,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } /** - * Obtain a lock for this CF's part of a counter mutation - * @param key the key for the CounterMutation - * @return the striped lock instance - */ - public Lock counterLockFor(ByteBuffer key) - { - assert metadata.isCounter(); - return counterLocks.get(key); - } - - /** * Removes every SSTable in the directory from the DataTracker's view. * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index dfc7a4a..c19b436 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -18,13 +18,16 @@ package org.apache.cassandra.db; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.collect.Iterables; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.context.CounterContext; @@ -34,9 +37,6 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.HeapAllocator; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; @@ -92,7 +92,7 @@ public class CounterMutation implements IMutation /** * Applies the counter mutation, returns the result Mutation (for replication to other nodes). * - * 1. Grabs the striped CF-level lock(s) + * 1. Grabs the striped cell-level locks in the proper order * 2. Gets the current values of the counters-to-be-modified from the counter cache * 3. Reads the rest of the current values (cache misses) from the CF * 4. Writes the updated counter values @@ -105,34 +105,24 @@ public class CounterMutation implements IMutation */ public Mutation apply() throws WriteTimeoutException { - Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key())); + Mutation result = new Mutation(getKeyspaceName(), key()); Keyspace keyspace = Keyspace.open(getKeyspaceName()); - ArrayList<UUID> cfIds = new ArrayList<>(getColumnFamilyIds()); - Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock. - ArrayList<Lock> locks = new ArrayList<>(cfIds.size()); + int count = 0; + for (ColumnFamily cf : getColumnFamilies()) + count += cf.getColumnCount(); + + List<Lock> locks = new ArrayList<>(count); + Tracing.trace("Acquiring {} counter locks", count); try { - Tracing.trace("Acquiring {} counter locks", cfIds.size()); - for (UUID cfId : cfIds) - { - Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key()); - if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS)) - throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace)); - locks.add(lock); - } - + grabCounterLocks(keyspace, locks); for (ColumnFamily cf : getColumnFamilies()) result.add(processModifications(cf)); - result.apply(); updateCounterCache(result, keyspace); return result; } - catch (InterruptedException e) - { - throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace)); - } finally { for (Lock lock : locks) @@ -140,10 +130,51 @@ public class CounterMutation implements IMutation } } + private void grabCounterLocks(Keyspace keyspace, List<Lock> locks) throws WriteTimeoutException + { + long startTime = System.nanoTime(); + + for (Lock lock : Keyspace.counterLocksFor(getCounterLockKeys())) + { + long timeout = TimeUnit.MILLISECONDS.toNanos(getTimeout()) - (System.nanoTime() - startTime); + try + { + if (!lock.tryLock(timeout, TimeUnit.NANOSECONDS)) + throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace)); + locks.add(lock); + } + catch (InterruptedException e) + { + throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace)); + } + } + } + + /** + * Returns a wrapper for the Striped#bulkGet() call (via Keyspace#counterLocksFor()) + * Striped#bulkGet() depends on Object#hashCode(), so here we make sure that the cf id and the partition key + * all get to be part of the hashCode() calculation, not just the cell name. + */ + private Iterable<Object> getCounterLockKeys() + { + return Iterables.concat(Iterables.transform(getColumnFamilies(), new Function<ColumnFamily, Iterable<Object>>() + { + public Iterable<Object> apply(final ColumnFamily cf) + { + return Iterables.transform(cf, new Function<Cell, Object>() + { + public Object apply(Cell cell) + { + return Objects.hashCode(cf.id(), key(), cell.name()); + } + }); + } + })); + } + // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s private ColumnFamily processModifications(ColumnFamily changesCF) { - AbstractAllocator allocator = HeapAllocator.instance; ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id()); ColumnFamily resultCF = changesCF.cloneMeShallow(); @@ -154,7 +185,7 @@ public class CounterMutation implements IMutation if (cell instanceof CounterUpdateCell) counterUpdateCells.add((CounterUpdateCell)cell); else - resultCF.addColumn(cell.localCopy(allocator)); + resultCF.addColumn(cell); } if (counterUpdateCells.isEmpty()) @@ -169,7 +200,7 @@ public class CounterMutation implements IMutation long clock = currentValue.clock + 1L; long count = currentValue.count + update.delta(); - resultCF.addColumn(new CounterCell(update.name().copy(allocator), + resultCF.addColumn(new CounterCell(update.name(), CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count), update.timestamp())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index c0a8690..1c3df77 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -29,9 +29,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; +import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Striped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,8 @@ public class Keyspace private static final Logger logger = LoggerFactory.getLogger(Keyspace.class); + private static final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024); + // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure // proper directories here as well as in CassandraDaemon. static @@ -184,6 +188,15 @@ public class Keyspace } /** + * @param keys the keys to grab the locks for + * @return the striped lock instances + */ + public static Iterable<Lock> counterLocksFor(Iterable<Object> keys) + { + return counterLocks.bulkGet(keys); + } + + /** * Take a snapshot of the specific column family, or the entire set of column families * if columnFamily is null with a given timestamp * http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index b164eb9..048bad4 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -29,7 +29,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -399,27 +398,18 @@ public class CacheService implements CacheServiceMBean public Pair<CounterCacheKey, ClockAndCount> call() throws Exception { DecoratedKey key = cfs.partitioner.decorateKey(partitionKey); - Lock lock = cfs.counterLockFor(partitionKey); - lock.lock(); - try - { - QueryFilter filter = QueryFilter.getNamesFilter(key, - cfs.metadata.cfName, - FBUtilities.singleton(cellName, cfs.metadata.comparator), - Long.MIN_VALUE); - ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE); - if (cf == null) - return null; - Cell cell = cf.getColumn(cellName); - if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE)) - return null; - ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value()); - return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount); - } - finally - { - lock.unlock(); - } + QueryFilter filter = QueryFilter.getNamesFilter(key, + cfs.metadata.cfName, + FBUtilities.singleton(cellName, cfs.metadata.comparator), + Long.MIN_VALUE); + ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE); + if (cf == null) + return null; + Cell cell = cf.getColumn(cellName); + if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE)) + return null; + ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value()); + return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount); } }); }