Author: jbellis
Date: Thu Aug 11 19:29:38 2011
New Revision: 1156758

URL: http://svn.apache.org/viewvc?rev=1156758&view=rev
Log:
provide monotonic read consistency
patch by jbellis; reviewed by slebresne for CASSANDRA-2494

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Aug 11 19:29:38 2011
@@ -30,6 +30,9 @@
  * make column family backed column map pluggable and introduce unsynchronized
    ArrayList backed one to speedup reads (CASSANDRA-2843)
  * refactoring of the secondary index api (CASSANDRA-2982)
+ * make CL > ONE reads wait for digest reconciliation before returning
+   (CASSANDRA-2494)
+
 
 0.8.4
  * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Aug 11 19:29:38 2011
@@ -22,15 +22,18 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.AbstractIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RangeSliceReply;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -43,9 +46,19 @@ import org.apache.cassandra.utils.MergeI
 public class RangeSliceResponseResolver implements 
IResponseResolver<Iterable<Row>>
 {
     private static final Logger logger_ = 
LoggerFactory.getLogger(RangeSliceResponseResolver.class);
+
+    private static final Comparator<Pair<Row,InetAddress>> pairComparator = 
new Comparator<Pair<Row, InetAddress>>()
+    {
+        public int compare(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> 
o2)
+        {
+            return o1.left.key.compareTo(o2.left.key);
+        }
+    };
+
     private final String table;
     private final List<InetAddress> sources;
     protected final Collection<Message> responses = new 
LinkedBlockingQueue<Message>();;
+    public final List<IAsyncResult> repairResults = new 
ArrayList<IAsyncResult>();
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
@@ -73,50 +86,7 @@ public class RangeSliceResponseResolver 
             iters.add(new RowIterator(reply.rows.iterator(), 
response.getFrom()));
         }
         // for each row, compute the combination of all different versions 
seen, and repair incomplete versions
-        MergeIterator<Pair<Row,InetAddress>, Row> iter = 
MergeIterator.get(iters, new Comparator<Pair<Row,InetAddress>>()
-        {
-            public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> 
o2)
-            {
-                return o1.left.key.compareTo(o2.left.key);
-            }
-        }, new MergeIterator.Reducer<Pair<Row,InetAddress>, Row>()
-        {
-            List<ColumnFamily> versions = new 
ArrayList<ColumnFamily>(sources.size());
-            List<InetAddress> versionSources = new 
ArrayList<InetAddress>(sources.size());
-            DecoratedKey key;
-
-            public void reduce(Pair<Row,InetAddress> current)
-            {
-                key = current.left.key;
-                versions.add(current.left.cf);
-                versionSources.add(current.right);
-            }
-
-            protected Row getReduced()
-            {
-                ColumnFamily resolved = versions.size() > 1
-                                      ? 
RowRepairResolver.resolveSuperset(versions)
-                                      : versions.get(0);
-                if (versions.size() < sources.size())
-                {
-                    // add placeholder rows for sources that didn't have any 
data, so maybeScheduleRepairs sees them
-                    for (InetAddress source : sources)
-                    {
-                        if (!versionSources.contains(source))
-                        {
-                            versions.add(null);
-                            versionSources.add(source);
-                        }
-                    }
-                }
-                // resolved can be null even if versions doesn't have all 
nulls because of the call to removeDeleted in resolveSuperSet
-                if (resolved != null)
-                    RowRepairResolver.maybeScheduleRepairs(resolved, table, 
key, versions, versionSources);
-                versions.clear();
-                versionSources.clear();
-                return new Row(key, resolved);
-            }
-        });
+        MergeIterator<Pair<Row,InetAddress>, Row> iter = 
MergeIterator.get(iters, pairComparator, new Reducer());
 
         List<Row> resolvedRows = new ArrayList<Row>(n);
         while (iter.hasNext())
@@ -163,4 +133,43 @@ public class RangeSliceResponseResolver 
     {
         throw new UnsupportedOperationException();
     }
