Emit metrics for CAS

Patch by Sankalp Kohli, reviewed by brandonwilliams for CASSANDRA-7341


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca16e5be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca16e5be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca16e5be

Branch: refs/heads/trunk
Commit: ca16e5be37cd6eff83a86e141ff6201fd11767ef
Parents: 432b731
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Thu Oct 16 11:43:13 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Thu Oct 16 11:43:13 2014 -0500

----------------------------------------------------------------------
 .../cassandra/metrics/ColumnFamilyMetrics.java  |  10 ++
 .../cassandra/metrics/KeyspaceMetrics.java      |  10 ++
 .../apache/cassandra/service/StorageProxy.java  | 149 ++++++++++++-------
 .../cassandra/service/paxos/PaxosState.java     |  95 +++++++-----
 4 files changed, 172 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index 75a21dc..8ab432e 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -109,6 +109,12 @@ public class ColumnFamilyMetrics
     public final Counter rowCacheHit;
     /** Number of row cache misses */
     public final Counter rowCacheMiss;
+    /** CAS Prepare metrics */
+    public final LatencyMetrics casPrepare;
+    /** CAS Propose metrics */
+    public final LatencyMetrics casPropose;
+    /** CAS Commit metrics */
+    public final LatencyMetrics casCommit;
 
     public final Timer coordinatorReadLatency;
     public final Timer coordinatorScanLatency;
@@ -505,6 +511,10 @@ public class ColumnFamilyMetrics
         rowCacheHitOutOfRange = 
createColumnFamilyCounter("RowCacheHitOutOfRange");
         rowCacheHit = createColumnFamilyCounter("RowCacheHit");
         rowCacheMiss = createColumnFamilyCounter("RowCacheMiss");
+
+        casPrepare = new LatencyMetrics(factory, "CasPrepare", 
cfs.keyspace.metric.casPrepare);
+        casPropose = new LatencyMetrics(factory, "CasPropose", 
cfs.keyspace.metric.casPropose);
+        casCommit = new LatencyMetrics(factory, "CasCommit", 
cfs.keyspace.metric.casCommit);
     }
 
     public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 92fabf1..6fa64e9 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -70,6 +70,12 @@ public class KeyspaceMetrics
     public final Histogram tombstoneScannedHistogram;
     /** Live cells scanned in queries on this Keyspace */
     public final Histogram liveScannedHistogram;
+    /** CAS Prepare metric */
+    public final LatencyMetrics casPrepare;
+    /** CAS Propose metrics */
+    public final LatencyMetrics casPropose;
+    /** CAS Commit metrics */
+    public final LatencyMetrics casCommit;
     
     public final MetricNameFactory factory;
     private Keyspace keyspace;
@@ -187,6 +193,10 @@ public class KeyspaceMetrics
         liveScannedHistogram = 
Metrics.newHistogram(factory.createMetricName("LiveScannedHistogram"), true);
         // add manually since histograms do not use createKeyspaceGauge method
         allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", 
"TombstoneScannedHistogram", "LiveScannedHistogram"));
+
+        casPrepare = new LatencyMetrics(factory, "CasPrepare");
+        casPropose = new LatencyMetrics(factory, "CasPropose");
+        casCommit = new LatencyMetrics(factory, "CasCommit");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d9602bb..f30862b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,7 +31,7 @@ import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
-import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.metrics.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,9 +58,6 @@ import 
org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.metrics.ClientRequestMetrics;
-import org.apache.cassandra.metrics.ReadRepairMetrics;
-import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.sink.SinkManager;
@@ -93,6 +90,8 @@ public class StorageProxy implements StorageProxyMBean
     private static final ClientRequestMetrics readMetrics = new 
ClientRequestMetrics("Read");
     private static final ClientRequestMetrics rangeMetrics = new 
ClientRequestMetrics("RangeSlice");
     private static final ClientRequestMetrics writeMetrics = new 
ClientRequestMetrics("Write");
+    private static final CASClientRequestMetrics casWriteMetrics = new 
CASClientRequestMetrics("CASWrite");
+    private static final CASClientRequestMetrics casReadMetrics = new 
CASClientRequestMetrics("CASRead");
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
@@ -205,63 +204,86 @@ public class StorageProxy implements StorageProxyMBean
                                    ConsistencyLevel consistencyForCommit)
     throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
