Author: jbellis Date: Wed Dec 22 18:47:09 2010 New Revision: 1052027 URL: http://svn.apache.org/viewvc?rev=1052027&view=rev Log: count timeouts in storageproxy latencies, and include latency histograms in StorageProxyMBean. Patch by Stu Hood; reviewed by jbellis for CASSANDRA-1893
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1052027&r1=1052026&r2=1052027&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Dec 22 18:47:09 2010 @@ -1,5 +1,7 @@ dev * fix cli crash after backgrounding (CASSANDRA-1875) + * count timeouts in storageproxy latencies, and include latency + histograms in StorageProxyMBean (CASSANDRA-1893) 0.7.0-rc3 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1052027&r1=1052026&r2=1052027&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec 22 18:47:09 2010 @@ -176,7 +176,6 @@ public class StorageProxy implements Sto { writeStats.addNano(System.nanoTime() - startTime); } - } private static void addHintHeader(Message message, InetAddress target) throws IOException @@ -217,19 +216,23 @@ public class StorageProxy implements Sto if (StorageService.instance.isBootstrapMode()) throw new UnavailableException(); long startTime = System.nanoTime(); - List<Row> rows; - if (consistency_level == ConsistencyLevel.ONE) + try { - rows = weakRead(commands); + if (consistency_level == ConsistencyLevel.ONE) + { + rows = weakRead(commands); + } + else + { + assert 
consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue(); + rows = strongRead(commands, consistency_level); + } } - else + finally { - assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue(); - rows = strongRead(commands, consistency_level); + readStats.addNano(System.nanoTime() - startTime); } - - readStats.addNano(System.nanoTime() - startTime); return rows; } @@ -415,77 +418,82 @@ public class StorageProxy implements Sto if (logger.isDebugEnabled()) logger.debug(command.toString()); long startTime = System.nanoTime(); - - List<AbstractBounds> ranges = getRestrictedRanges(command.range); + List<Row> rows; // now scan until we have enough results - List<Row> rows = new ArrayList<Row>(command.max_keys); - for (AbstractBounds range : ranges) + try { - List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right); - - if (consistency_level == ConsistencyLevel.ONE && liveEndpoints.contains(FBUtilities.getLocalAddress())) + rows = new ArrayList<Row>(command.max_keys); + List<AbstractBounds> ranges = getRestrictedRanges(command.range); + for (AbstractBounds range : ranges) { - if (logger.isDebugEnabled()) - logger.debug("local range slice"); - ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family); - try - { - rows.addAll(cfs.getRangeSlice(command.super_column, - range, - command.max_keys, - QueryFilter.getFilter(command.predicate, cfs.getComparator()))); - } - catch (ExecutionException e) - { - throw new RuntimeException(e.getCause()); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - } - else - { - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints); - RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys); - Message message = c2.getMessage(); - - // collect replies and resolve 
according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints); - AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy(); - QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); - // TODO bail early if live endpoints can't satisfy requested consistency level - for (InetAddress endpoint : liveEndpoints) + List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right); + + if (consistency_level == ConsistencyLevel.ONE && liveEndpoints.contains(FBUtilities.getLocalAddress())) { - MessagingService.instance.sendRR(message, endpoint, handler); if (logger.isDebugEnabled()) - logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint); + logger.debug("local range slice"); + ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family); + try + { + rows.addAll(cfs.getRangeSlice(command.super_column, + range, + command.max_keys, + QueryFilter.getFilter(command.predicate, cfs.getComparator()))); + } + catch (ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } } - // TODO read repair on remaining replicas? 
- - // if we're done, great, otherwise, move to the next range - try + else { - if (logger.isDebugEnabled()) + DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints); + RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys); + Message message = c2.getMessage(); + + // collect replies and resolve according to consistency level + RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints); + AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy(); + QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + // TODO bail early if live endpoints can't satisfy requested consistency level + for (InetAddress endpoint : liveEndpoints) { - for (Row row : handler.get()) + MessagingService.instance.sendRR(message, endpoint, handler); + if (logger.isDebugEnabled()) + logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint); + } + // TODO read repair on remaining replicas? + + // if we're done, great, otherwise, move to the next range + try + { + if (logger.isDebugEnabled()) { - logger.debug("range slices read " + row.key); + for (Row row : handler.get()) + { + logger.debug("range slices read " + row.key); + } } + rows.addAll(handler.get()); + } + catch (DigestMismatchException e) + { + throw new AssertionError(e); // no digests in range slices yet } - rows.addAll(handler.get()); - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // no digests in range slices yet } + + if (rows.size() >= command.max_keys) + break; } - - if (rows.size() >= command.max_keys) - break; } - - rangeStats.addNano(System.nanoTime() - startTime); + finally + { + rangeStats.addNano(System.nanoTime() - startTime); + } return rows.size() > command.max_keys ? 
rows.subList(0, command.max_keys) : rows; } @@ -620,6 +628,16 @@ public class StorageProxy implements Sto return readStats.getRecentLatencyMicros(); } + public long[] getTotalReadLatencyHistogramMicros() + { + return readStats.getTotalLatencyHistogramMicros(); + } + + public long[] getRecentReadLatencyHistogramMicros() + { + return readStats.getRecentLatencyHistogramMicros(); + } + public long getRangeOperations() { return rangeStats.getOpCount(); @@ -635,6 +653,16 @@ public class StorageProxy implements Sto return rangeStats.getRecentLatencyMicros(); } + public long[] getTotalRangeLatencyHistogramMicros() + { + return rangeStats.getTotalLatencyHistogramMicros(); + } + + public long[] getRecentRangeLatencyHistogramMicros() + { + return rangeStats.getRecentLatencyHistogramMicros(); + } + public long getWriteOperations() { return writeStats.getOpCount(); @@ -650,6 +678,16 @@ public class StorageProxy implements Sto return writeStats.getRecentLatencyMicros(); } + public long[] getTotalWriteLatencyHistogramMicros() + { + return writeStats.getTotalLatencyHistogramMicros(); + } + + public long[] getRecentWriteLatencyHistogramMicros() + { + return writeStats.getRecentLatencyHistogramMicros(); + } + public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws IOException, TimeoutException, UnavailableException { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1052027&r1=1052026&r2=1052027&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
Wed Dec 22 18:47:09 2010 @@ -23,14 +23,20 @@ public interface StorageProxyMBean public long getReadOperations(); public long getTotalReadLatencyMicros(); public double getRecentReadLatencyMicros(); + public long[] getTotalReadLatencyHistogramMicros(); + public long[] getRecentReadLatencyHistogramMicros(); public long getRangeOperations(); public long getTotalRangeLatencyMicros(); public double getRecentRangeLatencyMicros(); + public long[] getTotalRangeLatencyHistogramMicros(); + public long[] getRecentRangeLatencyHistogramMicros(); public long getWriteOperations(); public long getTotalWriteLatencyMicros(); public double getRecentWriteLatencyMicros(); + public long[] getTotalWriteLatencyHistogramMicros(); + public long[] getRecentWriteLatencyHistogramMicros(); public boolean getHintedHandoffEnabled(); public void setHintedHandoffEnabled(boolean b);