Author: jbellis
Date: Wed Jan  5 07:18:16 2011
New Revision: 1055326

URL: http://svn.apache.org/viewvc?rev=1055326&view=rev
Log:
merge from 0.7

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 07:18:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7:1026516-1055313
+/cassandra/branches/cassandra-0.7:1026516-1055325
 /cassandra/branches/cassandra-0.7.0:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan  5 07:18:16 2011
@@ -15,6 +15,8 @@
  * Make snitches configurable at runtime (CASSANDRA-1374)
  * retry hadoop split requests on connection failure (CASSANDRA-1927)
  * implement describeOwnership for BOP, COPP (CASSANDRA-1928)
+ * make read repair behave as expected for ConsistencyLevel > ONE
+   (CASSANDRA-982)
 
 
 0.7.0-rc4

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 07:18:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055325
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1054631
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 07:18:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055325
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1054631
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 07:18:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055325
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1054631
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 07:18:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055325
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1054631
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 07:18:16 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055325
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1054631
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 Wed Jan  5 07:18:16 2011
@@ -223,15 +223,6 @@ public abstract class AbstractReplicatio
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver 
responseResolver, ConsistencyLevel consistencyLevel)
-    {
-        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || 
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
-        {
-            return new DatacenterQuorumResponseHandler(responseResolver, 
consistencyLevel, table);
-        }
-        return new QuorumResponseHandler(responseResolver, consistencyLevel, 
table);
-    }
-
     public void invalidateCachedTokenEndpointValues()
     {
         clearEndpointCache();

Added: 
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1055326&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
 Wed Jan  5 07:18:16 2011
@@ -0,0 +1,88 @@
+package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Datacenter Quorum response handler blocks for a quorum of responses from 
the local DC
+ */
+public class DatacenterReadCallback<T> extends ReadCallback<T>
+{
+    private static final IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
+       private static final String localdc = 
snitch.getDatacenter(FBUtilities.getLocalAddress());
+    private AtomicInteger localResponses;
+    
+    public DatacenterReadCallback(IResponseResolver<T> resolver, 
ConsistencyLevel consistencyLevel, String table)
+    {
+        super(resolver, consistencyLevel, table);
+        localResponses = new AtomicInteger(blockfor);
+    }
+    
+    @Override
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+
+        int n;
+        n = localdc.equals(snitch.getDatacenter(message.getFrom())) 
+                ? localResponses.decrementAndGet()
+                : localResponses.get();
+
+        if (n == 0 && resolver.isDataPresent())
+        {
+            condition.signal();
+        }
+    }
+    
+    @Override
+    public int determineBlockFor(ConsistencyLevel consistency_level, String 
table)
+       {
+        NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) 
Table.open(table).getReplicationStrategy();
+               return (stategy.getReplicationFactor(localdc) / 2) + 1;
+       }
+
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) 
throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+        
+        if(localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java 
Wed Jan  5 07:18:16 2011
@@ -24,7 +24,7 @@ import org.apache.cassandra.net.Message;
 
 public interface IResponseResolver<T> {
 
-       /*
+       /**
         * This Method resolves the responses that are passed in . for example 
: if
         * its write response then all we get is true or false return values 
which
         * implies if the writes were successful but for reads its more 
complicated
@@ -33,8 +33,14 @@ public interface IResponseResolver<T> {
         * needs from this interface.
         */
        public T resolve() throws DigestMismatchException, IOException;
+
        public boolean isDataPresent();
 
+    /**
+     * returns the data response without comparing with any digests
+     */
+    public T getData() throws IOException;
+
     public void preprocess(Message message);
     public Iterable<Message> getMessages();
     public int getMessageCount();

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 Wed Jan  5 07:18:16 2011
@@ -54,6 +54,15 @@ public class RangeSliceResponseResolver 
         this.table = table;
     }
 
+    public List<Row> getData() throws IOException
+    {
+        Message response = responses.iterator().next();
+        RangeSliceReply reply = 
RangeSliceReply.read(response.getMessageBody());
+        return reply.rows;
+    }
+
+    // Note: this deserializes the response a 2nd time if getData was called 
first
+    // (this is not currently an issue since we don't do read repair for range 
queries.)
     public List<Row> resolve() throws IOException
     {
         CollatingIterator collator = new CollatingIterator(new 
Comparator<Pair<Row,InetAddress>>()

Added: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1055326&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Wed 
Jan  5 07:18:16 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
+
+public class ReadCallback<T> implements IAsyncCallback
+{
+    protected static final Logger logger = LoggerFactory.getLogger( 
ReadCallback.class );
+
+    public final IResponseResolver<T> resolver;
+    protected final SimpleCondition condition = new SimpleCondition();
+    private final long startTime;
+    protected final int blockfor;
+    
+    /**
+     * Constructor when response count has to be calculated and blocked for.
+     */
+    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel 
consistencyLevel, String table)
+    {
+        this.blockfor = determineBlockFor(consistencyLevel, table);
+        this.resolver = resolver;
+        this.startTime = System.currentTimeMillis();
+
+        logger.debug("ReadCallback blocking for {} responses", blockfor);
+    }
+    
+    public T get() throws TimeoutException, DigestMismatchException, 
IOException
+    {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - 
(System.currentTimeMillis() - startTime);
+        boolean success;
+        try
+        {
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
+        }
+
+        if (!success)
+        {
+            StringBuilder sb = new StringBuilder("");
+            for (Message message : resolver.getMessages())
+                sb.append(message.getFrom()).append(", ");
+            throw new TimeoutException("Operation timed out - received only " 
+ resolver.getMessageCount() + " responses from " + sb.toString() + " .");
+        }
+
+        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+    }
+
+    public void close()
+    {
+        for (Message response : resolver.getMessages())
+        {
+            MessagingService.removeRegisteredCallback(response.getMessageId());
+        }
+    }
+    
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
+            return;
+        if (resolver.isDataPresent())
+            condition.signal();
+    }
+    
+    public int determineBlockFor(ConsistencyLevel consistencyLevel, String 
table)
+    {
+        switch (consistencyLevel)
+        {
+            case ONE:
+            case ANY:
+                return 1;
+            case QUORUM:
+                return 
(Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
+            case ALL:
+                return 
Table.open(table).getReplicationStrategy().getReplicationFactor();
+            default:
+                throw new UnsupportedOperationException("invalid consistency 
level: " + consistencyLevel);
+        }
+    }
+
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) 
throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java 
Wed Jan  5 07:18:16 2011
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +48,9 @@ public class ReadResponseResolver implem
 {
        private static Logger logger_ = 
LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
-    private final Map<Message, ReadResponse> results = new 
NonBlockingHashMap<Message, ReadResponse>();
+    private final ConcurrentMap<Message, ReadResponse> results = new 
NonBlockingHashMap<Message, ReadResponse>();
     private DecoratedKey key;
+    private ByteBuffer digest;
 
     public ReadResponseResolver(String table, ByteBuffer key)
     {
@@ -56,14 +58,29 @@ public class ReadResponseResolver implem
         this.key = StorageService.getPartitioner().decorateKey(key);
     }
 
+    public Row getData() throws IOException
+    {
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            if (!result.isDigestQuery())
+                return result.row();
+        }
+
+        throw new AssertionError("getData should not be invoked when no data 
is present");
+    }
+
     /*
-     * This method handles two different scenarios:
+     * This method handles three different scenarios:
      *
-     * 1) we're handling the initial read, of data from the closest replica + 
digests
+     * 1a)we're handling the initial read, of data from the closest replica + 
digests
      *    from the rest.  In this case we check the digests against each other,
      *    throw an exception if there is a mismatch, otherwise return the data 
row.
      *
-     * 2) there was a mismatch on the initial read, so we redid the digest 
requests
+     * 1b)we're checking additional digests that arrived after the minimum to 
handle
+     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
+     *
+     * 2) there was a mismatch on the initial read (1a or 1b), so we redid the 
digest requests
      *    as full data reads.  In this case we need to compute the most recent 
version
      *    of each column, and send diffs to out-of-date replicas.
      */
@@ -75,10 +92,13 @@ public class ReadResponseResolver implem
         long startTime = System.currentTimeMillis();
                List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
                List<InetAddress> endpoints = new ArrayList<InetAddress>();
-               ByteBuffer digest = null;
 
         // validate digests against each other; throw immediately on mismatch.
         // also, collects data results into versions/endpoints lists.
+        //
+        // results are cleared as we process them, to avoid unnecessary 
duplication of work
+        // when resolve() is called a second time for read repair on responses 
that were not
+        // necessary to satisfy ConsistencyLevel.
         for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
         {
             ReadResponse result = entry.getValue();
@@ -106,6 +126,8 @@ public class ReadResponseResolver implem
                 versions.add(cf);
                 endpoints.add(from);
             }
+
+            results.remove(message);
         }
 
         if (logger_.isDebugEnabled())

Added: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1055326&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java 
Wed Jan  5 07:18:16 2011
@@ -0,0 +1,55 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.SimpleCondition;
+
+public class RepairCallback<T> implements IAsyncCallback
+{
+    public final IResponseResolver<T> resolver;
+    private final List<InetAddress> endpoints;
+    protected final SimpleCondition condition = new SimpleCondition();
+    private final long startTime;
+
+    public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> 
endpoints)
+    {
+        this.resolver = resolver;
+        this.endpoints = endpoints;
+        this.startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * The main difference between this and ReadCallback is, ReadCallback has 
a ConsistencyLevel
+     * it needs to achieve.  Repair on the other hand is happy to repair 
whoever replies within the timeout.
+     */
+    public T get() throws TimeoutException, DigestMismatchException, 
IOException
+    {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - 
(System.currentTimeMillis() - startTime);
+        try
+        {
+            condition.await(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
+        }
+
+        return resolver.resolve();
+    }
+
+
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() == endpoints.size())
+            condition.signal();
+    }
+
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed 
Jan  5 07:18:16 2011
@@ -50,7 +50,6 @@ import org.apache.cassandra.gms.Gossiper
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.*;
@@ -65,6 +64,8 @@ public class StorageProxy implements Sto
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StorageProxy.class);
 
+    private static ScheduledExecutorService repairExecutor = new 
ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
+
     private static final Random random = new Random();
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
@@ -351,15 +352,7 @@ public class StorageProxy implements Sto
         List<Row> rows;
         try
         {
-            if (consistency_level == ConsistencyLevel.ONE)
-            {
-                rows = weakRead(commands);
-            }
-            else
-            {
-                assert consistency_level.getValue() >= 
ConsistencyLevel.QUORUM.getValue();
-                rows = strongRead(commands, consistency_level);
-            }
+            rows = fetchRows(commands, consistency_level);
         }
         finally
         {
@@ -368,91 +361,23 @@ public class StorageProxy implements Sto
         return rows;
     }
 
-    private static List<Row> weakRead(List<ReadCommand> commands) throws 
IOException, UnavailableException, TimeoutException
-    {
-        List<Row> rows = new ArrayList<Row>();
-
-        // send off all the commands asynchronously
-        List<Future<Object>> localFutures = null;
-        HashMap<ReadCommand, IAsyncResult> remoteResults = null;
-        for (ReadCommand command: commands)
-        {
-            InetAddress endPoint = 
StorageService.instance.findSuitableEndpoint(command.table, command.key);
-            if (endPoint.equals(FBUtilities.getLocalAddress()))
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("weakread reading " + command + " locally");
-
-                if (localFutures == null)
-                    localFutures = new ArrayList<Future<Object>>();
-                Callable<Object> callable = new weakReadLocalCallable(command);
-                
localFutures.add(StageManager.getStage(Stage.READ).submit(callable));
-            }
-            else
-            {
-                if (remoteResults == null)
-                    remoteResults = new HashMap<ReadCommand, IAsyncResult>();
-                Message message = command.makeReadMessage();
-                if (logger.isDebugEnabled())
-                    logger.debug("weakread reading " + command + " from " + 
message.getMessageId() + "@" + endPoint);
-                remoteResults.put(command, 
MessagingService.instance().sendRR(message, endPoint));
-            }
-        }
-
-        // wait for results
-        if (localFutures != null)
-        {
-            for (Future<Object> future : localFutures)
-            {
-                Row row;
-                try
-                {
-                    row = (Row) future.get();
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-                rows.add(row);
-            }
-        }
-        if (remoteResults != null)
-        {
-            for (Map.Entry<ReadCommand, IAsyncResult> entry : 
remoteResults.entrySet())
-            {
-                ReadCommand command = entry.getKey();
-                IAsyncResult iar = entry.getValue();
-                byte[] body;
-                body = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
-                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-                ReadResponse response = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                assert response.row() != null;
-                rows.add(response.row());
-                if (randomlyReadRepair(command))
-                    StorageService.instance.doConsistencyCheck(response.row(), 
command, iar.getFrom());
-            }
-        }
-
-        return rows;
-    }
-
-    /*
-     * This function executes the read protocol.
-        // 1. Get the N nodes from storage service where the data needs to be
-        // replicated
-        // 2. Construct a message for read\write
-         * 3. Set one of the messages to get the data and the rest to get the 
digest
-        // 4. SendRR ( to all the nodes above )
-        // 5. Wait for a response from at least X nodes where X <= N and the 
data node
-         * 6. If the digest matches return the data.
-         * 7. else carry out read repair by getting data from all the nodes.
-        // 5. return success
+    /**
+     * This function executes local and remote reads, and blocks for the 
results:
+     *
+     * 1. Get the replica locations, sorted by response time according to the 
snitch
+     * 2. Send a data request to the closest replica, and digest requests to 
either
+     *    a) all the replicas, if read repair is enabled
+     *    b) the closest R-1 replicas, where R is the number required to 
satisfy the ConsistencyLevel
+     * 3. Wait for a response from R replicas
+     * 4. If the digests (if any) match the data, return the data
+     * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static List<Row> strongRead(List<ReadCommand> commands, 
ConsistencyLevel consistency_level) throws IOException, UnavailableException, 
TimeoutException
+    private static List<Row> fetchRows(List<ReadCommand> commands, 
ConsistencyLevel consistency_level) throws IOException, UnavailableException, 
TimeoutException
     {
-        List<QuorumResponseHandler<Row>> quorumResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
+        List<ReadCallback<Row>> readCallbacks = new 
ArrayList<ReadCallback<Row>>();
         List<List<InetAddress>> commandEndpoints = new 
ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
+        Set<ReadCommand> repairs = new HashSet<ReadCommand>();
 
         // send out read requests
         for (ReadCommand command: commands)
@@ -468,53 +393,65 @@ public class StorageProxy implements Sto
 
             AbstractReplicationStrategy rs = 
Table.open(command.table).getReplicationStrategy();
             ReadResponseResolver resolver = new 
ReadResponseResolver(command.table, command.key);
-            QuorumResponseHandler<Row> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+            ReadCallback<Row> handler = getReadCallback(resolver, 
command.table, consistency_level);
             handler.assureSufficientLiveNodes(endpoints);
 
-            Message messages[] = new Message[endpoints.size()];
+            int targets;
+            if (randomlyReadRepair(command))
+            {
+                targets = endpoints.size();
+                if (targets > handler.blockfor)
+                    repairs.add(command);
+            }
+            else
+            {
+                targets = handler.blockfor;
+            }
+            Message[] messages = new Message[targets];
+
             // data-request message is sent to dataPoint, the node that will 
actually get
             // the data for us. The other replicas are only sent a digest 
query.
-            int n = 0;
-            for (InetAddress endpoint : endpoints)
+            for (int i = 0; i < messages.length; i++)
             {
+                InetAddress endpoint = endpoints.get(i);
                 Message m = endpoint.equals(dataPoint) ? message : 
messageDigestOnly;
-                messages[n++] = m;
+                messages[i] = m;
                 if (logger.isDebugEnabled())
-                    logger.debug("strongread reading " + (m == message ? 
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + 
endpoint);
+                    logger.debug("reading " + (m == message ? "data" : 
"digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
             MessagingService.instance().sendRR(messages, endpoints, handler);
-            quorumResponseHandlers.add(handler);
+            readCallbacks.add(handler);
             commandEndpoints.add(endpoints);
         }
 
         // read results and make a second pass for any digest mismatches
-        List<QuorumResponseHandler<Row>> repairResponseHandlers = null;
+        List<RepairCallback<Row>> repairResponseHandlers = null;
         for (int i = 0; i < commands.size(); i++)
         {
-            QuorumResponseHandler<Row> quorumResponseHandler = 
quorumResponseHandlers.get(i);
+            ReadCallback<Row> readCallback = readCallbacks.get(i);
             Row row;
             ReadCommand command = commands.get(i);
+            List<InetAddress> endpoints = commandEndpoints.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
-                row = quorumResponseHandler.get();
+                row = readCallback.get();
                 if (row != null)
                     rows.add(row);
 
                 if (logger.isDebugEnabled())
-                    logger.debug("quorumResponseHandler: " + 
(System.currentTimeMillis() - startTime2) + " ms.");
+                    logger.debug("Read: " + (System.currentTimeMillis() - 
startTime2) + " ms.");
+
+                if (repairs.contains(command))
+                    repairExecutor.schedule(new 
RepairRunner(readCallback.resolver, command, endpoints), 
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
             catch (DigestMismatchException ex)
             {
-                AbstractReplicationStrategy rs = 
Table.open(command.table).getReplicationStrategy();
-                ReadResponseResolver resolver = new 
ReadResponseResolver(command.table, command.key);
-                QuorumResponseHandler<Row> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
-                Message messageRepair = command.makeReadMessage();
-                MessagingService.instance().sendRR(messageRepair, 
commandEndpoints.get(i), handler);
+                RepairCallback<Row> handler = repair(command, endpoints);
                 if (repairResponseHandlers == null)
-                    repairResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
+                    repairResponseHandlers = new 
ArrayList<RepairCallback<Row>>();
                 repairResponseHandlers.add(handler);
             }
         }
@@ -522,7 +459,7 @@ public class StorageProxy implements Sto
         // read the results for the digest mismatch retries
         if (repairResponseHandlers != null)
         {
-            for (QuorumResponseHandler<Row> handler : repairResponseHandlers)
+            for (RepairCallback<Row> handler : repairResponseHandlers)
             {
                 try
                 {
@@ -540,6 +477,26 @@ public class StorageProxy implements Sto
         return rows;
     }
 
+    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, 
String table, ConsistencyLevel consistencyLevel)
+    {
+        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || 
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
+        {
+            return new DatacenterReadCallback(resolver, consistencyLevel, 
table);
+        }
+        return new ReadCallback(resolver, consistencyLevel, table);
+    }
+
+    // TODO repair resolver shouldn't take consistencylevel (it should repair exactly as many as it receives replies for)
+    private static RepairCallback<Row> repair(ReadCommand command, 
List<InetAddress> endpoints)
+    throws IOException
+    {
+        ReadResponseResolver resolver = new 
ReadResponseResolver(command.table, command.key);
+        RepairCallback<Row> handler = new RepairCallback<Row>(resolver, 
endpoints);
+        Message messageRepair = command.makeReadMessage();
+        MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+        return handler;
+    }
+
     /*
     * This function executes the read protocol locally.  Consistency checks are performed in the background.
     */
@@ -590,7 +547,7 @@ public class StorageProxy implements Sto
                     // collect replies and resolve according to consistency 
level
                     RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
                     AbstractReplicationStrategy rs = 
Table.open(command.keyspace).getReplicationStrategy();
-                    QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+                    ReadCallback<List<Row>> handler = 
getReadCallback(resolver, command.keyspace, consistency_level);
                     // TODO bail early if live endpoints can't satisfy 
requested consistency level
                     for (InetAddress endpoint : liveEndpoints) 
                     {
@@ -837,7 +794,7 @@ public class StorageProxy implements Sto
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(keyspace, liveEndpoints);
             AbstractReplicationStrategy rs = 
Table.open(keyspace).getReplicationStrategy();
-            QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+            ReadCallback<List<Row>> handler = getReadCallback(resolver, 
keyspace, consistency_level);
             
             // bail early if live endpoints can't satisfy requested 
consistency level
             if(handler.blockfor > liveEndpoints.size())
@@ -889,31 +846,6 @@ public class StorageProxy implements Sto
         return hintedHandoffEnabled;
     }
 
-    static class weakReadLocalCallable implements Callable<Object>
-    {
-        private ReadCommand command;
-
-        weakReadLocalCallable(ReadCommand command)
-        {
-            this.command = command;
-        }
-
-        public Object call() throws IOException
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("weakreadlocal reading " + command);
-
-            Table table = Table.open(command.table);
-            Row row = command.getRow(table);
-
-            // Do the consistency checks in the background
-            if (randomlyReadRepair(command))
-                StorageService.instance.doConsistencyCheck(row, command, 
FBUtilities.getLocalAddress());
-
-            return row;
-        }
-    }
-
     /**
      * Performs the truncate operation, which effectively deletes all data from
      * the column family cfname
@@ -959,4 +891,32 @@ public class StorageProxy implements Sto
     {
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
+
+    private static class RepairRunner extends WrappedRunnable
+    {
+        private final IResponseResolver<Row> resolver;
+        private final ReadCommand command;
+        private final List<InetAddress> endpoints;
+
+        public RepairRunner(IResponseResolver<Row> resolver, ReadCommand 
command, List<InetAddress> endpoints)
+        {
+            this.resolver = resolver;
+            this.command = command;
+            this.endpoints = endpoints;
+        }
+
+        protected void runMayThrow() throws IOException
+        {
+            try
+            {
+                resolver.resolve();
+            }
+            catch (DigestMismatchException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Digest mismatch:", e);
+                repair(command, endpoints);
+            }
+        }
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Wed Jan  5 07:18:16 2011
@@ -454,18 +454,6 @@ public class StorageService implements I
     }
 
     /**
-     * This method performs the requisite operations to make
-     * sure that the N replicas are in sync. We do this in the
-     * background when we do not care much about consistency.
-     */
-    public void doConsistencyCheck(Row row, ReadCommand command, InetAddress 
dataSource)
-    {
-        List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-        if (endpoints.size() > 1)
-            consistencyManager_.submit(new ConsistencyChecker(command, row, 
endpoints, dataSource));
-    }
-
-    /**
      * for a keyspace, return the ranges and corresponding hosts for a given keyspace.
      * @param keyspace
      * @return

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
 Wed Jan  5 07:18:16 2011
@@ -96,7 +96,7 @@ public class ConsistencyLevelTest extend
 
                     IWriteResponseHandler writeHandler = 
strategy.getWriteResponseHandler(hosts, hintedNodes, c);
 
-                    QuorumResponseHandler<Row> readHandler = 
strategy.getQuorumResponseHandler(new ReadResponseResolver(table, 
ByteBufferUtil.bytes("foo")), c);
+                    ReadCallback<Row> readHandler = 
StorageProxy.getReadCallback(new ReadResponseResolver(table, 
ByteBufferUtil.bytes("foo")), table, c);
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;


Reply via email to