Author: jbellis
Date: Thu Apr  1 01:53:56 2010
New Revision: 929777

URL: http://svn.apache.org/viewvc?rev=929777&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-929765
+/cassandra/branches/cassandra-0.6:922689-929775
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Apr  1 01:53:56 2010
@@ -8,6 +8,8 @@ dev
 
 0.6.1
  * fix NPE in sstable2json when no excluded keys are given (CASSANDRA-934)
+ * keep the replica set constant throughout the read repair process
+   (CASSANDRA-937)
 
 
 0.6.0-RC1

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929775
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929775
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929775
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929775
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929775
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Apr  1 01:53:56 2010
@@ -98,9 +98,7 @@ public class ReadVerbHandler implements 
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)
             {
                 List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                /* Remove the local storage endpoint from the list. */
-                endpoints.remove(FBUtilities.getLocalAddress());
-                if (endpoints.size() > 0)
+                if (endpoints.size() > 1)
                     StorageService.instance.doConsistencyCheck(row, endpoints, command);
             }
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Apr  1 01:53:56 2010
@@ -30,10 +30,13 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -47,19 +50,18 @@ import org.slf4j.LoggerFactory;
 class ConsistencyChecker implements Runnable
 {
     private static Logger logger_ = 
LoggerFactory.getLogger(ConsistencyManager.class);
-    private static long scheduledTimeMillis_ = 600;
-    private static ExpiringMap<String, String> readRepairTable_ = new 
ExpiringMap<String, String>(scheduledTimeMillis_);
+    private static ExpiringMap<String, String> readRepairTable_ = new 
ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
 
     private final String table_;
     private final Row row_;
     protected final List<InetAddress> replicas_;
     private final ReadCommand readCommand_;
 
-    public ConsistencyChecker(String table, Row row, List<InetAddress> 
replicas, ReadCommand readCommand)
+    public ConsistencyChecker(String table, Row row, List<InetAddress> 
endpoints, ReadCommand readCommand)
     {
         table_ = table;
         row_ = row;
-        replicas_ = replicas;
+        replicas_ = endpoints;
         readCommand_ = readCommand;
     }
 
@@ -71,7 +73,13 @@ class ConsistencyChecker implements Runn
                        Message message = 
readCommandDigestOnly.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Reading consistency digest for " + 
readCommand_.key + " from " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.instance.sendRR(message, replicas_.toArray(new 
InetAddress[replicas_.size()]), new DigestResponseHandler());
+
+            MessagingService.instance.addCallback(new DigestResponseHandler(), 
message.getMessageId());
+            for (InetAddress endpoint : replicas_)
+            {
+                if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                    MessagingService.instance.sendOneWay(message, endpoint);
+            }
                }
                catch (IOException ex)
                {
@@ -88,48 +96,49 @@ class ConsistencyChecker implements Runn
 
     class DigestResponseHandler implements IAsyncCallback
        {
-               Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
+        private boolean repairInvoked;
 
-        // syncronized so "size() == " works
-               public synchronized void response(Message msg)
+               public synchronized void response(Message response)
                {
-                       responses_.add(msg);
-            if (responses_.size() != ConsistencyChecker.this.replicas_.size())
+            if (repairInvoked)
                 return;
 
-            for (Message response : responses_)
+            try
             {
-                try
+                byte[] body = response.getMessageBody();
+                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+                ReadResponse result = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+                byte[] digest = result.digest();
+
+                if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    byte[] body = response.getMessageBody();
-                    ByteArrayInputStream bufIn = new 
ByteArrayInputStream(body);
-                    ReadResponse result = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                    byte[] digest = result.digest();
-                    if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+                    IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
+                    IAsyncCallback responseHandler;
+                    if (replicas_.contains(FBUtilities.getLocalAddress()))
+                        responseHandler = new DataRepairHandler(row_, 
replicas_.size(), readResponseResolver);
+                    else
+                        responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
+
+                    ReadCommand readCommand = constructReadMessage(false);
+                    Message message = readCommand.makeReadMessage();
+                    if (logger_.isDebugEnabled())
+                      logger_.debug("Performing read repair for " + 
readCommand_.key + " to " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
+                    MessagingService.instance.addCallback(responseHandler, 
message.getMessageId());
+                    for (InetAddress endpoint : replicas_)
                     {
-                        doReadRepair();
-                        break;
+                        if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                            MessagingService.instance.sendOneWay(message, 
endpoint);
                     }
+
+                    repairInvoked = true;
                 }
-                catch (Exception e)
-                {
-                    throw new RuntimeException("Error handling responses for " 
+ row_, e);
-                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error handling responses for " + 
row_, e);
             }
         }
-
-        private void doReadRepair() throws IOException
-               {
-            replicas_.add(FBUtilities.getLocalAddress());
-            IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
-            IAsyncCallback responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
-            ReadCommand readCommand = constructReadMessage(false);
-            Message message = readCommand.makeReadMessage();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Performing read repair for " + readCommand_.key + 
" to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + 
"]");
-            MessagingService.instance.sendRR(message, replicas_.toArray(new 
InetAddress[replicas_.size()]), responseHandler);
-               }
-       }
+    }
 
        static class DataRepairHandler implements IAsyncCallback, 
ICacheExpungeHook<String, String>
        {
@@ -143,6 +152,18 @@ class ConsistencyChecker implements Runn
                        majority_ = (responseCount / 2) + 1;  
                }
 
+        public DataRepairHandler(Row localRow, int responseCount, 
IResponseResolver<Row> readResponseResolver) throws IOException
+        {
+            this(responseCount, readResponseResolver);
+            // wrap localRow in a response Message so it doesn't need to be 
special-cased in the resolver
+            ReadResponse readResponse = new ReadResponse(localRow);
+            DataOutputBuffer out = new DataOutputBuffer();
+            ReadResponse.serializer().serialize(readResponse, out);
+            byte[] bytes = new byte[out.getLength()];
+            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
+            responses_.add(new Message(FBUtilities.getLocalAddress(), 
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+        }
+
         // synchronized so the " == majority" is safe
                public synchronized void response(Message message)
                {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr  1 01:53:56 2010
@@ -764,9 +764,7 @@ public class StorageProxy implements Sto
             if (DatabaseDescriptor.getConsistencyCheck())
             {
                 List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                /* Remove the local storage endpoint from the list. */
-                endpoints.remove(FBUtilities.getLocalAddress());
-                if (endpoints.size() > 0)
+                if (endpoints.size() > 1)
                     StorageService.instance.doConsistencyCheck(row, endpoints, command);
             }
 


Reply via email to