Fix potential deadlock during counter writes. Patch by slebresne; reviewed by jbellis for CASSANDRA-4578.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7371e10b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7371e10b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7371e10b Branch: refs/heads/trunk Commit: 7371e10be40d9a01167d36eb2db69526b6b0ce50 Parents: 4177b58 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Sep 6 14:57:08 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Sep 7 10:41:58 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/CounterColumn.java | 2 +- .../cassandra/db/CounterMutationVerbHandler.java | 32 +++++++++++--- .../locator/AbstractReplicationStrategy.java | 8 ++-- .../service/AbstractWriteResponseHandler.java | 17 +++++++- .../DatacenterSyncWriteResponseHandler.java | 10 ++-- .../service/DatacenterWriteResponseHandler.java | 10 ++-- .../org/apache/cassandra/service/StorageProxy.java | 17 +++++--- .../cassandra/service/WriteResponseHandler.java | 12 +++--- 9 files changed, 72 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4a5bfe9..f192be2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 1.1.6 * (cql3) fix potential NPE with both equal and unequal restriction (CASSANDRA-4532) * (cql3) improves ORDER BY validation (CASSANDRA-4624) + * Fix potential deadlock during counter writes (CASSANDRA-4578) 1.1.5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/db/CounterColumn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java 
b/src/java/org/apache/cassandra/db/CounterColumn.java index 2ea0779..b6e3909 100644 --- a/src/java/org/apache/cassandra/db/CounterColumn.java +++ b/src/java/org/apache/cassandra/db/CounterColumn.java @@ -370,7 +370,7 @@ public class CounterColumn extends Column responseHandler.response(null); StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level); } - }); + }, null); // we don't wait for answers } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index 01d7b50..3ecbe8b 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -35,7 +35,7 @@ public class CounterMutationVerbHandler implements IVerbHandler { private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class); - public void doVerb(Message message, String id) + public void doVerb(final Message message, final String id) { byte[] bytes = message.getMessageBody(); FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes); @@ -43,15 +43,33 @@ public class CounterMutationVerbHandler implements IVerbHandler try { DataInputStream is = new DataInputStream(buffer); - CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion()); + final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion()); if (logger.isDebugEnabled()) logger.debug("Applying forwarded " + cm); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get(); - WriteResponse response = new 
WriteResponse(cm.getTable(), cm.key(), true); - Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response); - MessagingService.instance().sendReply(responseMessage, id, message.getFrom()); + // We should not wait for the result of the write in this thread, + // otherwise we could have a distributed deadlock between replicas + // running this VerbHandler (see #4578). + // Instead, we use a callback to send the response. Note that the callback + // will not be called if the request timeout, but this is ok + // because the coordinator of the counter mutation will timeout on + // it's own in that case. + StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){ + public void run() + { + try + { + WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true); + Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response); + MessagingService.instance().sendReply(responseMessage, id, message.getFrom()); + } + catch (IOException e) + { + logger.error("Error writing response to counter mutation", e); + } + } + }); } catch (UnavailableException e) { @@ -61,7 +79,7 @@ public class CounterMutationVerbHandler implements IVerbHandler } catch (TimeoutException e) { - // The coordinator node will have timeout itself so we let that goes + // The coordinator will timeout on it's own so ignore } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index f925124..54d6d06 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -116,18 +116,18 @@ public abstract class 
AbstractReplicationStrategy */ public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); - public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level) + public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level, Runnable callback) { if (consistency_level == ConsistencyLevel.LOCAL_QUORUM) { // block for in this context will be localnodes block. - return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table); + return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback); } else if (consistency_level == ConsistencyLevel.EACH_QUORUM) { - return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table); + return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback); } - return WriteResponseHandler.create(writeEndpoints, consistency_level, table); + return WriteResponseHandler.create(writeEndpoints, consistency_level, table, callback); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 81def72..d280a8e 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -34,16 +34,22 @@ import org.apache.cassandra.utils.SimpleCondition; public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler { - protected final SimpleCondition condition = new SimpleCondition(); + private final SimpleCondition condition = new 
SimpleCondition(); protected final long startTime; protected final Collection<InetAddress> writeEndpoints; protected final ConsistencyLevel consistencyLevel; + protected final Runnable callback; - protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel) + /** + * @param callback A callback to be called when the write is successful. + * Note that this callback will *not* be called in case of an exception (timeout or unavailable). + */ + protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, Runnable callback) { startTime = System.currentTimeMillis(); this.consistencyLevel = consistencyLevel; this.writeEndpoints = writeEndpoints; + this.callback = callback; } public void get() throws TimeoutException @@ -69,4 +75,11 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand public abstract void response(Message msg); public abstract void assureSufficientLiveNodes() throws UnavailableException; + + protected void signal() + { + condition.signal(); + if (callback != null) + callback.run(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index a3f6825..cbecf6b 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -56,10 +56,10 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan private final NetworkTopologyStrategy strategy; private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>(); - protected 
DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { // Response is been managed by the map so make it 1 for the superclass. - super(writeEndpoints, consistencyLevel); + super(writeEndpoints, consistencyLevel, callback); assert consistencyLevel == ConsistencyLevel.EACH_QUORUM; strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy(); @@ -71,9 +71,9 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan } } - public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table); + return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback); } public void response(Message message) @@ -91,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan } // all the quorum conditions are met - condition.signal(); + signal(); } public void assureSufficientLiveNodes() throws UnavailableException http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 62385db..881c99d 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -50,15 +50,15 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); } - protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - super(writeEndpoints, consistencyLevel, table); + super(writeEndpoints, consistencyLevel, table, callback); assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM; } - public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table); + return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback); } @Override @@ -75,7 +75,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom()))) { if (responses.decrementAndGet() == 0) - condition.signal(); + signal(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 566a3b4..23e0de4 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -190,7 +190,7 @@ public class StorageProxy implements StorageProxyMBean } else { - 
responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer)); + responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null)); } } @@ -240,11 +240,14 @@ public class StorageProxy implements StorageProxyMBean * @param performer the WritePerformer in charge of appliying the mutation * given the list of write endpoints (either standardWritePerformer for * standard writes or counterWritePerformer for counter writes). + * @param callback an optional callback to be run if and when the write is + * successful. */ public static IWriteResponseHandler performWrite(IMutation mutation, ConsistencyLevel consistency_level, String localDataCenter, - WritePerformer performer) + WritePerformer performer, + Runnable callback) throws UnavailableException, TimeoutException, IOException { String table = mutation.getTable(); @@ -252,7 +255,7 @@ public class StorageProxy implements StorageProxyMBean Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key()); - IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level); + IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, callback); // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); @@ -486,7 +489,7 @@ public class StorageProxy implements StorageProxyMBean AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy(); Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key()); - rs.getWriteResponseHandler(writeEndpoints, cm.consistency()).assureSufficientLiveNodes(); + rs.getWriteResponseHandler(writeEndpoints, cm.consistency(), null).assureSufficientLiveNodes(); // Forward the actual update to the chosen leader replica IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint); @@ -538,16 +541,16 @@ public class StorageProxy implements 
StorageProxyMBean // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. - public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException + public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer); + return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback); } // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer); + return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null); } private static Runnable counterWriteTask(final IMutation mutation, http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 5884687..baf8558 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -42,21 +42,21 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler protected final AtomicInteger responses; - protected 
WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - super(writeEndpoints, consistencyLevel); + super(writeEndpoints, consistencyLevel, callback); responses = new AtomicInteger(determineBlockFor(table)); } protected WriteResponseHandler(InetAddress endpoint) { - super(Arrays.asList(endpoint), ConsistencyLevel.ALL); + super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null); responses = new AtomicInteger(1); } - public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - return new WriteResponseHandler(writeEndpoints, consistencyLevel, table); + return new WriteResponseHandler(writeEndpoints, consistencyLevel, table, callback); } public static IWriteResponseHandler create(InetAddress endpoint) @@ -67,7 +67,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler public void response(Message m) { if (responses.decrementAndGet() == 0) - condition.signal(); + signal(); } protected int determineBlockFor(String table)