+
+    private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, 
Row>
+    {
+        List<ColumnFamily> versions = new 
ArrayList<ColumnFamily>(sources.size());
+        List<InetAddress> versionSources = new 
ArrayList<InetAddress>(sources.size());
+        DecoratedKey key;
+
+        public void reduce(Pair<Row,InetAddress> current)
+        {
+            key = current.left.key;
+            versions.add(current.left.cf);
+            versionSources.add(current.right);
+        }
+
+        protected Row getReduced()
+        {
+            ColumnFamily resolved = versions.size() > 1
+                                  ? RowRepairResolver.resolveSuperset(versions)
+                                  : versions.get(0);
+            if (versions.size() < sources.size())
+            {
+                // add placeholder rows for sources that didn't have any data, 
so maybeScheduleRepairs sees them
+                for (InetAddress source : sources)
+                {
+                    if (!versionSources.contains(source))
+                    {
+                        versions.add(null);
+                        versionSources.add(source);
+                    }
+                }
+            }
+            // resolved can be null even if versions doesn't have all nulls 
because of the call to removeDeleted in resolveSuperSet
+            if (resolved != null)
+                
repairResults.addAll(RowRepairResolver.scheduleRepairs(resolved, table, key, 
versions, versionSources));
+            versions.clear();
+            versionSources.clear();
+            return new Row(key, resolved);
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Thu Aug 11 19:29:38 2011
@@ -29,13 +29,14 @@ import java.util.concurrent.TimeoutExcep
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Row;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.SimpleCondition;
 
-public class RepairCallback<T> implements IAsyncCallback
+public class RepairCallback implements IAsyncCallback
 {
-    private final IResponseResolver<T> resolver;
+    public final RowRepairResolver resolver;
     private final List<InetAddress> endpoints;
     private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
@@ -49,14 +50,14 @@ public class RepairCallback<T> implement
      * mismatch, and we're going to do full-data reads from everyone -- that 
is, this is the final
      * stage in the read process.)
      */
-    public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> 
endpoints)
+    public RepairCallback(RowRepairResolver resolver, List<InetAddress> 
endpoints)
     {
         this.resolver = resolver;
         this.endpoints = endpoints;
         this.startTime = System.currentTimeMillis();
     }
 
-    public T get() throws TimeoutException, DigestMismatchException, 
IOException
+    public Row get() throws TimeoutException, DigestMismatchException, 
IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - 
(System.currentTimeMillis() - startTime);
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Aug 11 19:29:38 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -31,13 +32,16 @@ import org.apache.cassandra.db.columnite
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class RowRepairResolver extends AbstractRowResolver
 {
     protected int maxLiveColumns = 0;
+    public List<IAsyncResult> repairResults = Collections.emptyList();
 
     public RowRepairResolver(String table, ByteBuffer key)
     {
@@ -89,7 +93,7 @@ public class RowRepairResolver extends A
                 logger.debug("versions merged");
             // resolved can be null even if versions doesn't have all nulls 
because of the call to removeDeleted in resolveSuperSet
             if (resolved != null)
-                maybeScheduleRepairs(resolved, table, key, versions, 
endpoints);
+                repairResults = scheduleRepairs(resolved, table, key, 
versions, endpoints);
         }
         else
         {
@@ -106,8 +110,10 @@ public class RowRepairResolver extends A
      * For each row version, compare with resolved (the superset of all row 
versions);
      * if it is missing anything, send a mutation to the endpoint it come from.
      */
-    public static void maybeScheduleRepairs(ColumnFamily resolved, String 
table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> 
endpoints)
+    public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, 
String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> 
endpoints)
     {
+        List<IAsyncResult> results = new 
ArrayList<IAsyncResult>(versions.size());
+
         for (int i = 0; i < versions.size(); i++)
         {
             ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
@@ -126,8 +132,10 @@ public class RowRepairResolver extends A
             {
                 throw new IOError(e);
             }
-            MessagingService.instance().sendOneWay(repairMessage, 
endpoints.get(i));
+            results.add(MessagingService.instance().sendRR(repairMessage, 
endpoints.get(i)));
         }
+
+        return results;
     }
 
     static ColumnFamily resolveSuperset(List<ColumnFamily> versions)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Aug 11 19:29:38 2011
@@ -30,8 +30,9 @@ import javax.management.ObjectName;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import org.apache.cassandra.net.CachingMessageProducer;
-import org.apache.cassandra.net.MessageProducer;
+
+import org.apache.cassandra.net.*;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -49,9 +50,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.IndexClause;
@@ -571,7 +569,7 @@ public class StorageProxy implements Sto
                 repairCommands.clear();
 
             // read results and make a second pass for any digest mismatches
-            List<RepairCallback<Row>> repairResponseHandlers = null;
+            List<RepairCallback> repairResponseHandlers = null;
             for (int i = 0; i < commandsToSend.size(); i++)
             {
                 ReadCallback<Row> handler = readCallbacks.get(i);
@@ -598,7 +596,7 @@ public class StorageProxy implements Sto
                     if (logger.isDebugEnabled())
                         logger.debug("Digest mismatch: {}", ex.toString());
                     RowRepairResolver resolver = new 
RowRepairResolver(command.table, command.key);
-                    RepairCallback<Row> repairHandler = new 
RepairCallback<Row>(resolver, handler.endpoints);
+                    RepairCallback repairHandler = new 
RepairCallback(resolver, handler.endpoints);
 
                     if (repairCommands == Collections.EMPTY_LIST)
                         repairCommands = new ArrayList<ReadCommand>();
@@ -608,7 +606,7 @@ public class StorageProxy implements Sto
                         MessagingService.instance().sendRR(command, endpoint, 
repairHandler);
 
                     if (repairResponseHandlers == null)
-                        repairResponseHandlers = new 
ArrayList<RepairCallback<Row>>();
+                        repairResponseHandlers = new 
ArrayList<RepairCallback>();
                     repairResponseHandlers.add(repairHandler);
                 }
             }
@@ -622,48 +620,49 @@ public class StorageProxy implements Sto
                 for (int i = 0; i < repairCommands.size(); i++)
                 {
                     ReadCommand command = repairCommands.get(i);
-                    RepairCallback<Row> handler = 
repairResponseHandlers.get(i);
+                    RepairCallback handler = repairResponseHandlers.get(i);
+                    FBUtilities.waitOnFutures(handler.resolver.repairResults, 
DatabaseDescriptor.getRpcTimeout());
 
+                    Row row;
                     try
                     {
-                        Row row = handler.get();
-
-                        if (command instanceof SliceFromReadCommand)
-                        {
-                            // short reads are only possible on 
SliceFromReadCommand
-                            SliceFromReadCommand sliceCommand = 
(SliceFromReadCommand)command;
-                            int maxLiveColumns = handler.getMaxLiveColumns();
-                            int liveColumnsInRow = row != null ? 
row.cf.getLiveColumnCount() : 0;
-
-                            assert maxLiveColumns <= sliceCommand.count;
-                            if ((maxLiveColumns == sliceCommand.count) && 
(liveColumnsInRow < sliceCommand.count))
-                            {
-                                if (logger.isDebugEnabled())
-                                    logger.debug("detected short read: 
expected {} columns, but only resolved {} columns",
-                                                 sliceCommand.count, 
liveColumnsInRow);
-
-                                int retryCount = sliceCommand.count + 
sliceCommand.count - liveColumnsInRow;
-                                SliceFromReadCommand retryCommand = new 
SliceFromReadCommand(command.table,
-                                                                               
              command.key,
-                                                                               
              command.queryPath,
-                                                                               
              sliceCommand.start,
-                                                                               
              sliceCommand.finish,
-                                                                               
              sliceCommand.reversed,
-                                                                               
              retryCount);
-                                if (commandsToRetry == Collections.EMPTY_LIST)
-                                    commandsToRetry = new 
ArrayList<ReadCommand>();
-                                commandsToRetry.add(retryCommand);
-                            }
-                            else if (row != null)
-                                rows.add(row);
-                        }
-                        else if (row != null)
-                            rows.add(row);
+                        row = handler.get();
                     }
                     catch (DigestMismatchException e)
                     {
                         throw new AssertionError(e); // full data requested 
from each node here, no digests should be sent
                     }
+
+                    // retry short reads, otherwise add the row to our 
resultset
+                    if (command instanceof SliceFromReadCommand)
+                    {
+                        // short reads are only possible on 
SliceFromReadCommand
+                        SliceFromReadCommand sliceCommand = 
(SliceFromReadCommand) command;
+                        int maxLiveColumns = handler.getMaxLiveColumns();
+                        int liveColumnsInRow = row != null ? 
row.cf.getLiveColumnCount() : 0;
+
+                        assert maxLiveColumns <= sliceCommand.count;
+                        if ((maxLiveColumns == sliceCommand.count) && 
(liveColumnsInRow < sliceCommand.count))
+                        {
+                            if (logger.isDebugEnabled())
+                                logger.debug("detected short read: expected {} 
columns, but only resolved {} columns",
+                                             sliceCommand.count, 
liveColumnsInRow);
+
+                            int retryCount = sliceCommand.count + 
sliceCommand.count - liveColumnsInRow;
+                            SliceFromReadCommand retryCommand = new 
SliceFromReadCommand(command.table,
+                                                                               
          command.key,
+                                                                               
          command.queryPath,
+                                                                               
          sliceCommand.start,
+                                                                               
          sliceCommand.finish,
+                                                                               
          sliceCommand.reversed,
+                                                                               
          retryCount);
+                            if (commandsToRetry == Collections.EMPTY_LIST)
+                                commandsToRetry = new ArrayList<ReadCommand>();
+                            commandsToRetry.add(retryCommand);
+                            continue;
+                        }
+                    }
+                    rows.add(row);
                 }
             }
         } while (!commandsToRetry.isEmpty());
