Author: jbellis
Date: Wed Jan  5 07:00:33 2011
New Revision: 1055320

URL: http://svn.apache.org/viewvc?rev=1055320&view=rev
Log:
rename [Datacenter]QuorumResponseHandler -> [Datacenter]ReadCallback
patch by jbellis

Added:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
Removed:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1055320&r1=1055319&r2=1055320&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 Wed Jan  5 07:00:33 2011
@@ -223,15 +223,6 @@ public abstract class AbstractReplicatio
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver 
responseResolver, ConsistencyLevel consistencyLevel)
-    {
-        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || 
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
-        {
-            return new DatacenterQuorumResponseHandler(responseResolver, 
consistencyLevel, table);
-        }
-        return new QuorumResponseHandler(responseResolver, consistencyLevel, 
table);
-    }
-
     public void invalidateCachedTokenEndpointValues()
     {
         clearEndpointCache();

Added: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1055320&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
 Wed Jan  5 07:00:33 2011
@@ -0,0 +1,88 @@
+package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Datacenter Quorum response handler blocks for a quorum of responses from 
the local DC
+ */
+public class DatacenterReadCallback<T> extends ReadCallback<T>
+{
+    private static final IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
+       private static final String localdc = 
snitch.getDatacenter(FBUtilities.getLocalAddress());
+    private AtomicInteger localResponses;
+    
+    public DatacenterReadCallback(IResponseResolver<T> resolver, 
ConsistencyLevel consistencyLevel, String table)
+    {
+        super(resolver, consistencyLevel, table);
+        localResponses = new AtomicInteger(blockfor);
+    }
+    
+    @Override
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+
+        int n;
+        n = localdc.equals(snitch.getDatacenter(message.getFrom())) 
+                ? localResponses.decrementAndGet()
+                : localResponses.get();
+
+        if (n == 0 && resolver.isDataPresent())
+        {
+            condition.signal();
+        }
+    }
+    
+    @Override
+    public int determineBlockFor(ConsistencyLevel consistency_level, String 
table)
+       {
+        NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) 
Table.open(table).getReplicationStrategy();
+               return (stategy.getReplicationFactor(localdc) / 2) + 1;
+       }
+
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) 
throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+        
+        if(localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
+}

Added: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1055320&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
 Wed Jan  5 07:00:33 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
