Fix deadlock in mutation state under concurrent, CL > ONE writes to counters (backport of CASSANDRA-4578)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73f479c2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73f479c2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73f479c2 Branch: refs/heads/trunk Commit: 73f479c287d3f8b399926abea7d771e34c79f623 Parents: 373e5b0 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Nov 1 14:32:23 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Nov 1 14:32:23 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 5 ++ .../org/apache/cassandra/db/CounterColumn.java | 2 +- .../cassandra/db/CounterMutationVerbHandler.java | 32 +++++++++++--- .../service/AbstractWriteResponseHandler.java | 15 ++++++- .../DatacenterSyncWriteResponseHandler.java | 2 +- .../service/DatacenterWriteResponseHandler.java | 2 +- .../cassandra/service/IWriteResponseHandler.java | 9 ++++ .../org/apache/cassandra/service/StorageProxy.java | 15 +++++-- .../cassandra/service/WriteResponseHandler.java | 2 +- 9 files changed, 67 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 884eaba..54a0511 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,8 @@ +Unreleased + * Fix deadlock in mutation state under concurrent, CL > ONE writes to counters + (backport of CASSANDRA-4578) + + 1.0.12 * Switch from NBHM to CHM in MessagingService's callback map, which prevents OOM in long-running instances (CASSANDRA-4708) http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/db/CounterColumn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java 
b/src/java/org/apache/cassandra/db/CounterColumn.java index a988055..5901eec 100644 --- a/src/java/org/apache/cassandra/db/CounterColumn.java +++ b/src/java/org/apache/cassandra/db/CounterColumn.java @@ -372,7 +372,7 @@ public class CounterColumn extends Column responseHandler.response(null); StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level); } - }); + }, null); // we don't wait for answers } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index b700d37..643c618 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -40,7 +40,7 @@ public class CounterMutationVerbHandler implements IVerbHandler { private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class); - public void doVerb(Message message, String id) + public void doVerb(final Message message, final String id) { byte[] bytes = message.getMessageBody(); FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes); @@ -48,15 +48,33 @@ public class CounterMutationVerbHandler implements IVerbHandler try { DataInputStream is = new DataInputStream(buffer); - CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion()); + final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion()); if (logger.isDebugEnabled()) logger.debug("Applying forwarded " + cm); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get(); - WriteResponse response = new 
WriteResponse(cm.getTable(), cm.key(), true); - Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response); - MessagingService.instance().sendReply(responseMessage, id, message.getFrom()); + // We should not wait for the result of the write in this thread, + // otherwise we could have a distributed deadlock between replicas + // running this VerbHandler (see #4578). + // Instead, we use a callback to send the response. Note that the callback + // will not be called if the request timeout, but this is ok + // because the coordinator of the counter mutation will timeout on + // it's own in that case. + StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){ + public void run() + { + try + { + WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true); + Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response); + MessagingService.instance().sendReply(responseMessage, id, message.getFrom()); + } + catch (IOException e) + { + logger.error("Error writing response to counter mutation", e); + } + } + }); } catch (UnavailableException e) { @@ -66,7 +84,7 @@ public class CounterMutationVerbHandler implements IVerbHandler } catch (TimeoutException e) { - // The coordinator node will have timeout itself so we let that goes + // The coordinator will timeout on it's own so ignore } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 0a21676..c2d05b4 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -39,10 +39,11 @@ import 
org.apache.cassandra.utils.SimpleCondition; public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler { - protected final SimpleCondition condition = new SimpleCondition(); + private final SimpleCondition condition = new SimpleCondition(); protected final long startTime; protected final Collection<InetAddress> writeEndpoints; protected final ConsistencyLevel consistencyLevel; + protected volatile Runnable callback; protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel) { @@ -74,4 +75,16 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand public abstract void response(Message msg); public abstract void assureSufficientLiveNodes() throws UnavailableException; + + protected void signal() + { + condition.signal(); + if (callback != null) + callback.run(); + } + + public void setCallback(Runnable callback) + { + this.callback = callback; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index 50453a0..985567f 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -91,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan } // all the quorum conditions are met - condition.signal(); + signal(); } public void assureSufficientLiveNodes() throws UnavailableException http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 7ca3dd1..c1a5ed3 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -75,7 +75,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom()))) { if (responses.decrementAndGet() == 0) - condition.signal(); + signal(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/IWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java index 807df5b..ac76b78 100644 --- a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java @@ -30,4 +30,13 @@ public interface IWriteResponseHandler extends IAsyncCallback { public void get() throws TimeoutException; public void assureSufficientLiveNodes() throws UnavailableException; + + /** + * Set a callback to be called when the write is successful. + * Note that the callback will *not* be called in case of an exception (timeout or unavailable). + * Also, the callback should be set before any response() call, otherwise + * there is no guarantee it will ever be called. + * Successive calls to this method will override the previous callback by the new one. 
+ */ + public void setCallback(Runnable callback); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index ef70d1e..6887f1f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -191,7 +191,7 @@ public class StorageProxy implements StorageProxyMBean } else { - responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer)); + responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null)); } } @@ -235,11 +235,14 @@ public class StorageProxy implements StorageProxyMBean * @param performer the WritePerformer in charge of appliying the mutation * given the list of write endpoints (either standardWritePerformer for * standard writes or counterWritePerformer for counter writes). + * @param callback an optional callback to be run if and when the write is + * successful. 
*/ public static IWriteResponseHandler performWrite(IMutation mutation, ConsistencyLevel consistency_level, String localDataCenter, - WritePerformer performer) + WritePerformer performer, + Runnable callback) throws UnavailableException, TimeoutException, IOException { String table = mutation.getTable(); @@ -248,6 +251,8 @@ public class StorageProxy implements StorageProxyMBean Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key()); IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level); + if (callback != null) + responseHandler.setCallback(callback); // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); @@ -500,16 +505,16 @@ public class StorageProxy implements StorageProxyMBean // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. - public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException + public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer); + return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback); } // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer); + return performWrite(cm, cm.consistency(), localDataCenter, 
counterWriteOnCoordinatorPerformer, null); } private static Runnable counterWriteTask(final IMutation mutation, http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 5884687..6fc11dd 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -67,7 +67,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler public void response(Message m) { if (responses.decrementAndGet() == 0) - condition.signal(); + signal(); } protected int determineBlockFor(String table)