-        consistencyForPaxos.validateForCas();
-        consistencyForCommit.validateForCasCommit(keyspaceName);
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
-
-        long start = System.nanoTime();
-        long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
-        while (System.nanoTime() - start < timeout)
+        final long start = System.nanoTime();
+        int contentions = 0;
+        try
         {
-            // for simplicity, we'll do a single liveness check at the start 
of each attempt
-            Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(keyspaceName, key, consistencyForPaxos);
-            List<InetAddress> liveEndpoints = p.left;
-            int requiredParticipants = p.right;
+            consistencyForPaxos.validateForCas();
+            consistencyForCommit.validateForCasCommit(keyspaceName);
 
-            UUID ballot = beginAndRepairPaxos(start, key, metadata, 
liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit);
+            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
 
-            // read the current values and check they validate the conditions
-            Tracing.trace("Reading existing values for CAS precondition");
-            long timestamp = System.currentTimeMillis();
-            ReadCommand readCommand = ReadCommand.create(keyspaceName, key, 
cfName, timestamp, request.readFilter());
-            List<Row> rows = read(Arrays.asList(readCommand), 
consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM);
-            ColumnFamily current = rows.get(0).cf;
-            if (!request.appliesTo(current))
+            long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
+            while (System.nanoTime() - start < timeout)
             {
-                Tracing.trace("CAS precondition does not match current values 
{}", current);
-                // We should not return null as this means success
-                return current == null ? 
ArrayBackedSortedColumns.factory.create(metadata) : current;
-            }
+                // for simplicity, we'll do a single liveness check at the 
start of each attempt
+                Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(keyspaceName, key, consistencyForPaxos);
+                List<InetAddress> liveEndpoints = p.left;
+                int requiredParticipants = p.right;
 
-            // finish the paxos round w/ the desired updates
-            // TODO turn null updates into delete?
-            ColumnFamily updates = request.makeUpdates(current);
-
-            // Apply triggers to cas updates. A consideration here is that
-            // triggers emit Mutations, and so a given trigger implementation
-            // may generate mutations for partitions other than the one this
-            // paxos round is scoped for. In this case, TriggerExecutor will
-            // validate that the generated mutations are targetted at the same
-            // partition as the initial updates and reject (via an
-            // InvalidRequestException) any which aren't.
-            updates = TriggerExecutor.instance.execute(key, updates);
-
-            Commit proposal = Commit.newProposal(key, ballot, updates);
-            Tracing.trace("CAS precondition is met; proposing client-requested 
updates for {}", ballot);
-            if (proposePaxos(proposal, liveEndpoints, requiredParticipants, 
true, consistencyForPaxos))
-            {
-                commitPaxos(proposal, consistencyForCommit);
-                Tracing.trace("CAS successful");
-                return null;
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, 
key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, 
consistencyForCommit, true);
+                final UUID ballot = pair.left;
+                contentions += pair.right;
+                // read the current values and check they validate the 
conditions
+                Tracing.trace("Reading existing values for CAS precondition");
+                long timestamp = System.currentTimeMillis();
+                ReadCommand readCommand = ReadCommand.create(keyspaceName, 
key, cfName, timestamp, request.readFilter());
+                List<Row> rows = read(Arrays.asList(readCommand), 
consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM);
+                ColumnFamily current = rows.get(0).cf;
+                if (!request.appliesTo(current))
+                {
+                    Tracing.trace("CAS precondition does not match current 
values {}", current);
+                    // We should not return null as this means success
+                    casWriteMetrics.conditionNotMet.inc();
+                    return current == null ? 
ArrayBackedSortedColumns.factory.create(metadata) : current;
+                }
+
+                // finish the paxos round w/ the desired updates
+                // TODO turn null updates into delete?
+                ColumnFamily updates = request.makeUpdates(current);
+
+                // Apply triggers to cas updates. A consideration here is that
+                // triggers emit Mutations, and so a given trigger 
implementation
+                // may generate mutations for partitions other than the one 
this
+                // paxos round is scoped for. In this case, TriggerExecutor 
will
+                // validate that the generated mutations are targetted at the 
same
+                // partition as the initial updates and reject (via an
+                // InvalidRequestException) any which aren't.
+                updates = TriggerExecutor.instance.execute(key, updates);
+
+                Commit proposal = Commit.newProposal(key, ballot, updates);
+                Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
+                if (proposePaxos(proposal, liveEndpoints, 
requiredParticipants, true, consistencyForPaxos))
+                {
+                    commitPaxos(proposal, consistencyForCommit);
+                    Tracing.trace("CAS successful");
+                    return null;
+                }
+
+                Tracing.trace("Paxos proposal not accepted (pre-empted by a 
higher ballot)");
+                contentions++;
+                
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
+                // continue to retry
             }
 
-            Tracing.trace("Paxos proposal not accepted (pre-empted by a higher 
ballot)");
-            
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
-            // continue to retry
+            throw new WriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
+        }
+        catch (WriteTimeoutException|ReadTimeoutException e)
+        {
+            casWriteMetrics.timeouts.mark();
+            throw e;
+        }
+        catch(UnavailableException e)
+        {
+            casWriteMetrics.unavailables.mark();
+            throw e;
+        }
+        finally
+        {
+            if(contentions > 0)
+                casWriteMetrics.contention.update(contentions);
+            casWriteMetrics.addNano(System.nanoTime() - start);
         }
-
-        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
     }
 
     private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
@@ -302,12 +324,13 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress 
requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static UUID beginAndRepairPaxos(long start, ByteBuffer key, 
CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, 
ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit)
+    private static Pair<UUID, Integer> beginAndRepairPaxos(long start, 
ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int 
requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel 
consistencyForCommit, final boolean isWrite)
     throws WriteTimeoutException
     {
         long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 
         PrepareCallback summary = null;
+        int contentions = 0;
         while (System.nanoTime() - start < timeout)
         {
             long ballotMillis = summary == null
@@ -322,6 +345,7 @@ public class StorageProxy implements StorageProxyMBean
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher 
ballot than ours; aborting");
+                contentions++;
                 // sleep a random amount to give the other proposer a chance 
to finish
                 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
                 continue;
@@ -335,6 +359,10 @@ public class StorageProxy implements StorageProxyMBean
             if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
             {
                 Tracing.trace("Finishing incomplete paxos round {}", 
inProgress);
+                if(isWrite)
+                    casWriteMetrics.unfinishedCommit.inc();
+                else
+                    casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = 
Commit.newProposal(inProgress.key, ballot, inProgress.update);
                 if (proposePaxos(refreshedInProgress, liveEndpoints, 
requiredParticipants, false, consistencyForPaxos))
                 {
@@ -344,6 +372,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     Tracing.trace("Some replicas have already promised a 
higher ballot than ours; aborting");
                     // sleep a random amount to give the other proposer a 
chance to finish
+                    contentions++;
                     
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
                 }
                 continue;
@@ -365,7 +394,7 @@ public class StorageProxy implements StorageProxyMBean
                 continue;
             }
 
-            return ballot;
+            return Pair.create(ballot, contentions);
         }
 
         throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
@@ -1139,7 +1168,9 @@ public class StorageProxy implements StorageProxyMBean
                 final ConsistencyLevel consistencyForCommitOrFetch = 
consistency_level == ConsistencyLevel.LOCAL_SERIAL ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
                 try
                 {
-                    beginAndRepairPaxos(start, command.key, metadata, 
liveEndpoints, requiredParticipants, consistency_level, 
consistencyForCommitOrFetch);
+                    final Pair<UUID, Integer> pair = 
beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, 
requiredParticipants, consistency_level, consistencyForCommitOrFetch, false);
+                    if(pair.right > 0)
+                        casReadMetrics.contention.update(pair.right);
                 }
                 catch (WriteTimeoutException e)
                 {
@@ -1157,18 +1188,24 @@ public class StorageProxy implements StorageProxyMBean
         {
             readMetrics.unavailables.mark();
             ClientRequestMetrics.readUnavailables.inc();
+            if(consistency_level.isSerialConsistency())
+                casReadMetrics.unavailables.mark();
             throw e;
         }
         catch (ReadTimeoutException e)
         {
             readMetrics.timeouts.mark();
             ClientRequestMetrics.readTimeouts.inc();
+            if(consistency_level.isSerialConsistency())
+                casReadMetrics.timeouts.mark();
             throw e;
         }
         finally
         {
             long latency = System.nanoTime() - start;
             readMetrics.addNano(latency);
+            if(consistency_level.isSerialConsistency())
+                casReadMetrics.addNano(latency);
             // TODO avoid giving every command the same latency number.  Can 
fix this in CASSADRA-5329
             for (ReadCommand command : commands)
                 
Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency,
 TimeUnit.NANOSECONDS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index df7365d..abd173c 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -27,9 +27,7 @@ import com.google.common.util.concurrent.Striped;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.tracing.Tracing;
 
 public class PaxosState
@@ -57,67 +55,92 @@ public class PaxosState
 
     public static PrepareResponse prepare(Commit toPrepare)
     {
-        Lock lock = LOCKS.get(toPrepare.key);
-        lock.lock();
+        long start = System.nanoTime();
         try
         {
-            PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, 
toPrepare.update.metadata());
-            if (toPrepare.isAfter(state.promised))
+            Lock lock = LOCKS.get(toPrepare.key);
+            lock.lock();
+            try
             {
-                Tracing.trace("Promising ballot {}", toPrepare.ballot);
-                SystemKeyspace.savePaxosPromise(toPrepare);
-                return new PrepareResponse(true, state.accepted, 
state.mostRecentCommit);
+                PaxosState state = 
SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+                if (toPrepare.isAfter(state.promised))
+                {
+                    Tracing.trace("Promising ballot {}", toPrepare.ballot);
+                    SystemKeyspace.savePaxosPromise(toPrepare);
+                    return new PrepareResponse(true, state.accepted, 
state.mostRecentCommit);
+                }
+                else
+                {
+                    Tracing.trace("Promise rejected; {} is not sufficiently 
newer than {}", toPrepare, state.promised);
+                    // return the currently promised ballot (not the last 
accepted one) so the coordinator can make sure it uses newer ballot next time 
(#5667)
+                    return new PrepareResponse(false, state.promised, 
state.mostRecentCommit);
+                }
             }
-            else
+            finally
             {
-                Tracing.trace("Promise rejected; {} is not sufficiently newer 
than {}", toPrepare, state.promised);
-                // return the currently promised ballot (not the last accepted 
one) so the coordinator can make sure it uses newer ballot next time (#5667)
-                return new PrepareResponse(false, state.promised, 
state.mostRecentCommit);
+                lock.unlock();
             }
         }
         finally
         {
-            lock.unlock();
+            
Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepare.addNano(System.nanoTime()
 - start);
         }
+
     }
 
     public static Boolean propose(Commit proposal)
     {
-        Lock lock = LOCKS.get(proposal.key);
-        lock.lock();
+        long start = System.nanoTime();
         try
         {
-            PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, 
proposal.update.metadata());
-            if (proposal.hasBallot(state.promised.ballot) || 
proposal.isAfter(state.promised))
+            Lock lock = LOCKS.get(proposal.key);
+            lock.lock();
+            try
             {
-                Tracing.trace("Accepting proposal {}", proposal);
-                SystemKeyspace.savePaxosProposal(proposal);
-                return true;
+                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, 
proposal.update.metadata());
+                if (proposal.hasBallot(state.promised.ballot) || 
proposal.isAfter(state.promised))
+                {
+                    Tracing.trace("Accepting proposal {}", proposal);
+                    SystemKeyspace.savePaxosProposal(proposal);
+                    return true;
+                }
+                else
+                {
+                    Tracing.trace("Rejecting proposal for {} because 
inProgress is now {}", proposal, state.promised);
+                    return false;
+                }
             }
-            else
+            finally
             {
-                Tracing.trace("Rejecting proposal for {} because inProgress is 
now {}", proposal, state.promised);
-                return false;
+                lock.unlock();
             }
         }
         finally
         {
-            lock.unlock();
+            
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casPropose.addNano(System.nanoTime()
 - start);
         }
     }
 
     public static void commit(Commit proposal)
     {
-        // There is no guarantee we will see commits in the right order, 
because messages
-        // can get delayed, so a proposal can be older than our current most 
recent ballot/commit.
-        // Committing it is however always safe due to column timestamps, so 
always do it. However,
-        // if our current in-progress ballot is strictly greater than the 
proposal one, we shouldn't
-        // erase the in-progress update.
-        Tracing.trace("Committing proposal {}", proposal);
-        Mutation mutation = proposal.makeMutation();
-        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
+        long start = System.nanoTime();
+        try
+        {
+            // There is no guarantee we will see commits in the right order, 
because messages
+            // can get delayed, so a proposal can be older than our current 
most recent ballot/commit.
+            // Committing it is however always safe due to column timestamps, 
so always do it. However,
+            // if our current in-progress ballot is strictly greater than the 
proposal one, we shouldn't
+            // erase the in-progress update.
+            Tracing.trace("Committing proposal {}", proposal);
+            Mutation mutation = proposal.makeMutation();
+            Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
 
-        // We don't need to lock, we're just blindly updating
-        SystemKeyspace.savePaxosCommit(proposal);
+            // We don't need to lock, we're just blindly updating
+            SystemKeyspace.savePaxosCommit(proposal);
+        }
+        finally
+        {
+            
Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime()
 - start);
+        }
     }
 }

Reply via email to