Author: jbellis
Date: Wed Dec 22 18:47:09 2010
New Revision: 1052027

URL: http://svn.apache.org/viewvc?rev=1052027&view=rev
Log:
count timeouts in storageproxy latencies, and include latency
histograms in StorageProxyMBean
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1893

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1052027&r1=1052026&r2=1052027&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Dec 22 18:47:09 2010
@@ -1,5 +1,7 @@
 dev
  * fix cli crash after backgrounding (CASSANDRA-1875)
+ * count timeouts in storageproxy latencies, and include latency 
+   histograms in StorageProxyMBean (CASSANDRA-1893)
 
 
 0.7.0-rc3

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1052027&r1=1052026&r2=1052027&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec 22 18:47:09 2010
@@ -176,7 +176,6 @@ public class StorageProxy implements Sto
         {
             writeStats.addNano(System.nanoTime() - startTime);
         }
-
     }
 
     private static void addHintHeader(Message message, InetAddress target) throws IOException
@@ -217,19 +216,23 @@ public class StorageProxy implements Sto
         if (StorageService.instance.isBootstrapMode())
             throw new UnavailableException();
         long startTime = System.nanoTime();
-
         List<Row> rows;
-        if (consistency_level == ConsistencyLevel.ONE)
+        try
         {
-            rows = weakRead(commands);
+            if (consistency_level == ConsistencyLevel.ONE)
+            {
+                rows = weakRead(commands);
+            }
+            else
+            {
+                assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
+                rows = strongRead(commands, consistency_level);
+            }
         }
-        else
+        finally
         {
-            assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
-            rows = strongRead(commands, consistency_level);
+            readStats.addNano(System.nanoTime() - startTime);
         }
-
-        readStats.addNano(System.nanoTime() - startTime);
         return rows;
     }
 
@@ -415,77 +418,82 @@ public class StorageProxy implements Sto
         if (logger.isDebugEnabled())
             logger.debug(command.toString());
         long startTime = System.nanoTime();
-
-        List<AbstractBounds> ranges = getRestrictedRanges(command.range);
+        List<Row> rows;
         // now scan until we have enough results
-        List<Row> rows = new ArrayList<Row>(command.max_keys);
-        for (AbstractBounds range : ranges)
+        try
         {
-            List<InetAddress> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
-
-            if (consistency_level == ConsistencyLevel.ONE && 
liveEndpoints.contains(FBUtilities.getLocalAddress())) 
+            rows = new ArrayList<Row>(command.max_keys);
+            List<AbstractBounds> ranges = getRestrictedRanges(command.range);
+            for (AbstractBounds range : ranges)
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("local range slice");
-                ColumnFamilyStore cfs = 
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-                try 
-                {
-                    rows.addAll(cfs.getRangeSlice(command.super_column,
-                                                  range,
-                                                  command.max_keys,
-                                                  
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
-                } 
-                catch (ExecutionException e) 
-                {
-                    throw new RuntimeException(e.getCause());
-                } 
-                catch (InterruptedException e) 
-                {
-                    throw new AssertionError(e);
-                }           
-            }
-            else 
-            {
-                
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
 liveEndpoints);
-                RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, 
command.column_family, command.super_column, command.predicate, range, 
command.max_keys);
-                Message message = c2.getMessage();
-
-                // collect replies and resolve according to consistency level
-                RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                AbstractReplicationStrategy rs = 
Table.open(command.keyspace).getReplicationStrategy();
-                QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
-                // TODO bail early if live endpoints can't satisfy requested 
consistency level
-                for (InetAddress endpoint : liveEndpoints) 
+                List<InetAddress> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+
+                if (consistency_level == ConsistencyLevel.ONE && 
liveEndpoints.contains(FBUtilities.getLocalAddress())) 
                 {
-                    MessagingService.instance.sendRR(message, endpoint, 
handler);
                     if (logger.isDebugEnabled())
-                        logger.debug("reading " + c2 + " from " + 
message.getMessageId() + "@" + endpoint);
+                        logger.debug("local range slice");
+                    ColumnFamilyStore cfs = 
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+                    try 
+                    {
+                        rows.addAll(cfs.getRangeSlice(command.super_column,
+                                                    range,
+                                                    command.max_keys,
+                                                    
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+                    } 
+                    catch (ExecutionException e) 
+                    {
+                        throw new RuntimeException(e.getCause());
+                    } 
+                    catch (InterruptedException e) 
+                    {
+                        throw new AssertionError(e);
+                    }           
                 }
-                // TODO read repair on remaining replicas?
-
-                // if we're done, great, otherwise, move to the next range
-                try 
+                else 
                 {
-                    if (logger.isDebugEnabled()) 
+                    
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
 liveEndpoints);
+                    RangeSliceCommand c2 = new 
RangeSliceCommand(command.keyspace, command.column_family, 
command.super_column, command.predicate, range, command.max_keys);
+                    Message message = c2.getMessage();
+
+                    // collect replies and resolve according to consistency 
level
+                    RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
+                    AbstractReplicationStrategy rs = 
Table.open(command.keyspace).getReplicationStrategy();
+                    QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+                    // TODO bail early if live endpoints can't satisfy 
requested consistency level
+                    for (InetAddress endpoint : liveEndpoints) 
                     {
-                        for (Row row : handler.get()) 
+                        MessagingService.instance.sendRR(message, endpoint, 
handler);
+                        if (logger.isDebugEnabled())
+                            logger.debug("reading " + c2 + " from " + 
message.getMessageId() + "@" + endpoint);
+                    }
+                    // TODO read repair on remaining replicas?
+
+                    // if we're done, great, otherwise, move to the next range
+                    try 
+                    {
+                        if (logger.isDebugEnabled()) 
                         {
-                            logger.debug("range slices read " + row.key);
+                            for (Row row : handler.get()) 
+                            {
+                                logger.debug("range slices read " + row.key);
+                            }
                         }
+                        rows.addAll(handler.get());
+                    } 
+                    catch (DigestMismatchException e) 
+                    {
+                        throw new AssertionError(e); // no digests in range 
slices yet
                     }
-                    rows.addAll(handler.get());
-                } 
-                catch (DigestMismatchException e) 
-                {
-                    throw new AssertionError(e); // no digests in range slices 
yet
                 }
+            
+                if (rows.size() >= command.max_keys)
+                    break;
             }
-          
-            if (rows.size() >= command.max_keys)
-                break;
         }
