Author: jbellis
Date: Tue Jul 27 03:32:43 2010
New Revision: 979511

URL: http://svn.apache.org/viewvc?rev=979511&view=rev
Log:
process digest mismatch re-reads in parallel; patch by jbellis, reviewed by
brandonwilliams for CASSANDRA-1323

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979511&r1=979510&r2=979511&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Jul 27 03:32:43 2010
@@ -24,7 +24,7 @@
    when determining whether to do local read for CL.ONE (CASSANDRA-1317)
  * fix read repair to use requested consistency level on digest mismatch,
    rather than assuming QUORUM (CASSANDRA-1316)
-
+ * process digest mismatch re-reads in parallel (CASSANDRA-1323)
 
 
 0.6.3

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=979511&r1=979510&r2=979511&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Jul 27 03:32:43 2010
@@ -69,8 +69,8 @@ public class ReadResponseResolver implem
        public Row resolve(Collection<Message> responses) throws 
DigestMismatchException, IOException
     {
         long startTime = System.currentTimeMillis();
-               List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
-               List<InetAddress> endPoints = new ArrayList<InetAddress>();
+               List<ColumnFamily> versions = new 
ArrayList<ColumnFamily>(responses.size());
+               List<InetAddress> endPoints = new 
ArrayList<InetAddress>(responses.size());
                String key = null;
                byte[] digest = new byte[0];
                boolean isDigestQuery = false;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=979511&r1=979510&r2=979511&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jul 27 03:32:43 2010
@@ -445,8 +445,7 @@ public class StorageProxy implements Sto
         List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
         List<Row> rows = new ArrayList<Row>();
 
-        int commandIndex = 0;
-
+        // send out read requests
         for (ReadCommand command: commands)
         {
             assert !command.isDigestQuery();
@@ -481,10 +480,13 @@ public class StorageProxy implements Sto
             commandEndPoints.add(endPoints);
         }
 
-        for (QuorumResponseHandler<Row> quorumResponseHandler: 
quorumResponseHandlers)
+        // read results and make a second pass for any digest mismatches
+        List<QuorumResponseHandler<Row>> repairResponseHandlers = null;
+        for (int i = 0; i < commands.size(); i++)
         {
+            QuorumResponseHandler<Row> quorumResponseHandler = 
quorumResponseHandlers.get(i);
             Row row;
-            ReadCommand command = commands.get(commandIndex);
+            ReadCommand command = commands.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
@@ -502,23 +504,32 @@ public class StorageProxy implements Sto
                     if (logger.isDebugEnabled())
                         logger.debug("Digest mismatch:", ex);
                     int responseCount = 
determineBlockFor(DatabaseDescriptor.getReplicationFactor(command.table), 
consistency_level);
-                    IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver(command.table, responseCount);
-                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = 
new QuorumResponseHandler<Row>(responseCount, readResponseResolverRepair);
+                    QuorumResponseHandler<Row> qrhRepair = new 
QuorumResponseHandler<Row>(responseCount, new 
ReadResponseResolver(command.table, responseCount));
                     Message messageRepair = command.makeReadMessage();
-                    MessagingService.instance.sendRR(messageRepair, 
commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
-                    try
-                    {
-                        row = quorumResponseHandlerRepair.get();
-                        if (row != null)
-                            rows.add(row);
-                    }
-                    catch (DigestMismatchException e)
-                    {
-                        throw new AssertionError(e); // full data requested 
from each node here, no digests should be sent
-                    }
+                    MessagingService.instance.sendRR(messageRepair, 
commandEndPoints.get(i), qrhRepair);
+                    if (repairResponseHandlers == null)
+                        repairResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
+                    repairResponseHandlers.add(qrhRepair);
+                }
+            }
+        }
+
+        // read the results for the digest mismatch retries
+        if (repairResponseHandlers != null)
+        {
+            for (QuorumResponseHandler<Row> handler : repairResponseHandlers)
+            {
+                try
+                {
+                    Row row = handler.get();
+                    if (row != null)
+                        rows.add(row);
+                }
+                catch (DigestMismatchException e)
+                {
+                    throw new AssertionError(e); // full data requested from 
each node here, no digests should be sent
                 }
             }
-            commandIndex++;
         }
 
         return rows;


Reply via email to