Author: jbellis Date: Wed Jan 5 07:00:33 2011 New Revision: 1055320 URL: http://svn.apache.org/viewvc?rev=1055320&view=rev Log: rename [Datacenter]QuorumResponseHandler -> [Datacenter]ReadCallback patch by jbellis
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Removed: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1055320&r1=1055319&r2=1055320&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Jan 5 07:00:33 2011 @@ -223,15 +223,6 @@ public abstract class AbstractReplicatio return getAddressRanges(temp).get(pendingAddress); } - public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver responseResolver, ConsistencyLevel consistencyLevel) - { - if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM)) - { - return new DatacenterQuorumResponseHandler(responseResolver, consistencyLevel, table); - } - return new QuorumResponseHandler(responseResolver, consistencyLevel, table); - } - public void invalidateCachedTokenEndpointValues() { clearEndpointCache(); Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1055320&view=auto ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (added) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Wed Jan 5 07:00:33 2011 @@ -0,0 +1,88 @@ +package org.apache.cassandra.service; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.net.InetAddress; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Datacenter Quorum response handler blocks for a quorum of responses from the local DC + */ +public class DatacenterReadCallback<T> extends ReadCallback<T> +{ + private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress()); + private AtomicInteger localResponses; + + public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table) + { + super(resolver, consistencyLevel, table); + localResponses = new AtomicInteger(blockfor); + } + + @Override + public void response(Message message) + { + resolver.preprocess(message); + + int n; + n = localdc.equals(snitch.getDatacenter(message.getFrom())) + ? localResponses.decrementAndGet() + : localResponses.get(); + + if (n == 0 && resolver.isDataPresent()) + { + condition.signal(); + } + } + + @Override + public int determineBlockFor(ConsistencyLevel consistency_level, String table) + { + NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy(); + return (stategy.getReplicationFactor(localdc) / 2) + 1; + } + + @Override + public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException + { + int localEndpoints = 0; + for (InetAddress endpoint : endpoints) + { + if (localdc.equals(snitch.getDatacenter(endpoint))) + localEndpoints++; + } + + if(localEndpoints < blockfor) + throw new UnavailableException(); + } +} Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1055320&view=auto ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (added) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Wed Jan 5 07:00:33 2011 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.SimpleCondition; + +public class ReadCallback<T> implements IAsyncCallback +{ + protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); + + public final IResponseResolver<T> resolver; + protected final SimpleCondition condition = new SimpleCondition(); + private final long startTime; + protected final int blockfor; + + /** + * Constructor when response count has to be calculated and blocked for. + */ + public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table) + { + this.blockfor = determineBlockFor(consistencyLevel, table); + this.resolver = resolver; + this.startTime = System.currentTimeMillis(); + + logger.debug("ReadCallback blocking for {} responses", blockfor); + } + + public T get() throws TimeoutException, DigestMismatchException, IOException + { + long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + boolean success; + try + { + success = condition.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + + if (!success) + { + StringBuilder sb = new StringBuilder(""); + for (Message message : resolver.getMessages()) + sb.append(message.getFrom()).append(", "); + throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " ."); + } + + return blockfor == 1 ? resolver.getData() : resolver.resolve(); + } + + public void close() + { + for (Message response : resolver.getMessages()) + { + MessagingService.removeRegisteredCallback(response.getMessageId()); + } + } + + public void response(Message message) + { + resolver.preprocess(message); + if (resolver.getMessageCount() < blockfor) + return; + if (resolver.isDataPresent()) + condition.signal(); + } + + public int determineBlockFor(ConsistencyLevel consistencyLevel, String table) + { + switch (consistencyLevel) + { + case ONE: + case ANY: + return 1; + case QUORUM: + return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1; + case ALL: + return Table.open(table).getReplicationStrategy().getReplicationFactor(); + default: + throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel); + } + } + + public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException + { + if (endpoints.size() < blockfor) + throw new UnavailableException(); + } +} Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055320&r1=1055319&r2=1055320&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 5 07:00:33 2011 @@ -328,7 +328,7 @@ public class StorageProxy implements Sto */ private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException { - List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>(); + List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>(); List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>(); List<Row> rows = new ArrayList<Row>(); Set<ReadCommand> repairs = new HashSet<ReadCommand>(); @@ -347,7 +347,7 @@ public class StorageProxy implements Sto AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy(); ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key); - QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level); handler.assureSufficientLiveNodes(endpoints); int targets; @@ -374,7 +374,7 @@ public class StorageProxy implements Sto logger.debug("reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint); } MessagingService.instance().sendRR(messages, endpoints, handler); - quorumResponseHandlers.add(handler); + readCallbacks.add(handler); commandEndpoints.add(endpoints); } @@ -382,22 +382,22 @@ public class StorageProxy implements Sto List<RepairCallback<Row>> repairResponseHandlers = null; for (int i = 0; i < commands.size(); i++) { - QuorumResponseHandler<Row> quorumResponseHandler = quorumResponseHandlers.get(i); + ReadCallback<Row> readCallback = readCallbacks.get(i); Row row; ReadCommand command = commands.get(i); List<InetAddress> endpoints = commandEndpoints.get(i); try { long startTime2 = System.currentTimeMillis(); - row = quorumResponseHandler.get(); + row = readCallback.get(); if (row != null) rows.add(row); if (logger.isDebugEnabled()) - logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms."); + logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms."); if (repairs.contains(command)) - repairExecutor.schedule(new RepairRunner(quorumResponseHandler.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); } catch (DigestMismatchException ex) { @@ -431,6 +431,15 @@ public class StorageProxy implements Sto return rows; } + static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel) + { + if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM)) + { + return new DatacenterReadCallback(resolver, consistencyLevel, table); + } + return new ReadCallback(resolver, consistencyLevel, table); + } + // TODO repair resolver shouldn't take consistencylevel (it should repair exactly as many as it receives replies for) private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints) throws IOException @@ -492,7 +501,7 @@ public class StorageProxy implements Sto // collect replies and resolve according to consistency level RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints); AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy(); - QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level); // TODO bail early if live endpoints can't satisfy requested consistency level for (InetAddress endpoint : liveEndpoints) { @@ -741,7 +750,7 @@ public class StorageProxy implements Sto // collect replies and resolve according to consistency level RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints); AbstractReplicationStrategy rs = Table.open(keyspace).getReplicationStrategy(); - QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level); // bail early if live endpoints can't satisfy requested consistency level if(handler.blockfor > liveEndpoints.size()) Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1055320&r1=1055319&r2=1055320&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original) +++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Wed Jan 5 07:00:33 2011 @@ -96,7 +96,7 @@ public class ConsistencyLevelTest extend IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c); - QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), c); + ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c); boolean isWriteUnavailable = false; boolean isReadUnavailable = false;