Mutations do not block for completion under view lock contention

Patch by Carl Yeksigian; reviewed by Tyler Hobbs for CASSANDRA-10779
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/839a5bab Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/839a5bab Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/839a5bab Branch: refs/heads/cassandra-3.3 Commit: 839a5bab2a7f5385a878e5dc5f8b01bda28fa777 Parents: b554cb3 Author: Carl Yeksigian <c...@apache.org> Authored: Mon Feb 1 16:51:15 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Mon Feb 1 16:59:57 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Keyspace.java | 32 +++++++++++++------- src/java/org/apache/cassandra/db/Mutation.java | 19 ++++++++++-- .../cassandra/db/MutationVerbHandler.java | 25 ++++++++++++--- .../db/commitlog/CommitLogReplayer.java | 7 +++-- .../cassandra/service/paxos/PaxosState.java | 12 ++++++-- 6 files changed, 73 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7a42916..bed8703 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.3 + * Mutations do not block for completion under view lock contention (CASSANDRA-10779) * Invalidate legacy schema tables when unloading them (CASSANDRA-11071) * (cqlsh) handle INSERT and UPDATE statements with LWT conditions correctly (CASSANDRA-11003) http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 7b4f79b..2b62f0e 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ 
b/src/java/org/apache/cassandra/db/Keyspace.java @@ -379,19 +379,19 @@ public class Keyspace } } - public void apply(Mutation mutation, boolean writeCommitLog) + public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog) { - apply(mutation, writeCommitLog, true, false); + return apply(mutation, writeCommitLog, true, false, null); } - public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) + public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { - apply(mutation, writeCommitLog, updateIndexes, false); + return apply(mutation, writeCommitLog, updateIndexes, false, null); } - public void applyFromCommitLog(Mutation mutation) + public CompletableFuture<?> applyFromCommitLog(Mutation mutation) { - apply(mutation, false, true, true); + return apply(mutation, false, true, true, null); } /** @@ -403,13 +403,18 @@ public class Keyspace * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") * @param isClReplay true if caller is the commitlog replayer */ - public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes, boolean isClReplay) + public CompletableFuture<?> apply(final Mutation mutation, + final boolean writeCommitLog, + boolean updateIndexes, + boolean isClReplay, + CompletableFuture<?> future) { if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); Lock lock = null; boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false); + final CompletableFuture<?> mark = future == null ? 
new CompletableFuture<>() : future; if (requiresViewUpdate) { @@ -422,7 +427,10 @@ public class Keyspace { logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); Tracing.trace("Could not acquire MV lock"); - throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + if (future != null) + future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); + else + throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); } else { @@ -430,12 +438,12 @@ public class Keyspace // we will re-apply ourself to the queue and try again later StageManager.getStage(Stage.MUTATION).execute(() -> { if (writeCommitLog) - mutation.apply(); + apply(mutation, true, true, isClReplay, mark); else - mutation.applyUnsafe(); + apply(mutation, false, true, isClReplay, mark); }); - return; + return mark; } } else @@ -495,6 +503,8 @@ public class Keyspace if (requiresViewUpdate) baseComplete.set(System.currentTimeMillis()); } + mark.complete(null); + return mark; } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index cbc7e17..6b4c8e9 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -20,8 +20,11 @@ package org.apache.cassandra.db; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.CFMetaData; import 
org.apache.cassandra.config.DatabaseDescriptor; @@ -195,14 +198,26 @@ public class Mutation implements IMutation return new Mutation(ks, key, modifications); } + public CompletableFuture<?> applyFuture() + { + Keyspace ks = Keyspace.open(keyspaceName); + return ks.apply(this, ks.getMetadata().params.durableWrites); + } + /* * This is equivalent to calling commit. Applies the changes to * to the keyspace that is obtained by calling Keyspace.open(). */ public void apply() { - Keyspace ks = Keyspace.open(keyspaceName); - ks.apply(this, ks.getMetadata().params.durableWrites); + try + { + Uninterruptibles.getUninterruptibly(applyFuture()); + } + catch (ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } } public void apply(boolean durableWrites) http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index d4670a2..74dd625 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; +import java.util.function.Consumer; import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; import org.apache.cassandra.exceptions.WriteTimeoutException; @@ -29,6 +30,17 @@ import org.apache.cassandra.tracing.Tracing; public class MutationVerbHandler implements IVerbHandler<Mutation> { + private void reply(int id, InetAddress replyTo) + { + Tracing.trace("Enqueuing response to {}", replyTo); + MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo); + } + + private void failed() + { + Tracing.trace("Payload application resulted in WriteTimeout, not replying"); + } + 
public void doVerb(MessageIn<Mutation> message, int id) throws IOException { // Check if there were any forwarding headers in this message @@ -49,16 +61,19 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> try { if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload)) + { LegacyBatchlogMigrator.handleLegacyMutation(message.payload); + reply(id, replyTo); + } else - message.payload.apply(); - - Tracing.trace("Enqueuing response to {}", replyTo); - MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo); + message.payload.applyFuture().thenAccept(o -> reply(id, replyTo)).exceptionally(wto -> { + failed(); + return null; + }); } catch (WriteTimeoutException wto) { - Tracing.trace("Payload application resulted in WriteTimeout, not replying"); + failed(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 2668bba..b4472ed 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -25,6 +25,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; @@ -34,6 +35,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import 
org.slf4j.Logger; @@ -54,7 +56,6 @@ import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -567,7 +568,7 @@ public class CommitLogReplayer Runnable runnable = new WrappedRunnable() { - public void runMayThrow() throws IOException + public void runMayThrow() throws ExecutionException { if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) return; @@ -602,7 +603,7 @@ public class CommitLogReplayer if (newMutation != null) { assert !newMutation.isEmpty(); - Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation); + Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); keyspacesRecovered.add(keyspace); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index 20ccb90..3ecac99 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -20,10 +20,11 @@ */ package org.apache.cassandra.service.paxos; -import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.Lock; import com.google.common.util.concurrent.Striped; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -138,7 +139,14 @@ public 
class PaxosState { Tracing.trace("Committing proposal {}", proposal); Mutation mutation = proposal.makeMutation(); - Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true); + try + { + Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true)); + } + catch (ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } } else {