-
-        rangeStats.addNano(System.nanoTime() - startTime);
+        finally
+        {
+            rangeStats.addNano(System.nanoTime() - startTime);
+        }
         return rows.size() > command.max_keys ? rows.subList(0, 
command.max_keys) : rows;
     }
 
@@ -620,6 +628,16 @@ public class StorageProxy implements Sto
         return readStats.getRecentLatencyMicros();
     }
 
+    public long[] getTotalReadLatencyHistogramMicros()
+    {
+        return readStats.getTotalLatencyHistogramMicros();
+    }
+
+    public long[] getRecentReadLatencyHistogramMicros()
+    {
+        return readStats.getRecentLatencyHistogramMicros();
+    }
+
     public long getRangeOperations()
     {
         return rangeStats.getOpCount();
@@ -635,6 +653,16 @@ public class StorageProxy implements Sto
         return rangeStats.getRecentLatencyMicros();
     }
 
+    public long[] getTotalRangeLatencyHistogramMicros()
+    {
+        return rangeStats.getTotalLatencyHistogramMicros();
+    }
+
+    public long[] getRecentRangeLatencyHistogramMicros()
+    {
+        return rangeStats.getRecentLatencyHistogramMicros();
+    }
+
     public long getWriteOperations()
     {
         return writeStats.getOpCount();
@@ -650,6 +678,16 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyMicros();
     }
 
+    public long[] getTotalWriteLatencyHistogramMicros()
+    {
+        return writeStats.getTotalLatencyHistogramMicros();
+    }
+
+    public long[] getRecentWriteLatencyHistogramMicros()
+    {
+        return writeStats.getRecentLatencyHistogramMicros();
+    }
+
     public static List<Row> scan(String keyspace, String column_family, 
IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel 
consistency_level)
     throws IOException, TimeoutException, UnavailableException
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1052027&r1=1052026&r2=1052027&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java Wed Dec 22 18:47:09 2010
@@ -23,14 +23,20 @@ public interface StorageProxyMBean
     public long getReadOperations();
     public long getTotalReadLatencyMicros();
     public double getRecentReadLatencyMicros();
+    public long[] getTotalReadLatencyHistogramMicros();
+    public long[] getRecentReadLatencyHistogramMicros();
 
     public long getRangeOperations();
     public long getTotalRangeLatencyMicros();
     public double getRecentRangeLatencyMicros();
+    public long[] getTotalRangeLatencyHistogramMicros();
+    public long[] getRecentRangeLatencyHistogramMicros();
 
     public long getWriteOperations();
     public long getTotalWriteLatencyMicros();
     public double getRecentWriteLatencyMicros();
+    public long[] getTotalWriteLatencyHistogramMicros();
+    public long[] getRecentWriteLatencyHistogramMicros();
 
     public boolean getHintedHandoffEnabled();
     public void setHintedHandoffEnabled(boolean b);


Reply via email to