Author: jbellis Date: Thu Apr 14 21:46:04 2011 New Revision: 1092525 URL: http://svn.apache.org/viewvc?rev=1092525&view=rev Log: fix possible counter deadlock patch by Kelvin Kakugawa, Stu Hood, and slebresne for CASSANDRA-2454
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1092525&r1=1092524&r2=1092525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Apr 14 21:46:04 2011 @@ -1,7 +1,8 @@ 0.8-dev * remove Avro RPC support (CASSANDRA-926) * adds support for columns that act as incr/decr counters - (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288, 2105, 2384, 2236, 2342) + (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288, 2105, 2384, 2236, 2342, + 2454) * CQL (CASSANDRA-1703, 1704, 1705, 1706, 1707, 1708, 1710, 1711, 1940, 2124, 2302, 2277) * avoid double RowMutation serialization on write path (CASSANDRA-1800) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1092525&r1=1092524&r2=1092525&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 14 21:46:04 2011 @@ -76,6 +76,7 @@ public class StorageProxy implements Sto private static final WritePerformer standardWritePerformer; private static final WritePerformer counterWritePerformer; + private static final WritePerformer counterWriteOnCoordinatorPerformer; public static final StorageProxy instance = new StorageProxy(); @@ -102,11 +103,25 @@ public class StorageProxy implements Sto } }; + /* + * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or + * in CounterMutationVerbHandler on a replica othewise. The write must be executed on the MUTATION stage + * but on the latter case, the verb handler already run on the MUTATION stage, so we must not execute the + * underlying on the stage otherwise we risk a deadlock. Hence two different performer. + */ counterWritePerformer = new WritePerformer() { public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException { - applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level); + applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, false); + } + }; + + counterWriteOnCoordinatorPerformer = new WritePerformer() + { + public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException + { + applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, true); } }; } @@ -367,7 +382,7 @@ public class StorageProxy implements Sto if (endpoint.equals(FBUtilities.getLocalAddress())) { - applyCounterMutationOnLeader(cm); + applyCounterMutationOnCoordinator(cm); } else { @@ -423,7 +438,14 @@ public class StorageProxy implements Sto write(Collections.singletonList(cm), cm.consistency(), counterWritePerformer, false); } - private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level) + // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while + // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) + public static void applyCounterMutationOnCoordinator(CounterMutation cm) throws UnavailableException, TimeoutException, IOException + { + write(Collections.singletonList(cm), cm.consistency(), counterWriteOnCoordinatorPerformer, false); + } + + private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level, boolean executeOnMutationStage) { // we apply locally first, then send it to other replica if (logger.isDebugEnabled()) @@ -456,7 +478,10 @@ public class StorageProxy implements Sto } } }; - StageManager.getStage(Stage.MUTATION).execute(runnable); + if (executeOnMutationStage) + StageManager.getStage(Stage.MUTATION).execute(runnable); + else + runnable.run(); } /**