Author: jbellis
Date: Thu Apr 28 13:24:22 2011
New Revision: 1097448

URL: http://svn.apache.org/viewvc?rev=1097448&view=rev
Log:
fix incorrect use of NBHM.size in ReadCallback
patch by jbellis; reviewed by stuhood for CASSANDRA-2552

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Apr 28 13:24:22 2011
@@ -1,6 +1,8 @@
 0.7.6
  * force GC to reclaim disk space on flush, if necessary (CASSANDRA-2404)
  * move gossip heartbeat back to its own thread (CASSANDRA-2554)
+ * fix incorrect use of NBHM.size in ReadCallback that could cause
+   reads to time out even when responses were received (CASSANDRA-2552)
 
 
 0.7.5

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractRowResolver.java
 Thu Apr 28 13:24:22 2011
@@ -83,9 +83,4 @@ public abstract class AbstractRowResolve
     {
         return replies.keySet();
     }
-
-    public int getMessageCount()
-    {
-        return replies.size();
-    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
 Thu Apr 28 13:24:22 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.service;
 
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -32,18 +33,19 @@ import org.apache.cassandra.utils.Wrappe
 public class AsyncRepairCallback implements IAsyncCallback
 {
     private final RowRepairResolver repairResolver;
-    private final int count;
+    private final int blockfor;
+    protected final AtomicInteger received = new AtomicInteger(0);
 
-    public AsyncRepairCallback(RowRepairResolver repairResolver, int count)
+    public AsyncRepairCallback(RowRepairResolver repairResolver, int blockfor)
     {
         this.repairResolver = repairResolver;
-        this.count = count;
+        this.blockfor = blockfor;
     }
 
     public void response(Message message)
     {
         repairResolver.preprocess(message);
-        if (repairResolver.getMessageCount() == count)
+        if (received.incrementAndGet() == blockfor)
         {
             StageManager.getStage(Stage.READ_REPAIR).execute(new 
WrappedRunnable()
             {

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
 Thu Apr 28 13:24:22 2011
@@ -23,7 +23,6 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadResponse;
@@ -42,12 +41,10 @@ public class DatacenterReadCallback<T> e
 {
     private static final IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
     private static final String localdc = 
snitch.getDatacenter(FBUtilities.getLocalAddress());
-    private AtomicInteger localResponses;
-    
+
     public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel 
consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
         super(resolver, consistencyLevel, command, endpoints);
-        localResponses = new AtomicInteger(blockfor);
     }
 
     @Override
@@ -56,12 +53,13 @@ public class DatacenterReadCallback<T> e
         resolver.preprocess(message);
 
         int n = localdc.equals(snitch.getDatacenter(message.getFrom()))
-                ? localResponses.decrementAndGet()
-                : localResponses.get();
+              ? received.incrementAndGet()
+              : received.get();
 
-        if (n == 0 && resolver.isDataPresent())
+        if (n == blockfor && resolver.isDataPresent())
         {
             condition.signal();
+            maybeResolveForRepair();
         }
     }
     
@@ -70,13 +68,11 @@ public class DatacenterReadCallback<T> e
     {
         ((RowDigestResolver) resolver).injectPreProcessed(result);
 
-        int n = localResponses.decrementAndGet();
-        if (n == 0 && resolver.isDataPresent())
+        if (received.incrementAndGet() == blockfor && resolver.isDataPresent())
         {
             condition.signal();
+            maybeResolveForRepair();
         }
-
-        maybeResolveForRepair();
     }
     
     @Override

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
 Thu Apr 28 13:24:22 2011
@@ -43,7 +43,4 @@ public interface IResponseResolver<T> {
 
     public void preprocess(Message message);
     public Iterable<Message> getMessages();
-
-    /** Potentially called by multiple response threads, so must be 
threadsafe. */
-    public int getMessageCount();
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 Thu Apr 28 13:24:22 2011
@@ -146,9 +146,4 @@ public class RangeSliceResponseResolver 
     {
         return responses;
     }
-
-    public int getMessageCount()
-    {
-        return responses.size();
-    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
 Thu Apr 28 13:24:22 2011
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -62,6 +63,7 @@ public class ReadCallback<T> implements 
     private final long startTime;
     protected final int blockfor;
     private final IReadCommand command;
+    protected final AtomicInteger received = new AtomicInteger(0);
 
     /** the list of endpoints that StorageProxy should send requests to */
     final List<InetAddress> endpoints;
@@ -117,7 +119,7 @@ public class ReadCallback<T> implements 
             StringBuilder sb = new StringBuilder("");
             for (Message message : resolver.getMessages())
                 sb.append(message.getFrom()).append(", ");
-            throw new TimeoutException("Operation timed out - received only " 
+ resolver.getMessageCount() + " responses from " + sb.toString() + " .");
+            throw new TimeoutException("Operation timed out - received only " 
+ received.get() + " responses from " + sb.toString() + " .");
         }
 
         return blockfor == 1 ? resolver.getData() : resolver.resolve();
@@ -126,10 +128,7 @@ public class ReadCallback<T> implements 
     public void response(Message message)
     {
         resolver.preprocess(message);
-        assert resolver.getMessageCount() <= endpoints.size() : "Got " + 
resolver.getMessageCount() + " replies but requests were only sent to " + 
endpoints.size() + " endpoints";
-        if (resolver.getMessageCount() < blockfor)
-            return;
-        if (resolver.isDataPresent())
+        if (received.incrementAndGet() >= blockfor && resolver.isDataPresent())
         {
             condition.signal();
             maybeResolveForRepair();
@@ -139,10 +138,7 @@ public class ReadCallback<T> implements 
     public void response(ReadResponse result)
     {
         ((RowDigestResolver) resolver).injectPreProcessed(result);
-        assert resolver.getMessageCount() <= endpoints.size();
-        if (resolver.getMessageCount() < blockfor)
-            return;
-        if (resolver.isDataPresent())
+        if (received.incrementAndGet() >= blockfor && resolver.isDataPresent())
         {
             condition.signal();
             maybeResolveForRepair();
@@ -155,7 +151,7 @@ public class ReadCallback<T> implements 
      */
     protected void maybeResolveForRepair()
     {
-        if (blockfor < endpoints.size() && resolver.getMessageCount() == 
endpoints.size())
+        if (blockfor < endpoints.size() && received.get() == endpoints.size())
         {
             assert resolver.isDataPresent();
             StageManager.getStage(Stage.READ_REPAIR).execute(new 
AsyncRepairRunner());

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1097448&r1=1097447&r2=1097448&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RepairCallback.java
 Thu Apr 28 13:24:22 2011
@@ -26,6 +26,7 @@ import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -38,6 +39,7 @@ public class RepairCallback<T> implement
     private final List<InetAddress> endpoints;
     private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
+    protected final AtomicInteger received = new AtomicInteger(0);
 
     /**
      * The main difference between this and ReadCallback is, ReadCallback has 
a ConsistencyLevel
@@ -66,13 +68,13 @@ public class RepairCallback<T> implement
             throw new AssertionError(ex);
         }
 
-        return resolver.getMessageCount() > 1 ? resolver.resolve() : null;
+        return received.get() > 1 ? resolver.resolve() : null;
     }
 
     public void response(Message message)
     {
         resolver.preprocess(message);
-        if (resolver.getMessageCount() == endpoints.size())
+        if (received.incrementAndGet() == endpoints.size())
             condition.signal();
     }
 


Reply via email to