Emit metrics for CAS. Patch by Sankalp Kohli, reviewed by brandonwilliams for CASSANDRA-7341
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca16e5be Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca16e5be Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca16e5be Branch: refs/heads/trunk Commit: ca16e5be37cd6eff83a86e141ff6201fd11767ef Parents: 432b731 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Oct 16 11:43:13 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Oct 16 11:43:13 2014 -0500 ---------------------------------------------------------------------- .../cassandra/metrics/ColumnFamilyMetrics.java | 10 ++ .../cassandra/metrics/KeyspaceMetrics.java | 10 ++ .../apache/cassandra/service/StorageProxy.java | 149 ++++++++++++------- .../cassandra/service/paxos/PaxosState.java | 95 +++++++----- 4 files changed, 172 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index 75a21dc..8ab432e 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -109,6 +109,12 @@ public class ColumnFamilyMetrics public final Counter rowCacheHit; /** Number of row cache misses */ public final Counter rowCacheMiss; + /** CAS Prepare metrics */ + public final LatencyMetrics casPrepare; + /** CAS Propose metrics */ + public final LatencyMetrics casPropose; + /** CAS Commit metrics */ + public final LatencyMetrics casCommit; public final Timer coordinatorReadLatency; public final Timer coordinatorScanLatency; @@ -505,6 +511,10 @@ public class 
ColumnFamilyMetrics rowCacheHitOutOfRange = createColumnFamilyCounter("RowCacheHitOutOfRange"); rowCacheHit = createColumnFamilyCounter("RowCacheHit"); rowCacheMiss = createColumnFamilyCounter("RowCacheMiss"); + + casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare); + casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); + casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit); } public void updateSSTableIterated(int count) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 92fabf1..6fa64e9 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -70,6 +70,12 @@ public class KeyspaceMetrics public final Histogram tombstoneScannedHistogram; /** Live cells scanned in queries on this Keyspace */ public final Histogram liveScannedHistogram; + /** CAS Prepare metric */ + public final LatencyMetrics casPrepare; + /** CAS Propose metrics */ + public final LatencyMetrics casPropose; + /** CAS Commit metrics */ + public final LatencyMetrics casCommit; public final MetricNameFactory factory; private Keyspace keyspace; @@ -187,6 +193,10 @@ public class KeyspaceMetrics liveScannedHistogram = Metrics.newHistogram(factory.createMetricName("LiveScannedHistogram"), true); // add manually since histograms do not use createKeyspaceGauge method allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", "TombstoneScannedHistogram", "LiveScannedHistogram")); + + casPrepare = new LatencyMetrics(factory, "CasPrepare"); + casPropose = new LatencyMetrics(factory, "CasPropose"); + casCommit = new LatencyMetrics(factory, 
"CasCommit"); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index d9602bb..f30862b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -31,7 +31,7 @@ import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.util.concurrent.Uninterruptibles; -import net.nicoulaj.compilecommand.annotations.Inline; +import org.apache.cassandra.metrics.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,9 +58,6 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.metrics.ClientRequestMetrics; -import org.apache.cassandra.metrics.ReadRepairMetrics; -import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; import org.apache.cassandra.sink.SinkManager; @@ -93,6 +90,8 @@ public class StorageProxy implements StorageProxyMBean private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read"); private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice"); private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write"); + private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite"); + private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); private static 
final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10; @@ -205,63 +204,86 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForCommit) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { - consistencyForPaxos.validateForCas(); - consistencyForCommit.validateForCasCommit(keyspaceName); - - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - - long start = System.nanoTime(); - long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - while (System.nanoTime() - start < timeout) + final long start = System.nanoTime(); + int contentions = 0; + try { - // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; + consistencyForPaxos.validateForCas(); + consistencyForCommit.validateForCasCommit(keyspaceName); - UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit); + CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - // read the current values and check they validate the conditions - Tracing.trace("Reading existing values for CAS precondition"); - long timestamp = System.currentTimeMillis(); - ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, request.readFilter()); - List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); - ColumnFamily current = rows.get(0).cf; - if (!request.appliesTo(current)) + long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); + while (System.nanoTime() - start < timeout) { - Tracing.trace("CAS precondition does not match current values {}", current); - // We should not return null as this means success - return current == null ? ArrayBackedSortedColumns.factory.create(metadata) : current; - } + // for simplicity, we'll do a single liveness check at the start of each attempt + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; - // finish the paxos round w/ the desired updates - // TODO turn null updates into delete? - ColumnFamily updates = request.makeUpdates(current); - - // Apply triggers to cas updates. A consideration here is that - // triggers emit Mutations, and so a given trigger implementation - // may generate mutations for partitions other than the one this - // paxos round is scoped for. In this case, TriggerExecutor will - // validate that the generated mutations are targetted at the same - // partition as the initial updates and reject (via an - // InvalidRequestException) any which aren't. 
- updates = TriggerExecutor.instance.execute(key, updates); - - Commit proposal = Commit.newProposal(key, ballot, updates); - Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) - { - commitPaxos(proposal, consistencyForCommit); - Tracing.trace("CAS successful"); - return null; + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true); + final UUID ballot = pair.left; + contentions += pair.right; + // read the current values and check they validate the conditions + Tracing.trace("Reading existing values for CAS precondition"); + long timestamp = System.currentTimeMillis(); + ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, request.readFilter()); + List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); + ColumnFamily current = rows.get(0).cf; + if (!request.appliesTo(current)) + { + Tracing.trace("CAS precondition does not match current values {}", current); + // We should not return null as this means success + casWriteMetrics.conditionNotMet.inc(); + return current == null ? ArrayBackedSortedColumns.factory.create(metadata) : current; + } + + // finish the paxos round w/ the desired updates + // TODO turn null updates into delete? + ColumnFamily updates = request.makeUpdates(current); + + // Apply triggers to cas updates. A consideration here is that + // triggers emit Mutations, and so a given trigger implementation + // may generate mutations for partitions other than the one this + // paxos round is scoped for. 
In this case, TriggerExecutor will + // validate that the generated mutations are targetted at the same + // partition as the initial updates and reject (via an + // InvalidRequestException) any which aren't. + updates = TriggerExecutor.instance.execute(key, updates); + + Commit proposal = Commit.newProposal(key, ballot, updates); + Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); + if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) + { + commitPaxos(proposal, consistencyForCommit); + Tracing.trace("CAS successful"); + return null; + } + + Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); + contentions++; + Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); + // continue to retry } - Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); - Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); - // continue to retry + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); + } + catch (WriteTimeoutException|ReadTimeoutException e) + { + casWriteMetrics.timeouts.mark(); + throw e; + } + catch(UnavailableException e) + { + casWriteMetrics.unavailables.mark(); + throw e; + } + finally + { + if(contentions > 0) + casWriteMetrics.contention.update(contentions); + casWriteMetrics.addNano(System.nanoTime() - start); } - - throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); } private static Predicate<InetAddress> sameDCPredicateFor(final String dc) @@ -302,12 +324,13 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. 
Otherwise, return null. */ - private static UUID beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit) + private static Pair<UUID, Integer> beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, final boolean isWrite) throws WriteTimeoutException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); PrepareCallback summary = null; + int contentions = 0; while (System.nanoTime() - start < timeout) { long ballotMillis = summary == null @@ -322,6 +345,7 @@ public class StorageProxy implements StorageProxyMBean if (!summary.promised) { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); + contentions++; // sleep a random amount to give the other proposer a chance to finish Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); continue; @@ -335,6 +359,10 @@ public class StorageProxy implements StorageProxyMBean if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) { Tracing.trace("Finishing incomplete paxos round {}", inProgress); + if(isWrite) + casWriteMetrics.unfinishedCommit.inc(); + else + casReadMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(inProgress.key, ballot, inProgress.update); if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos)) { @@ -344,6 +372,7 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); // sleep a random amount to give the other proposer a chance to finish + contentions++; 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); } continue; @@ -365,7 +394,7 @@ public class StorageProxy implements StorageProxyMBean continue; } - return ballot; + return Pair.create(ballot, contentions); } throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName))); @@ -1139,7 +1168,9 @@ public class StorageProxy implements StorageProxyMBean final ConsistencyLevel consistencyForCommitOrFetch = consistency_level == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; try { - beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level, consistencyForCommitOrFetch); + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level, consistencyForCommitOrFetch, false); + if(pair.right > 0) + casReadMetrics.contention.update(pair.right); } catch (WriteTimeoutException e) { @@ -1157,18 +1188,24 @@ public class StorageProxy implements StorageProxyMBean { readMetrics.unavailables.mark(); ClientRequestMetrics.readUnavailables.inc(); + if(consistency_level.isSerialConsistency()) + casReadMetrics.unavailables.mark(); throw e; } catch (ReadTimeoutException e) { readMetrics.timeouts.mark(); ClientRequestMetrics.readTimeouts.inc(); + if(consistency_level.isSerialConsistency()) + casReadMetrics.timeouts.mark(); throw e; } finally { long latency = System.nanoTime() - start; readMetrics.addNano(latency); + if(consistency_level.isSerialConsistency()) + casReadMetrics.addNano(latency); // TODO avoid giving every command the same latency number. 
Can fix this in CASSADRA-5329 for (ReadCommand command : commands) Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca16e5be/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index df7365d..abd173c 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -27,9 +27,7 @@ import com.google.common.util.concurrent.Striped; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.*; import org.apache.cassandra.tracing.Tracing; public class PaxosState @@ -57,67 +55,92 @@ public class PaxosState public static PrepareResponse prepare(Commit toPrepare) { - Lock lock = LOCKS.get(toPrepare.key); - lock.lock(); + long start = System.nanoTime(); try { - PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); - if (toPrepare.isAfter(state.promised)) + Lock lock = LOCKS.get(toPrepare.key); + lock.lock(); + try { - Tracing.trace("Promising ballot {}", toPrepare.ballot); - SystemKeyspace.savePaxosPromise(toPrepare); - return new PrepareResponse(true, state.accepted, state.mostRecentCommit); + PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); + if (toPrepare.isAfter(state.promised)) + { + Tracing.trace("Promising ballot {}", toPrepare.ballot); + SystemKeyspace.savePaxosPromise(toPrepare); + return new PrepareResponse(true, state.accepted, state.mostRecentCommit); + 
} + else + { + Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised); + // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667) + return new PrepareResponse(false, state.promised, state.mostRecentCommit); + } } - else + finally { - Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised); - // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667) - return new PrepareResponse(false, state.promised, state.mostRecentCommit); + lock.unlock(); } } finally { - lock.unlock(); + Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepare.addNano(System.nanoTime() - start); } + } public static Boolean propose(Commit proposal) { - Lock lock = LOCKS.get(proposal.key); - lock.lock(); + long start = System.nanoTime(); try { - PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); - if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) + Lock lock = LOCKS.get(proposal.key); + lock.lock(); + try { - Tracing.trace("Accepting proposal {}", proposal); - SystemKeyspace.savePaxosProposal(proposal); - return true; + PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); + if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) + { + Tracing.trace("Accepting proposal {}", proposal); + SystemKeyspace.savePaxosProposal(proposal); + return true; + } + else + { + Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised); + return false; + } } - else + finally { - Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised); - return false; + lock.unlock(); } } finally { - 
lock.unlock(); + Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casPropose.addNano(System.nanoTime() - start); } } public static void commit(Commit proposal) { - // There is no guarantee we will see commits in the right order, because messages - // can get delayed, so a proposal can be older than our current most recent ballot/commit. - // Committing it is however always safe due to column timestamps, so always do it. However, - // if our current in-progress ballot is strictly greater than the proposal one, we shouldn't - // erase the in-progress update. - Tracing.trace("Committing proposal {}", proposal); - Mutation mutation = proposal.makeMutation(); - Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true); + long start = System.nanoTime(); + try + { + // There is no guarantee we will see commits in the right order, because messages + // can get delayed, so a proposal can be older than our current most recent ballot/commit. + // Committing it is however always safe due to column timestamps, so always do it. However, + // if our current in-progress ballot is strictly greater than the proposal one, we shouldn't + // erase the in-progress update. + Tracing.trace("Committing proposal {}", proposal); + Mutation mutation = proposal.makeMutation(); + Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true); - // We don't need to lock, we're just blindly updating - SystemKeyspace.savePaxosCommit(proposal); + // We don't need to lock, we're just blindly updating + SystemKeyspace.savePaxosCommit(proposal); + } + finally + { + Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime() - start); + } } }