+
+public class ReadCallback<T> implements IAsyncCallback
+{
+    protected static final Logger logger = LoggerFactory.getLogger( 
ReadCallback.class );
+
+    public final IResponseResolver<T> resolver;
+    protected final SimpleCondition condition = new SimpleCondition();
+    private final long startTime;
+    protected final int blockfor;
+    
+    /**
+     * Constructor when response count has to be calculated and blocked for.
+     */
+    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel 
consistencyLevel, String table)
+    {
+        this.blockfor = determineBlockFor(consistencyLevel, table);
+        this.resolver = resolver;
+        this.startTime = System.currentTimeMillis();
+
+        logger.debug("ReadCallback blocking for {} responses", blockfor);
+    }
+    
+    public T get() throws TimeoutException, DigestMismatchException, 
IOException
+    {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - 
(System.currentTimeMillis() - startTime);
+        boolean success;
+        try
+        {
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
+        }
+
+        if (!success)
+        {
+            StringBuilder sb = new StringBuilder("");
+            for (Message message : resolver.getMessages())
+                sb.append(message.getFrom()).append(", ");
+            throw new TimeoutException("Operation timed out - received only " 
+ resolver.getMessageCount() + " responses from " + sb.toString() + " .");
+        }
+
+        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+    }
+
+    public void close()
+    {
+        for (Message response : resolver.getMessages())
+        {
+            MessagingService.removeRegisteredCallback(response.getMessageId());
+        }
+    }
+    
+    public void response(Message message)
+    {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
+            return;
+        if (resolver.isDataPresent())
+            condition.signal();
+    }
+    
+    public int determineBlockFor(ConsistencyLevel consistencyLevel, String 
table)
+    {
+        switch (consistencyLevel)
+        {
+            case ONE:
+            case ANY:
+                return 1;
+            case QUORUM:
+                return 
(Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
+            case ALL:
+                return 
Table.open(table).getReplicationStrategy().getReplicationFactor();
+            default:
+                throw new UnsupportedOperationException("invalid consistency 
level: " + consistencyLevel);
+        }
+    }
+
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) 
throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
+}

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055320&r1=1055319&r2=1055320&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 Wed Jan  5 07:00:33 2011
@@ -328,7 +328,7 @@ public class StorageProxy implements Sto
      */
     private static List<Row> fetchRows(List<ReadCommand> commands, 
ConsistencyLevel consistency_level) throws IOException, UnavailableException, 
TimeoutException
     {
-        List<QuorumResponseHandler<Row>> quorumResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
+        List<ReadCallback<Row>> readCallbacks = new 
ArrayList<ReadCallback<Row>>();
         List<List<InetAddress>> commandEndpoints = new 
ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
         Set<ReadCommand> repairs = new HashSet<ReadCommand>();
@@ -347,7 +347,7 @@ public class StorageProxy implements Sto
 
             AbstractReplicationStrategy rs = 
Table.open(command.table).getReplicationStrategy();
             ReadResponseResolver resolver = new 
ReadResponseResolver(command.table, command.key);
-            QuorumResponseHandler<Row> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+            ReadCallback<Row> handler = getReadCallback(resolver, 
command.table, consistency_level);
             handler.assureSufficientLiveNodes(endpoints);
 
             int targets;
@@ -374,7 +374,7 @@ public class StorageProxy implements Sto
                     logger.debug("reading " + (m == message ? "data" : 
"digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
             MessagingService.instance().sendRR(messages, endpoints, handler);
-            quorumResponseHandlers.add(handler);
+            readCallbacks.add(handler);
             commandEndpoints.add(endpoints);
         }
 
@@ -382,22 +382,22 @@ public class StorageProxy implements Sto
         List<RepairCallback<Row>> repairResponseHandlers = null;
         for (int i = 0; i < commands.size(); i++)
         {
-            QuorumResponseHandler<Row> quorumResponseHandler = 
quorumResponseHandlers.get(i);
+            ReadCallback<Row> readCallback = readCallbacks.get(i);
             Row row;
             ReadCommand command = commands.get(i);
             List<InetAddress> endpoints = commandEndpoints.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
-                row = quorumResponseHandler.get();
+                row = readCallback.get();
                 if (row != null)
                     rows.add(row);
 
                 if (logger.isDebugEnabled())
-                    logger.debug("quorumResponseHandler: " + 
(System.currentTimeMillis() - startTime2) + " ms.");
+                    logger.debug("Read: " + (System.currentTimeMillis() - 
startTime2) + " ms.");
 
                 if (repairs.contains(command))
-                    repairExecutor.schedule(new 
RepairRunner(quorumResponseHandler.resolver, command, endpoints), 
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                    repairExecutor.schedule(new 
RepairRunner(readCallback.resolver, command, endpoints), 
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
             catch (DigestMismatchException ex)
             {
@@ -431,6 +431,15 @@ public class StorageProxy implements Sto
         return rows;
     }
 
+    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, 
String table, ConsistencyLevel consistencyLevel)
+    {
+        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || 
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
+        {
+            return new DatacenterReadCallback(resolver, consistencyLevel, 
table);
+        }
+        return new ReadCallback(resolver, consistencyLevel, table);
+    }
+
     // TODO repair resolver shouldn't take consistencylevel (it should repair 
exactly as many as it receives replies for)
     private static RepairCallback<Row> repair(ReadCommand command, 
List<InetAddress> endpoints)
     throws IOException
@@ -492,7 +501,7 @@ public class StorageProxy implements Sto
                     // collect replies and resolve according to consistency 
level
                     RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
                     AbstractReplicationStrategy rs = 
Table.open(command.keyspace).getReplicationStrategy();
-                    QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+                    ReadCallback<List<Row>> handler = 
getReadCallback(resolver, command.keyspace, consistency_level);
                     // TODO bail early if live endpoints can't satisfy 
requested consistency level
                     for (InetAddress endpoint : liveEndpoints) 
                     {
@@ -741,7 +750,7 @@ public class StorageProxy implements Sto
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(keyspace, liveEndpoints);
             AbstractReplicationStrategy rs = 
Table.open(keyspace).getReplicationStrategy();
-            QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level);
+            ReadCallback<List<Row>> handler = getReadCallback(resolver, 
keyspace, consistency_level);
             
             // bail early if live endpoints can't satisfy requested 
consistency level
             if(handler.blockfor > liveEndpoints.size())

Modified: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1055320&r1=1055319&r2=1055320&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
 Wed Jan  5 07:00:33 2011
@@ -96,7 +96,7 @@ public class ConsistencyLevelTest extend
 
                     IWriteResponseHandler writeHandler = 
strategy.getWriteResponseHandler(hosts, hintedNodes, c);
 
-                    QuorumResponseHandler<Row> readHandler = 
strategy.getQuorumResponseHandler(new ReadResponseResolver(table, 
ByteBufferUtil.bytes("foo")), c);
+                    ReadCallback<Row> readHandler = 
StorageProxy.getReadCallback(new ReadResponseResolver(table, 
ByteBufferUtil.bytes("foo")), table, c);
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;


Reply via email to