Merge branch 'cassandra-3.0' into cassandra-3.7 Conflicts: CHANGES.txt src/java/org/apache/cassandra/db/Keyspace.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f24e0667 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f24e0667 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f24e0667 Branch: refs/heads/trunk Commit: f24e066754188c9c1298b17bb7cb40502655eb99 Parents: a40d2e2 5794fb3 Author: Tyler Hobbs <tylerlho...@gmail.com> Authored: Thu May 26 11:37:47 2016 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Thu May 26 11:37:47 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ src/java/org/apache/cassandra/db/Keyspace.java | 12 +++++------- 2 files changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f24e0667/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 3acd3da,97893c9..79086b1 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,6 -1,6 +1,8 @@@ -3.0.7 +3.7 + * Don't use static dataDirectories field in Directories instances (CASSANDRA-11647) +Merged from 3.0: + * Avoid WriteTimeoutExceptions during commit log replay due to materialized + view lock contention (CASSANDRA-11891) * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530) * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) * Allow compaction strategies to disable early open (CASSANDRA-11754) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f24e0667/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index 08317b4,8b23186..1b3418e --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -434,45 -419,19 +434,43 @@@ public class Keyspac if (requiresViewUpdate) { mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis()); - lock = ViewManager.acquireLockFor(mutation.key().getKey()); - if (lock == null) + // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock() + Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds(); + Iterator<UUID> idIterator = columnFamilyIds.iterator(); + locks = new Lock[columnFamilyIds.size()]; + + for (int i = 0; i < columnFamilyIds.size(); i++) { - // avoid throwing a WTE during commitlog replay - if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + UUID cfid = idIterator.next(); + int lockKey = Objects.hash(mutation.key().getKey(), cfid); + Lock lock = ViewManager.acquireLockFor(lockKey); + if (lock == null) { - logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); - Tracing.trace("Could not acquire MV lock"); - if (future != null) - future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); + // we will either time out or retry, so release all acquired locks + for (int j = 0; j < i; j++) + locks[j].unlock(); + - if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) ++ // avoid throwing a WTE during commitlog replay ++ if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + { + logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name); + Tracing.trace("Could not acquire MV lock"); + if (future != null) + future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); + else + throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + } else - throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + { + // This view update can't happen right now. so rather than keep this thread busy + // we will re-apply ourself to the queue and try again later - StageManager.getStage(Stage.MUTATION).execute(() -> { - if (writeCommitLog) - apply(mutation, true, true, isClReplay, mark); - else - apply(mutation, false, true, isClReplay, mark); - }); ++ StageManager.getStage(Stage.MUTATION).execute(() -> ++ apply(mutation, writeCommitLog, true, isClReplay, mark) ++ ); + + return mark; + } } else {