Author: jbellis Date: Thu Apr 28 13:24:22 2011 New Revision: 1097448 URL: http://svn.apache.org/viewvc?rev=1097448&view=rev Log: fix incorrect use ofNBHM.size in ReadCallback patch by jbellis; reviewed by stuhood for CASSANDRA-2552
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Apr 28 13:24:22 2011 @@ -1,6 +1,8 @@ 0.7.6 * force GC to reclaim disk space on flush, if necessary (CASSANDRA-2404) * move gossip heartbeat back to its own thread (CASSANDRA-2554) + * fix incorrect use of NBHM.size in ReadCallback that could cause + reads to time out even when responses were received (CASSAMDRA-2552) 0.7.5 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java Thu Apr 28 13:24:22 2011 @@ -83,9 +83,4 @@ public abstract class AbstractRowResolve { return replies.keySet(); } - - public int getMessageCount() - { - return replies.size(); - } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java Thu Apr 28 13:24:22 2011 @@ -22,6 +22,7 @@ package org.apache.cassandra.service; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -32,18 +33,19 @@ import org.apache.cassandra.utils.Wrappe public class AsyncRepairCallback implements IAsyncCallback { private final RowRepairResolver repairResolver; - private final int count; + private final int blockfor; + protected final AtomicInteger received = new AtomicInteger(0); - public AsyncRepairCallback(RowRepairResolver repairResolver, int count) + public AsyncRepairCallback(RowRepairResolver repairResolver, int blockfor) { this.repairResolver = repairResolver; - this.count = count; + this.blockfor = blockfor; } public void response(Message message) { repairResolver.preprocess(message); - if (repairResolver.getMessageCount() == count) + if (received.incrementAndGet() == blockfor) { StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable() { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Thu Apr 28 13:24:22 2011 @@ -23,7 +23,6 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ReadResponse; @@ -42,12 +41,10 @@ public class DatacenterReadCallback<T> e { private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress()); - private AtomicInteger localResponses; - + public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints) { super(resolver, consistencyLevel, command, endpoints); - localResponses = new AtomicInteger(blockfor); } @Override @@ -56,12 +53,13 @@ public class DatacenterReadCallback<T> e resolver.preprocess(message); int n = localdc.equals(snitch.getDatacenter(message.getFrom())) - ? localResponses.decrementAndGet() - : localResponses.get(); + ? received.incrementAndGet() + : received.get(); - if (n == 0 && resolver.isDataPresent()) + if (n == blockfor && resolver.isDataPresent()) { condition.signal(); + maybeResolveForRepair(); } } @@ -70,13 +68,11 @@ public class DatacenterReadCallback<T> e { ((RowDigestResolver) resolver).injectPreProcessed(result); - int n = localResponses.decrementAndGet(); - if (n == 0 && resolver.isDataPresent()) + if (received.incrementAndGet() == blockfor && resolver.isDataPresent()) { condition.signal(); + maybeResolveForRepair(); } - - maybeResolveForRepair(); } @Override Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java Thu Apr 28 13:24:22 2011 @@ -43,7 +43,4 @@ public interface IResponseResolver<T> { public void preprocess(Message message); public Iterable<Message> getMessages(); - - /** Potentially called by multiple response threads, so must be threadsafe. */ - public int getMessageCount(); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Apr 28 13:24:22 2011 @@ -146,9 +146,4 @@ public class RangeSliceResponseResolver { return responses; } - - public int getMessageCount() - { - return responses.size(); - } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Thu Apr 28 13:24:22 2011 @@ -24,6 +24,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -62,6 +63,7 @@ public class ReadCallback<T> implements private final long startTime; protected final int blockfor; private final IReadCommand command; + protected final AtomicInteger received = new AtomicInteger(0); /** the list of endpoints that StorageProxy should send requests to */ final List<InetAddress> endpoints; @@ -117,7 +119,7 @@ public class ReadCallback<T> implements StringBuilder sb = new StringBuilder(""); for (Message message : resolver.getMessages()) sb.append(message.getFrom()).append(", "); - throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " ."); + throw new TimeoutException("Operation timed out - received only " + received.get() + " responses from " + sb.toString() + " ."); } return blockfor == 1 ? resolver.getData() : resolver.resolve(); @@ -126,10 +128,7 @@ public class ReadCallback<T> implements public void response(Message message) { resolver.preprocess(message); - assert resolver.getMessageCount() <= endpoints.size() : "Got " + resolver.getMessageCount() + " replies but requests were only sent to " + endpoints.size() + " endpoints"; - if (resolver.getMessageCount() < blockfor) - return; - if (resolver.isDataPresent()) + if (received.incrementAndGet() >= blockfor && resolver.isDataPresent()) { condition.signal(); maybeResolveForRepair(); @@ -139,10 +138,7 @@ public class ReadCallback<T> implements public void response(ReadResponse result) { ((RowDigestResolver) resolver).injectPreProcessed(result); - assert resolver.getMessageCount() <= endpoints.size(); - if (resolver.getMessageCount() < blockfor) - return; - if (resolver.isDataPresent()) + if (received.incrementAndGet() >= blockfor && resolver.isDataPresent()) { condition.signal(); maybeResolveForRepair(); @@ -155,7 +151,7 @@ public class ReadCallback<T> implements */ protected void maybeResolveForRepair() { - if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size()) + if (blockfor < endpoints.size() && received.get() == endpoints.size()) { assert resolver.isDataPresent(); StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner()); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java Thu Apr 28 13:24:22 2011 @@ -26,6 +26,7 @@ import java.net.InetAddress; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.IAsyncCallback; @@ -38,6 +39,7 @@ public class RepairCallback<T> implement private final List<InetAddress> endpoints; private final SimpleCondition condition = new SimpleCondition(); private final long startTime; + protected final AtomicInteger received = new AtomicInteger(0); /** * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel @@ -66,13 +68,13 @@ public class RepairCallback<T> implement throw new AssertionError(ex); } - return resolver.getMessageCount() > 1 ? resolver.resolve() : null; + return received.get() > 1 ? resolver.resolve() : null; } public void response(Message message) { resolver.preprocess(message); - if (resolver.getMessageCount() == endpoints.size()) + if (received.incrementAndGet() == endpoints.size()) condition.signal(); }