Author: jbellis
Date: Thu Nov 11 20:05:14 2010
New Revision: 1034095

URL: http://svn.apache.org/viewvc?rev=1034095&view=rev
Log:
merge from 0.6

Modified:
    cassandra/branches/cassandra-0.7/   (props changed)
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java

Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 11 20:05:14 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1033044
+/cassandra/branches/cassandra-0.6:922689-1034086
 /cassandra/trunk:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1034095&r1=1034094&r2=1034095&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Nov 11 20:05:14 2010
@@ -2,6 +2,9 @@ dev
  * Update windows .bat files to work outside of main Cassandra
    directory (CASSANDRA-1713)
  * log threshold causing memtable flush (CASSANDRA-1675)
+ * fix read repair regression from 0.6.7 (CASSANDRA-1727)
+ * more-efficient read repair (CASSANDRA-1719)
+ * fix hinted handoff replay (CASSANDRA-1656)
  * log type of dropped messages (CASSANDRA-1677)
  * upgrade to SLF4J 1.6.1
  * fix ByteBuffer bug in ExpiringColumn.updateDigest (CASSANDRA-1679)

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 11 20:05:14 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1033044
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1034086
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 11 20:05:14 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1033044
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1034086
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 11 20:05:14 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1033044
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1034086
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 11 20:05:14 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1033044
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1034086
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 11 20:05:14 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1033044
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1034086
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1034095&r1=1034094&r2=1034095&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 Thu Nov 11 20:05:14 2010
@@ -30,8 +30,12 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadCommand;
@@ -42,12 +46,22 @@ import org.apache.cassandra.net.IAsyncCa
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
+/**
+ * ConsistencyChecker does the following:
+ *
+ * [ConsistencyChecker.run]
+ * (1) sends DIGEST read requests to each other replica of the given row.
+ *
+ * [DigestResponseHandler]
+ * (2) If any of the digests to not match the local one, it sends a second 
round of requests
+ * to each replica, this time for the full data
+ *
+ * [DataRepairHandler]
+ * (3) processes full-read responses and invokes resolve.  The actual sending 
of messages
+ * repairing out-of-date or missing data is handled by ReadResponseResolver.
+ */
 class ConsistencyChecker implements Runnable
 {
     private static Logger logger_ = 
LoggerFactory.getLogger(ConsistencyChecker.class);
@@ -65,6 +79,7 @@ class ConsistencyChecker implements Runn
         row_ = row;
         replicas_ = endpoints;
         readCommand_ = readCommand;
+        assert replicas_.contains(FBUtilities.getLocalAddress());
     }
 
     public void run()
@@ -99,8 +114,9 @@ class ConsistencyChecker implements Runn
     class DigestResponseHandler implements IAsyncCallback
        {
         private boolean repairInvoked;
+        private final ByteBuffer localDigest = ColumnFamily.digest(row_.cf);
 
-               public synchronized void response(Message response)
+        public synchronized void response(Message response)
                {
             if (repairInvoked)
                 return;
@@ -112,19 +128,15 @@ class ConsistencyChecker implements Runn
                 ReadResponse result = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                 ByteBuffer digest = result.digest();
 
-                if (!ColumnFamily.digest(row_.cf).equals(digest))
+                if (!localDigest.equals(digest))
                 {
-                    IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_);
-                    IAsyncCallback responseHandler;
-                    if (replicas_.contains(FBUtilities.getLocalAddress()))
-                        responseHandler = new DataRepairHandler(row_, 
replicas_.size(), readResponseResolver);
-                    else
-                        responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
+                    ReadResponseResolver readResponseResolver = new 
ReadResponseResolver(table_);
+                    IAsyncCallback responseHandler = new 
DataRepairHandler(row_, replicas_.size(), readResponseResolver);
 
                     ReadCommand readCommand = constructReadMessage(false);
                     Message message = readCommand.makeReadMessage();
                     if (logger_.isDebugEnabled())
-                      logger_.debug("Performing read repair for " + 
readCommand_.key + " to " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
+                        logger_.debug("Digest mismatch; re-reading " + 
readCommand_.key + " from " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");                         
                     MessagingService.instance.addCallback(responseHandler, 
message.getMessageId());
                     for (InetAddress endpoint : replicas_)
                     {
@@ -145,33 +157,27 @@ class ConsistencyChecker implements Runn
        static class DataRepairHandler implements IAsyncCallback
        {
                private final Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
-               private final IResponseResolver<Row> readResponseResolver_;
+               private final ReadResponseResolver readResponseResolver_;
                private final int majority_;
                
-               DataRepairHandler(int responseCount, IResponseResolver<Row> 
readResponseResolver)
-               {
-                       readResponseResolver_ = readResponseResolver;
-                       majority_ = (responseCount / 2) + 1;  
-               }
-
-        public DataRepairHandler(Row localRow, int responseCount, 
IResponseResolver<Row> readResponseResolver) throws IOException
+        public DataRepairHandler(Row localRow, int responseCount, 
ReadResponseResolver readResponseResolver) throws IOException
         {
-            this(responseCount, readResponseResolver);
+            readResponseResolver_ = readResponseResolver;
+            majority_ = (responseCount / 2) + 1;
             // wrap localRow in a response Message so it doesn't need to be 
special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
-            DataOutputBuffer out = new DataOutputBuffer();
-            ReadResponse.serializer().serialize(readResponse, out);
-            byte[] bytes = new byte[out.getLength()];
-            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
-            responses_.add(new Message(FBUtilities.getLocalAddress(), 
StorageService.Verb.INTERNAL_RESPONSE, bytes));
+            Message fakeMessage = new Message(FBUtilities.getLocalAddress(), 
StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+            responses_.add(fakeMessage);
+            readResponseResolver_.injectPreProcessed(fakeMessage, 
readResponse);
         }
 
         // synchronized so the " == majority" is safe
                public synchronized void response(Message message)
                {
                        if (logger_.isDebugEnabled())
-                         logger_.debug("Received responses in 
DataRepairHandler : " + message.toString());
+                         logger_.debug("Received response in DataRepairHandler 
: " + message.toString());
                        responses_.add(message);
+            readResponseResolver_.preprocess(message);
             if (responses_.size() == majority_)
             {
                 Runnable runnable = new WrappedRunnable()

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1034095&r1=1034094&r2=1034095&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 Thu Nov 11 20:05:14 2010
@@ -195,6 +195,12 @@ public class ReadResponseResolver implem
         }
     }
 
+    /** hack so ConsistencyChecker doesn't have to serialize/deserialize an 
extra real Message */
+    public void injectPreProcessed(Message message, ReadResponse result)
+    {
+        results.put(message, result);
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
        {
         for (Message message : responses)


Reply via email to