Author: jbellis
Date: Thu Apr  1 01:39:01 2010
New Revision: 929768

URL: http://svn.apache.org/viewvc?rev=929768&view=rev
Log:
keep the replica set constant throughout the read repair process.  patch by jbellis; reviewed by gdusbabek for CASSANDRA-937

Modified:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=929768&r1=929767&r2=929768&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
 Thu Apr  1 01:39:01 2010
@@ -97,9 +97,7 @@ public class ReadVerbHandler implements 
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)
             {
                 List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                /* Remove the local storage endpoint from the list. */
-                endpoints.remove(FBUtilities.getLocalAddress());
-                if (endpoints.size() > 0)
+                if (endpoints.size() > 1)
                     StorageService.instance.doConsistencyCheck(row, endpoints, 
command);
             }
         }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929768&r1=929767&r2=929768&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 Thu Apr  1 01:39:01 2010
@@ -31,10 +31,13 @@ import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -45,19 +48,18 @@ import org.apache.cassandra.utils.FBUtil
 class ConsistencyChecker implements Runnable
 {
        private static Logger logger_ = 
Logger.getLogger(ConsistencyChecker.class);
-    private static long scheduledTimeMillis_ = 600;
-    private static ExpiringMap<String, String> readRepairTable_ = new 
ExpiringMap<String, String>(scheduledTimeMillis_);
+    private static ExpiringMap<String, String> readRepairTable_ = new 
ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
 
     private final String table_;
     private final Row row_;
     protected final List<InetAddress> replicas_;
     private final ReadCommand readCommand_;
 
-    public ConsistencyChecker(String table, Row row, List<InetAddress> 
replicas, ReadCommand readCommand)
+    public ConsistencyChecker(String table, Row row, List<InetAddress> 
endpoints, ReadCommand readCommand)
     {
         table_ = table;
         row_ = row;
-        replicas_ = replicas;
+        replicas_ = endpoints;
         readCommand_ = readCommand;
     }
 
@@ -69,7 +71,13 @@ class ConsistencyChecker implements Runn
                        Message message = 
readCommandDigestOnly.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Reading consistency digest for " + 
readCommand_.key + " from " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.instance.sendRR(message, replicas_.toArray(new 
InetAddress[replicas_.size()]), new DigestResponseHandler());
+
+            MessagingService.instance.addCallback(new DigestResponseHandler(), 
message.getMessageId());
+            for (InetAddress endpoint : replicas_)
+            {
+                if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                    MessagingService.instance.sendOneWay(message, endpoint);
+            }
                }
                catch (IOException ex)
                {
@@ -86,48 +94,49 @@ class ConsistencyChecker implements Runn
 
     class DigestResponseHandler implements IAsyncCallback
        {
-               Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
+        private boolean repairInvoked;
 
-        // syncronized so "size() == " works
-               public synchronized void response(Message msg)
+               public synchronized void response(Message response)
                {
-                       responses_.add(msg);
-            if (responses_.size() != ConsistencyChecker.this.replicas_.size())
+            if (repairInvoked)
                 return;
 
-            for (Message response : responses_)
+            try
             {
-                try
+                byte[] body = response.getMessageBody();
+                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+                ReadResponse result = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+                byte[] digest = result.digest();
+
+                if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    byte[] body = response.getMessageBody();
-                    ByteArrayInputStream bufIn = new 
ByteArrayInputStream(body);
-                    ReadResponse result = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                    byte[] digest = result.digest();
-                    if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+                    IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
+                    IAsyncCallback responseHandler;
+                    if (replicas_.contains(FBUtilities.getLocalAddress()))
+                        responseHandler = new DataRepairHandler(row_, 
replicas_.size(), readResponseResolver);
+                    else
+                        responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
+
+                    ReadCommand readCommand = constructReadMessage(false);
+                    Message message = readCommand.makeReadMessage();
+                    if (logger_.isDebugEnabled())
+                      logger_.debug("Performing read repair for " + 
readCommand_.key + " to " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
+                    MessagingService.instance.addCallback(responseHandler, 
message.getMessageId());
+                    for (InetAddress endpoint : replicas_)
                     {
-                        doReadRepair();
-                        break;
+                        if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                            MessagingService.instance.sendOneWay(message, 
endpoint);
                     }
+
+                    repairInvoked = true;
                 }
-                catch (Exception e)
-                {
-                    throw new RuntimeException("Error handling responses for " 
+ row_, e);
-                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error handling responses for " + 
row_, e);
             }
         }
-
-        private void doReadRepair() throws IOException
-               {
-            replicas_.add(FBUtilities.getLocalAddress());
-            IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
-            IAsyncCallback responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
-            ReadCommand readCommand = constructReadMessage(false);
-            Message message = readCommand.makeReadMessage();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Performing read repair for " + readCommand_.key + 
" to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + 
"]");
-            MessagingService.instance.sendRR(message, replicas_.toArray(new 
InetAddress[replicas_.size()]), responseHandler);
-               }
-       }
+    }
 
        static class DataRepairHandler implements IAsyncCallback, 
ICacheExpungeHook<String, String>
        {
@@ -141,6 +150,18 @@ class ConsistencyChecker implements Runn
                        majority_ = (responseCount / 2) + 1;  
                }
 
+        public DataRepairHandler(Row localRow, int responseCount, 
IResponseResolver<Row> readResponseResolver) throws IOException
+        {
+            this(responseCount, readResponseResolver);
+            // wrap localRow in a response Message so it doesn't need to be 
special-cased in the resolver
+            ReadResponse readResponse = new ReadResponse(localRow);
+            DataOutputBuffer out = new DataOutputBuffer();
+            ReadResponse.serializer().serialize(readResponse, out);
+            byte[] bytes = new byte[out.getLength()];
+            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
+            responses_.add(new Message(FBUtilities.getLocalAddress(), 
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+        }
+
         // synchronized so the " == majority" is safe
                public synchronized void response(Message message)
                {

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=929768&r1=929767&r2=929768&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu Apr  1 01:39:01 2010
@@ -763,9 +763,7 @@ public class StorageProxy implements Sto
             if (DatabaseDescriptor.getConsistencyCheck())
             {
                 List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                /* Remove the local storage endpoint from the list. */
-                endpoints.remove(FBUtilities.getLocalAddress());
-                if (endpoints.size() > 0)
+                if (endpoints.size() > 1)
                     StorageService.instance.doConsistencyCheck(row, endpoints, 
command);
             }
 


Reply via email to