Author: jbellis
Date: Thu Apr 1 01:39:01 2010
New Revision: 929768
URL: http://svn.apache.org/viewvc?rev=929768&view=rev
Log:
keep the replica set constant throughout the read repair process. patch by jbellis; reviewed by gdusbabek for CASSANDRA-937
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=929768&r1=929767&r2=929768&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Apr 1 01:39:01 2010 @@ -97,9 +97,7 @@ public class ReadVerbHandler implements if (message.getHeader(ReadCommand.DO_REPAIR) != null) { List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key); - /* Remove the local storage endpoint from the list. 
*/ - endpoints.remove(FBUtilities.getLocalAddress()); - if (endpoints.size() > 0) + if (endpoints.size() > 1) StorageService.instance.doConsistencyCheck(row, endpoints, command); } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929768&r1=929767&r2=929768&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Apr 1 01:39:01 2010 @@ -31,10 +31,13 @@ import org.apache.log4j.Logger; import org.apache.commons.lang.StringUtils; import org.apache.cassandra.cache.ICacheExpungeHook; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Row; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -45,19 +48,18 @@ import org.apache.cassandra.utils.FBUtil class ConsistencyChecker implements Runnable { private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class); - private static long scheduledTimeMillis_ = 600; - private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(scheduledTimeMillis_); + private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout()); private final String table_; private final Row row_; protected final List<InetAddress> replicas_; private final 
ReadCommand readCommand_; - public ConsistencyChecker(String table, Row row, List<InetAddress> replicas, ReadCommand readCommand) + public ConsistencyChecker(String table, Row row, List<InetAddress> endpoints, ReadCommand readCommand) { table_ = table; row_ = row; - replicas_ = replicas; + replicas_ = endpoints; readCommand_ = readCommand; } @@ -69,7 +71,13 @@ class ConsistencyChecker implements Runn Message message = readCommandDigestOnly.makeReadMessage(); if (logger_.isDebugEnabled()) logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); - MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler()); + + MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId()); + for (InetAddress endpoint : replicas_) + { + if (!endpoint.equals(FBUtilities.getLocalAddress())) + MessagingService.instance.sendOneWay(message, endpoint); + } } catch (IOException ex) { @@ -86,48 +94,49 @@ class ConsistencyChecker implements Runn class DigestResponseHandler implements IAsyncCallback { - Collection<Message> responses_ = new LinkedBlockingQueue<Message>(); + private boolean repairInvoked; - // syncronized so "size() == " works - public synchronized void response(Message msg) + public synchronized void response(Message response) { - responses_.add(msg); - if (responses_.size() != ConsistencyChecker.this.replicas_.size()) + if (repairInvoked) return; - for (Message response : responses_) + try { - try + byte[] body = response.getMessageBody(); + ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); + byte[] digest = result.digest(); + + if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest)) { - byte[] body = response.getMessageBody(); - ByteArrayInputStream bufIn = new 
ByteArrayInputStream(body); - ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); - byte[] digest = result.digest(); - if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest)) + IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size()); + IAsyncCallback responseHandler; + if (replicas_.contains(FBUtilities.getLocalAddress())) + responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver); + else + responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver); + + ReadCommand readCommand = constructReadMessage(false); + Message message = readCommand.makeReadMessage(); + if (logger_.isDebugEnabled()) + logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); + MessagingService.instance.addCallback(responseHandler, message.getMessageId()); + for (InetAddress endpoint : replicas_) { - doReadRepair(); - break; + if (!endpoint.equals(FBUtilities.getLocalAddress())) + MessagingService.instance.sendOneWay(message, endpoint); } + + repairInvoked = true; } - catch (Exception e) - { - throw new RuntimeException("Error handling responses for " + row_, e); - } + } + catch (Exception e) + { + throw new RuntimeException("Error handling responses for " + row_, e); } } - - private void doReadRepair() throws IOException - { - replicas_.add(FBUtilities.getLocalAddress()); - IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size()); - IAsyncCallback responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver); - ReadCommand readCommand = constructReadMessage(false); - Message message = readCommand.makeReadMessage(); - if (logger_.isDebugEnabled()) - logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); - 
MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler); - } - } + } static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String> { @@ -141,6 +150,18 @@ class ConsistencyChecker implements Runn majority_ = (responseCount / 2) + 1; } + public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException + { + this(responseCount, readResponseResolver); + // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver + ReadResponse readResponse = new ReadResponse(localRow); + DataOutputBuffer out = new DataOutputBuffer(); + ReadResponse.serializer().serialize(readResponse, out); + byte[] bytes = new byte[out.getLength()]; + System.arraycopy(out.getData(), 0, bytes, 0, bytes.length); + responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes)); + } + // synchronized so the " == majority" is safe public synchronized void response(Message message) { Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=929768&r1=929767&r2=929768&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 1 01:39:01 2010 @@ -763,9 +763,7 @@ public class StorageProxy implements Sto if (DatabaseDescriptor.getConsistencyCheck()) { List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key); - /* Remove the local storage endpoint from the list. 
*/ - endpoints.remove(FBUtilities.getLocalAddress()); - if (endpoints.size() > 0) + if (endpoints.size() > 1) StorageService.instance.doConsistencyCheck(row, endpoints, command); }