@@ -769,6 +768,7 @@ public class StorageProxy implements Sto
                             rows.add(row);
                             logger.debug("range slices read {}", row.key);
                         }
+                        FBUtilities.waitOnFutures(resolver.repairResults, 
DatabaseDescriptor.getRpcTimeout());
                     }
                     catch (TimeoutException ex)
                     {
@@ -1035,6 +1035,7 @@ public class StorageProxy implements Sto
                     rows.add(row);
                     logger.debug("read {}", row);
                 }
+                FBUtilities.waitOnFutures(resolver.repairResults, 
DatabaseDescriptor.getRpcTimeout());
             }
             catch (TimeoutException ex)
             {
@@ -1044,7 +1045,7 @@ public class StorageProxy implements Sto
             }
             catch (DigestMismatchException e)
             {
-                throw new RuntimeException(e);
+                throw new AssertionError(e);
             }
             if (rows.size() >= index_clause.count)
                 return rows.subList(0, index_clause.count);

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Aug 11 19:29:38 2011
@@ -32,6 +32,8 @@ import java.security.NoSuchAlgorithmExce
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -50,6 +52,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.PropertyFileSnitch;
+import org.apache.cassandra.net.IAsyncResult;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -575,6 +578,12 @@ public class FBUtilities
         }
     }
 
+    public static void waitOnFutures(List<IAsyncResult> results, long ms) 
throws TimeoutException
+    {
+        for (IAsyncResult result : results)
+            result.get(ms, TimeUnit.MILLISECONDS);
+    }
+
     public static IPartitioner newPartitioner(String partitionerClassName) 
throws ConfigurationException
     {
         if (!partitionerClassName.contains("."))


Reply via email to