Emit metrics for CAS. Patch by Sankalp Kohli, reviewed by brandonwilliams for CASSANDRA-7341
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/96f433d5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/96f433d5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/96f433d5 Branch: refs/heads/trunk Commit: 96f433d5a2c36c9786525e293b0c6610778fd5b5 Parents: 98bcf40 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Oct 16 11:41:43 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Oct 16 11:41:43 2014 -0500 ---------------------------------------------------------------------- .../cassandra/metrics/ColumnFamilyMetrics.java | 9 ++ .../cassandra/metrics/KeyspaceMetrics.java | 10 ++ .../apache/cassandra/service/StorageProxy.java | 145 ++++++++++++------- .../cassandra/service/paxos/PaxosState.java | 90 +++++++----- 4 files changed, 169 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/96f433d5/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index 3dc269f..a3838a0 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -93,6 +93,12 @@ public class ColumnFamilyMetrics public final ColumnFamilyHistogram tombstoneScannedHistogram; /** Live cells scanned in queries on this CF */ public final ColumnFamilyHistogram liveScannedHistogram; + /** CAS Prepare metrics */ + public final LatencyMetrics casPrepare; + /** CAS Propose metrics */ + public final LatencyMetrics casPropose; + /** CAS Commit metrics */ + public final LatencyMetrics casCommit; public final Timer coordinatorReadLatency; public final Timer 
coordinatorScanLatency; @@ -444,6 +450,9 @@ public class ColumnFamilyMetrics liveScannedHistogram = createColumnFamilyHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram); coordinatorReadLatency = Metrics.newTimer(factory.createMetricName("CoordinatorReadLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS); coordinatorScanLatency = Metrics.newTimer(factory.createMetricName("CoordinatorScanLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS); + casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare); + casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); + casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit); } public void updateSSTableIterated(int count) http://git-wip-us.apache.org/repos/asf/cassandra/blob/96f433d5/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 7dcf9d5..7a768b8 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -62,6 +62,12 @@ public class KeyspaceMetrics public final Histogram tombstoneScannedHistogram; /** Live cells scanned in queries on this Keyspace */ public final Histogram liveScannedHistogram; + /** CAS Prepare metric */ + public final LatencyMetrics casPrepare; + /** CAS Propose metrics */ + public final LatencyMetrics casPropose; + /** CAS Commit metrics */ + public final LatencyMetrics casCommit; private final MetricNameFactory factory; private Keyspace keyspace; @@ -151,6 +157,10 @@ public class KeyspaceMetrics liveScannedHistogram = Metrics.newHistogram(factory.createMetricName("LiveScannedHistogram"), true); // add manually since histograms do not use createKeyspaceGauge method 
allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", "TombstoneScannedHistogram", "LiveScannedHistogram")); + + casPrepare = new LatencyMetrics(factory, "CasPrepare"); + casPropose = new LatencyMetrics(factory, "CasPropose"); + casCommit = new LatencyMetrics(factory, "CasCommit"); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/96f433d5/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 904d602..d8b6619 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -31,6 +31,7 @@ import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.metrics.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +56,6 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.metrics.ClientRequestMetrics; -import org.apache.cassandra.metrics.ReadRepairMetrics; -import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; import org.apache.cassandra.sink.SinkManager; @@ -90,6 +88,8 @@ public class StorageProxy implements StorageProxyMBean private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read"); private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice"); private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write"); + 
private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite"); + private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); private StorageProxy() {} @@ -203,62 +203,88 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForCommit) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { - consistencyForPaxos.validateForCas(); - consistencyForCommit.validateForCasCommit(keyspaceName); - - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - long start = System.nanoTime(); - long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - while (System.nanoTime() - start < timeout) + int contentions = 0; + try { - // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; + consistencyForPaxos.validateForCas(); + consistencyForCommit.validateForCasCommit(keyspaceName); - UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit); + CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - // read the current values and check they validate the conditions - Tracing.trace("Reading existing values for CAS precondition"); - long timestamp = System.currentTimeMillis(); - ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter()); - List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); - ColumnFamily current = rows.get(0).cf; - if (!conditions.appliesTo(current)) + long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); + while (System.nanoTime() - start < timeout) { - Tracing.trace("CAS precondition {} does not match current values {}", conditions, current); - // We should not return null as this means success - return current == null ? EmptyColumns.factory.create(metadata) : current; - } + // for simplicity, we'll do a single liveness check at the start of each attempt + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; - // finish the paxos round w/ the desired updates - // TODO turn null updates into delete? - - // Apply triggers to cas updates. A consideration here is that - // triggers emit RowMutations, and so a given trigger implementation - // may generate mutations for partitions other than the one this - // paxos round is scoped for. In this case, TriggerExecutor will - // validate that the generated mutations are targetted at the same - // partition as the initial updates and reject (via an - // InvalidRequestException) any which aren't. 
- updates = TriggerExecutor.instance.execute(key, updates); - - Commit proposal = Commit.newProposal(key, ballot, updates); - Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) - { - commitPaxos(proposal, consistencyForCommit); - Tracing.trace("CAS successful"); - return null; + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true); + final UUID ballot = pair.left; + contentions += pair.right; + + // read the current values and check they validate the conditions + Tracing.trace("Reading existing values for CAS precondition"); + long timestamp = System.currentTimeMillis(); + ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter()); + List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); + ColumnFamily current = rows.get(0).cf; + if (!conditions.appliesTo(current)) + { + Tracing.trace("CAS precondition {} does not match current values {}", conditions, current); + // We should not return null as this means success + casWriteMetrics.conditionNotMet.inc(); + return current == null ? EmptyColumns.factory.create(metadata) : current; + } + + // finish the paxos round w/ the desired updates + // TODO turn null updates into delete? + + // Apply triggers to cas updates. A consideration here is that + // triggers emit RowMutations, and so a given trigger implementation + // may generate mutations for partitions other than the one this + // paxos round is scoped for. In this case, TriggerExecutor will + // validate that the generated mutations are targetted at the same + // partition as the initial updates and reject (via an + // InvalidRequestException) any which aren't. 
+ updates = TriggerExecutor.instance.execute(key, updates); + + Commit proposal = Commit.newProposal(key, ballot, updates); + Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); + if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) + { + commitPaxos(proposal, consistencyForCommit); + Tracing.trace("CAS successful"); + return null; + } + + Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); + contentions++; + Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100), TimeUnit.MILLISECONDS); + // continue to retry } - Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); - Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100), TimeUnit.MILLISECONDS); - // continue to retry + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); + } + catch (WriteTimeoutException|ReadTimeoutException e) + { + casWriteMetrics.timeouts.mark(); + throw e; + } + catch(UnavailableException e) + { + casWriteMetrics.unavailables.mark(); + throw e; + } + finally + { + if(contentions > 0) + casWriteMetrics.contention.update(contentions); + + casWriteMetrics.addNano(System.nanoTime() - start); } - throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); } private static Predicate<InetAddress> sameDCPredicateFor(final String dc) @@ -299,12 +325,13 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. 
*/ - private static UUID beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit) + private static Pair<UUID, Integer> beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, final boolean isWrite) throws WriteTimeoutException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); PrepareCallback summary = null; + int contentions = 0; while (System.nanoTime() - start < timeout) { long ballotMillis = summary == null @@ -319,6 +346,7 @@ public class StorageProxy implements StorageProxyMBean if (!summary.promised) { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); + contentions++; // sleep a random amount to give the other proposer a chance to finish Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100), TimeUnit.MILLISECONDS); continue; @@ -332,6 +360,10 @@ public class StorageProxy implements StorageProxyMBean if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) { Tracing.trace("Finishing incomplete paxos round {}", inProgress); + if(isWrite) + casWriteMetrics.unfinishedCommit.inc(); + else + casReadMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(inProgress.key, ballot, inProgress.update); if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos)) { @@ -341,6 +373,7 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting"); // sleep a random amount to give the other proposer a chance to finish + contentions++; 
Uninterruptibles.sleepUninterruptibly(FBUtilities.threadLocalRandom().nextInt(100), TimeUnit.MILLISECONDS); } continue; @@ -362,7 +395,7 @@ public class StorageProxy implements StorageProxyMBean continue; } - return ballot; + return Pair.create(ballot, contentions); } throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName))); @@ -1151,7 +1184,9 @@ public class StorageProxy implements StorageProxyMBean final ConsistencyLevel consistencyForCommitOrFetch = consistency_level == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; try { - beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level, consistencyForCommitOrFetch); + final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level, consistencyForCommitOrFetch, false); + if(pair.right > 0) + casReadMetrics.contention.update(pair.right); } catch (WriteTimeoutException e) { @@ -1169,18 +1204,24 @@ public class StorageProxy implements StorageProxyMBean { readMetrics.unavailables.mark(); ClientRequestMetrics.readUnavailables.inc(); + if(consistency_level.isSerialConsistency()) + casReadMetrics.unavailables.mark(); throw e; } catch (ReadTimeoutException e) { readMetrics.timeouts.mark(); ClientRequestMetrics.readTimeouts.inc(); + if(consistency_level.isSerialConsistency()) + casReadMetrics.timeouts.mark(); throw e; } finally { long latency = System.nanoTime() - start; readMetrics.addNano(latency); + if(consistency_level.isSerialConsistency()) + casReadMetrics.addNano(latency); // TODO avoid giving every command the same latency number. 
Can fix this in CASSADRA-5329 for (ReadCommand command : commands) Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/96f433d5/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index ff0b02c..0196122 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -69,55 +69,79 @@ public class PaxosState public static PrepareResponse prepare(Commit toPrepare) { - synchronized (lockFor(toPrepare.key)) + long start = System.nanoTime(); + try { - PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); - if (toPrepare.isAfter(state.promised)) + synchronized (lockFor(toPrepare.key)) { - Tracing.trace("Promising ballot {}", toPrepare.ballot); - SystemKeyspace.savePaxosPromise(toPrepare); - return new PrepareResponse(true, state.accepted, state.mostRecentCommit); - } - else - { - Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised); - // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667) - return new PrepareResponse(false, state.promised, state.mostRecentCommit); + PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); + if (toPrepare.isAfter(state.promised)) + { + Tracing.trace("Promising ballot {}", toPrepare.ballot); + SystemKeyspace.savePaxosPromise(toPrepare); + return new PrepareResponse(true, state.accepted, state.mostRecentCommit); + } + else + { + Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, 
state.promised); + // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667) + return new PrepareResponse(false, state.promised, state.mostRecentCommit); + } } } + finally + { + Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepare.addNano(System.nanoTime() - start); + } } public static Boolean propose(Commit proposal) { - synchronized (lockFor(proposal.key)) + long start = System.nanoTime(); + try { - PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); - if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) - { - Tracing.trace("Accepting proposal {}", proposal); - SystemKeyspace.savePaxosProposal(proposal); - return true; - } - else + synchronized (lockFor(proposal.key)) { - Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised); - return false; + PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); + if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) + { + Tracing.trace("Accepting proposal {}", proposal); + SystemKeyspace.savePaxosProposal(proposal); + return true; + } + else + { + Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised); + return false; + } } } + finally + { + Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casPropose.addNano(System.nanoTime() - start); + } } public static void commit(Commit proposal) { - // There is no guarantee we will see commits in the right order, because messages - // can get delayed, so a proposal can be older than our current most recent ballot/commit. - // Committing it is however always safe due to column timestamps, so always do it. 
However, - // if our current in-progress ballot is strictly greater than the proposal one, we shouldn't - // erase the in-progress update. - Tracing.trace("Committing proposal {}", proposal); - RowMutation rm = proposal.makeMutation(); - Keyspace.open(rm.getKeyspaceName()).apply(rm, true); + long start = System.nanoTime(); + try + { + // There is no guarantee we will see commits in the right order, because messages + // can get delayed, so a proposal can be older than our current most recent ballot/commit. + // Committing it is however always safe due to column timestamps, so always do it. However, + // if our current in-progress ballot is strictly greater than the proposal one, we shouldn't + // erase the in-progress update. + Tracing.trace("Committing proposal {}", proposal); + RowMutation rm = proposal.makeMutation(); + Keyspace.open(rm.getKeyspaceName()).apply(rm, true); - // We don't need to lock, we're just blindly updating - SystemKeyspace.savePaxosCommit(proposal); + // We don't need to lock, we're just blindly updating + SystemKeyspace.savePaxosCommit(proposal); + } + finally + { + Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime() - start); + } } }