Author: jbellis Date: Thu Nov 11 20:05:14 2010 New Revision: 1034095 URL: http://svn.apache.org/viewvc?rev=1034095&view=rev Log: merge from 0.6
Modified: cassandra/branches/cassandra-0.7/ (props changed) cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Propchange: cassandra/branches/cassandra-0.7/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 11 20:05:14 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6:922689-1033044 +/cassandra/branches/cassandra-0.6:922689-1034086 /cassandra/trunk:1026516-1026734,1028929 /incubator/cassandra/branches/cassandra-0.3:774578-796573 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350 Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1034095&r1=1034094&r2=1034095&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Nov 11 20:05:14 2010 @@ -2,6 +2,9 @@ dev * Update windows .bat files to work outside of main Cassandra directory (CASSANDRA-1713) * log threshold causing memtable flush (CASSANDRA-1675) + * fix read repair regression from 0.6.7 (CASSANDRA-1727) + * more-efficient read repair (CASSANDRA-1719) + * fix hinted handoff replay (CASSANDRA-1656) * log type of dropped messages (CASSANDRA-1677) * upgrade to SLF4J 1.6.1 * fix ByteBuffer bug in ExpiringColumn.updateDigest (CASSANDRA-1679) Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 11 20:05:14 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1033044 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1034086 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350 Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 11 20:05:14 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1033044 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1034086 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350 Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 11 20:05:14 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1033044 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1034086 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350 Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 11 20:05:14 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1033044 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1034086 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350 Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 11 20:05:14 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1033044 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1034086 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1034095&r1=1034094&r2=1034095&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 20:05:14 2010 @@ -30,8 +30,12 @@ import java.util.concurrent.ScheduledExe import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ReadCommand; @@ -42,12 +46,22 @@ import org.apache.cassandra.net.IAsyncCa import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; - import org.apache.cassandra.utils.WrappedRunnable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +/** + * ConsistencyChecker does the following: + * + * [ConsistencyChecker.run] + * (1) sends DIGEST read requests to each other replica of the given row. + * + * [DigestResponseHandler] + * (2) If any of the digests to not match the local one, it sends a second round of requests + * to each replica, this time for the full data + * + * [DataRepairHandler] + * (3) processes full-read responses and invokes resolve. The actual sending of messages + * repairing out-of-date or missing data is handled by ReadResponseResolver. + */ class ConsistencyChecker implements Runnable { private static Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class); @@ -65,6 +79,7 @@ class ConsistencyChecker implements Runn row_ = row; replicas_ = endpoints; readCommand_ = readCommand; + assert replicas_.contains(FBUtilities.getLocalAddress()); } public void run() @@ -99,8 +114,9 @@ class ConsistencyChecker implements Runn class DigestResponseHandler implements IAsyncCallback { private boolean repairInvoked; + private final ByteBuffer localDigest = ColumnFamily.digest(row_.cf); - public synchronized void response(Message response) + public synchronized void response(Message response) { if (repairInvoked) return; @@ -112,19 +128,15 @@ class ConsistencyChecker implements Runn ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); ByteBuffer digest = result.digest(); - if (!ColumnFamily.digest(row_.cf).equals(digest)) + if (!localDigest.equals(digest)) { - IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_); - IAsyncCallback responseHandler; - if (replicas_.contains(FBUtilities.getLocalAddress())) - responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver); - else - responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver); + ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_); + IAsyncCallback responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver); ReadCommand readCommand = constructReadMessage(false); Message message = readCommand.makeReadMessage(); if (logger_.isDebugEnabled()) - logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); + logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); MessagingService.instance.addCallback(responseHandler, message.getMessageId()); for (InetAddress endpoint : replicas_) { @@ -145,33 +157,27 @@ class ConsistencyChecker implements Runn static class DataRepairHandler implements IAsyncCallback { private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>(); - private final IResponseResolver<Row> readResponseResolver_; + private final ReadResponseResolver readResponseResolver_; private final int majority_; - DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver) - { - readResponseResolver_ = readResponseResolver; - majority_ = (responseCount / 2) + 1; - } - - public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException + public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException { - this(responseCount, readResponseResolver); + readResponseResolver_ = readResponseResolver; + majority_ = (responseCount / 2) + 1; // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver ReadResponse readResponse = new ReadResponse(localRow); - DataOutputBuffer out = new DataOutputBuffer(); - ReadResponse.serializer().serialize(readResponse, out); - byte[] bytes = new byte[out.getLength()]; - System.arraycopy(out.getData(), 0, bytes, 0, bytes.length); - responses_.add(new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, bytes)); + Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY); + responses_.add(fakeMessage); + readResponseResolver_.injectPreProcessed(fakeMessage, readResponse); } // synchronized so the " == majority" is safe public synchronized void response(Message message) { if (logger_.isDebugEnabled()) - logger_.debug("Received responses in DataRepairHandler : " + message.toString()); + logger_.debug("Received response in DataRepairHandler : " + message.toString()); responses_.add(message); + readResponseResolver_.preprocess(message); if (responses_.size() == majority_) { Runnable runnable = new WrappedRunnable() Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1034095&r1=1034094&r2=1034095&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Nov 11 20:05:14 2010 @@ -195,6 +195,12 @@ public class ReadResponseResolver implem } } + /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message */ + public void injectPreProcessed(Message message, ReadResponse result) + { + results.put(message, result); + } + public boolean isDataPresent(Collection<Message> responses) { for (Message message : responses)