Author: jbellis Date: Thu Apr 1 01:53:56 2010 New Revision: 929777 URL: http://svn.apache.org/viewvc?rev=929777&view=rev Log: merge from 0.6
Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 1 01:53:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6:922689-929765 +/cassandra/branches/cassandra-0.6:922689-929775 /incubator/cassandra/branches/cassandra-0.3:774578-796573 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5:888872-915439 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=929777&r1=929776&r2=929777&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Apr 1 01:53:56 2010 @@ -8,6 +8,8 @@ dev 0.6.1 * fix NPE in sstable2json when no excluded keys are given (CASSANDRA-934) + * keep the replica set constant throughout the read repair process + (CASSANDRA-937) 0.6.0-RC1 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 1 01:53:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929765 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929775 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 1 01:53:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929765 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929775 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 1 01:53:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929765 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929775 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 1 01:53:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929765 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929775 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 1 01:53:56 2010 @@ -1,4 +1,4 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929765 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929775 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=929777&r1=929776&r2=929777&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Apr 1 01:53:56 2010 @@ -98,9 +98,7 @@ public class ReadVerbHandler implements if (message.getHeader(ReadCommand.DO_REPAIR) != null) { List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key); - /* Remove the local storage endpoint from the list. */ - endpoints.remove(FBUtilities.getLocalAddress()); - if (endpoints.size() > 0) + if (endpoints.size() > 1) StorageService.instance.doConsistencyCheck(row, endpoints, command); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929777&r1=929776&r2=929777&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Apr 1 01:53:56 2010 @@ -30,10 +30,13 @@ import java.util.concurrent.LinkedBlocki import org.apache.commons.lang.StringUtils; import org.apache.cassandra.cache.ICacheExpungeHook; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Row; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -47,19 +50,18 @@ import org.slf4j.LoggerFactory; class ConsistencyChecker implements Runnable { private static Logger logger_ = LoggerFactory.getLogger(ConsistencyManager.class); - private static long scheduledTimeMillis_ = 600; - private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(scheduledTimeMillis_); + private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout()); private final String table_; private final Row row_; protected final List<InetAddress> replicas_; private final ReadCommand readCommand_; - public ConsistencyChecker(String table, Row row, List<InetAddress> replicas, ReadCommand readCommand) + public ConsistencyChecker(String table, Row row, List<InetAddress> endpoints, ReadCommand readCommand) { table_ = table; row_ = row; - replicas_ = replicas; + replicas_ = endpoints; readCommand_ = readCommand; } @@ -71,7 +73,13 @@ class ConsistencyChecker implements Runn Message message = readCommandDigestOnly.makeReadMessage(); if (logger_.isDebugEnabled()) logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); - MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler()); + + MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId()); + for (InetAddress endpoint : replicas_) + { + if (!endpoint.equals(FBUtilities.getLocalAddress())) + MessagingService.instance.sendOneWay(message, endpoint); + } } catch (IOException ex) { @@ -88,48 +96,49 @@ class ConsistencyChecker implements Runn class DigestResponseHandler implements IAsyncCallback { - Collection<Message> responses_ = new LinkedBlockingQueue<Message>(); + private boolean repairInvoked; - // syncronized so "size() == " works - public synchronized void response(Message msg) + public synchronized void response(Message response) { - responses_.add(msg); - if (responses_.size() != ConsistencyChecker.this.replicas_.size()) + if (repairInvoked) return; - for (Message response : responses_) + try { - try + byte[] body = response.getMessageBody(); + ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); + byte[] digest = result.digest(); + + if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest)) { - byte[] body = response.getMessageBody(); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); - ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); - byte[] digest = result.digest(); - if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest)) + IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size()); + IAsyncCallback responseHandler; + if (replicas_.contains(FBUtilities.getLocalAddress())) + responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver); + else + responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver); + + ReadCommand readCommand = constructReadMessage(false); + Message message = readCommand.makeReadMessage(); + if (logger_.isDebugEnabled()) + logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); + MessagingService.instance.addCallback(responseHandler, message.getMessageId()); + for (InetAddress endpoint : replicas_) { - doReadRepair(); - break; + if (!endpoint.equals(FBUtilities.getLocalAddress())) + MessagingService.instance.sendOneWay(message, endpoint); } + + repairInvoked = true; } - catch (Exception e) - { - throw new RuntimeException("Error handling responses for " + row_, e); - } + } + catch (Exception e) + { + throw new RuntimeException("Error handling responses for " + row_, e); } } - - private void doReadRepair() throws IOException - { - replicas_.add(FBUtilities.getLocalAddress()); - IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size()); - IAsyncCallback responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver); - ReadCommand readCommand = constructReadMessage(false); - Message message = readCommand.makeReadMessage(); - if (logger_.isDebugEnabled()) - logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); - MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler); - } - } + } static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String> { @@ -143,6 +152,18 @@ class ConsistencyChecker implements Runn majority_ = (responseCount / 2) + 1; } + public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException + { + this(responseCount, readResponseResolver); + // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver + ReadResponse readResponse = new ReadResponse(localRow); + DataOutputBuffer out = new DataOutputBuffer(); + ReadResponse.serializer().serialize(readResponse, out); + byte[] bytes = new byte[out.getLength()]; + System.arraycopy(out.getData(), 0, bytes, 0, bytes.length); + responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes)); + } + // synchronized so the " == majority" is safe public synchronized void response(Message message) { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=929777&r1=929776&r2=929777&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 1 01:53:56 2010 @@ -764,9 +764,7 @@ public class StorageProxy implements Sto if (DatabaseDescriptor.getConsistencyCheck()) { List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key); - /* Remove the local storage endpoint from the list. */ - endpoints.remove(FBUtilities.getLocalAddress()); - if (endpoints.size() > 0) + if (endpoints.size() > 1) StorageService.instance.doConsistencyCheck(row, endpoints, command); }