Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed9df6d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed9df6d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed9df6d Branch: refs/heads/trunk Commit: 4ed9df6dfc2bb484c6fc5497a0bdc23dc076d04d Parents: e35f4f9 13e9396 Author: Paulo Motta <pa...@apache.org> Authored: Tue Dec 20 12:14:43 2016 -0200 Committer: Paulo Motta <pa...@apache.org> Committed: Tue Dec 20 12:15:20 2016 -0200 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/Keyspace.java | 24 +++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed9df6d/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index d9f8f62,217cf54..6e7e141 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -488,54 -466,24 +494,54 @@@ public class Keyspac if (requiresViewUpdate) { mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis()); - while (true) - { - if (TEST_FAIL_MV_LOCKS_COUNT == 0) - lock = ViewManager.acquireLockFor(mutation.key().getKey()); - else - TEST_FAIL_MV_LOCKS_COUNT--; - if (lock == null) + // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock() + Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds(); + Iterator<UUID> idIterator = columnFamilyIds.iterator(); + + locks = new Lock[columnFamilyIds.size()]; + for (int i = 0; i < columnFamilyIds.size(); i++) + { + UUID cfid = idIterator.next(); + int lockKey = Objects.hash(mutation.key().getKey(), cfid); + while (true) { - //throw WTE only if request is droppable - if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + Lock lock = null; + + if (TEST_FAIL_MV_LOCKS_COUNT == 0) + lock = ViewManager.acquireLockFor(lockKey); + else + TEST_FAIL_MV_LOCKS_COUNT--; + + if (lock == null) { - logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); - Tracing.trace("Could not acquire MV lock"); - if (future != null) + //throw WTE only if request is droppable + if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + { + for (int j = 0; j < i; j++) + locks[j].unlock(); + + logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name); + Tracing.trace("Could not acquire MV lock"); + if (future != null) + { + future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); + return future; + } + else + throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + } + else if (isDeferrable) { - future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); + for (int j = 0; j < i; j++) + locks[j].unlock(); + + // This view update can't happen right now. so rather than keep this thread busy + // we will re-apply ourself to the queue and try again later + final CompletableFuture<?> mark = future; + StageManager.getStage(Stage.MUTATION).execute(() -> - apply(mutation, writeCommitLog, true, isDroppable, true, mark) ++ applyInternal(mutation, writeCommitLog, true, isDroppable, true, mark) + ); return future; } else