Author: jbellis Date: Mon Jul 26 21:05:24 2010 New Revision: 979441 URL: http://svn.apache.org/viewvc?rev=979441&view=rev Log: parallelize local and remote reads during multiget, and respect snitch when determining whether to do local read for CL.ONE. patch by jbellis; tested by Tupshin Harper for CASSANDRA-1317
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979441&r1=979440&r2=979441&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 26 21:05:24 2010 @@ -20,6 +20,8 @@ * add ack to Binary write verb and update CassandraBulkLoader to wait for acks for each row (CASSANDRA-1093) * remove gossip message size limit (CASSANDRA-1138) + * parallelize local and remote reads during multiget, and respect snitch + when determining whether to do local read for CL.ONE (CASSANDRA-1317) 0.6.3 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=979441&r1=979440&r2=979441&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 26 21:05:24 2010 @@ -36,6 +36,7 @@ import org.apache.commons.lang.StringUti import com.google.common.collect.AbstractIterator; import com.google.common.collect.Multimap; + import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; @@ -49,10 +50,10 @@ import org.apache.cassandra.net.IAsyncRe import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.LatencyTracker; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; @@ -335,88 +336,94 @@ public class StorageProxy implements Sto } /** - * Read the data from one replica. When we get - * the data we perform consistency checks and figure out if any repairs need to be done to the replicas. - * @param commands a set of commands to perform reads - * @return the row associated with command.key - * @throws Exception + * Performs the actual reading of a row out of the StorageService, fetching + * a specific set of column names from a given column family. */ - private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException + public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level) + throws IOException, UnavailableException, TimeoutException, InvalidRequestException { - if (logger.isDebugEnabled()) - logger.debug("weakreadremote reading " + StringUtils.join(commands, ", ")); - - List<Row> rows = new ArrayList<Row>(); - List<IAsyncResult> iars = new ArrayList<IAsyncResult>(); + if (StorageService.instance.isBootstrapMode()) + throw new InvalidRequestException("This node cannot accept reads until it has bootstrapped"); + long startTime = System.nanoTime(); - for (ReadCommand command: commands) + List<Row> rows; + if (consistency_level == ConsistencyLevel.ONE) { - InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key); - Message message = command.makeReadMessage(); - - if (logger.isDebugEnabled()) - logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint); - if (DatabaseDescriptor.getConsistencyCheck()) - message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes()); - iars.add(MessagingService.instance.sendRR(message, endPoint)); + rows = weakRead(commands); } - - for (IAsyncResult iar: iars) + else { - byte[] body; - body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); - ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); - if (response.row() != null) - rows.add(response.row()); + assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue(); + rows = strongRead(commands, consistency_level); } + + readStats.addNano(System.nanoTime() - startTime); return rows; } - /** - * Performs the actual reading of a row out of the StorageService, fetching - * a specific set of column names from a given column family. - */ - public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level) - throws IOException, UnavailableException, TimeoutException + private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException { - long startTime = System.nanoTime(); - List<Row> rows = new ArrayList<Row>(); - if (consistency_level == ConsistencyLevel.ONE) + // send off all the commands asynchronously + List<Future<Object>> localFutures = null; + List<IAsyncResult> remoteResults = null; + for (ReadCommand command: commands) { - List<ReadCommand> localCommands = new ArrayList<ReadCommand>(); - List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>(); + InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key); + if (endPoint.equals(FBUtilities.getLocalAddress())) + { + if (logger.isDebugEnabled()) + logger.debug("weakread reading " + command + " locally"); - for (ReadCommand command: commands) + if (localFutures == null) + localFutures = new ArrayList<Future<Object>>(); + Callable<Object> callable = new weakReadLocalCallable(command); + localFutures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable)); + } + else { - List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.table, command.key); - boolean foundLocal = endpoints.contains(FBUtilities.getLocalAddress()); - //TODO: Throw InvalidRequest if we're in bootstrap mode? - if (foundLocal && !StorageService.instance.isBootstrapMode()) + if (remoteResults == null) + remoteResults = new ArrayList<IAsyncResult>(); + Message message = command.makeReadMessage(); + if (logger.isDebugEnabled()) + logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint); + if (DatabaseDescriptor.getConsistencyCheck()) + message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes()); + remoteResults.add(MessagingService.instance.sendRR(message, endPoint)); + } + } + + // wait for results + if (localFutures != null) + { + for (Future<Object> future : localFutures) + { + Row row; + try { - localCommands.add(command); + row = (Row) future.get(); } - else + catch (Exception e) { - remoteCommands.add(command); + throw new RuntimeException(e); } + rows.add(row); } - if (localCommands.size() > 0) - rows.addAll(weakReadLocal(localCommands)); - - if (remoteCommands.size() > 0) - rows.addAll(weakReadRemote(remoteCommands)); } - else + if (remoteResults != null) { - assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue(); - rows = strongRead(commands, consistency_level); + for (IAsyncResult iar: remoteResults) + { + byte[] body; + body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); + if (response.row() != null) + rows.add(response.row()); + } } - readStats.addNano(System.nanoTime() - startTime); - return rows; } @@ -521,31 +528,6 @@ public class StorageProxy implements Sto /* * This function executes the read protocol locally. Consistency checks are performed in the background. */ - private static List<Row> weakReadLocal(List<ReadCommand> commands) - { - List<Row> rows = new ArrayList<Row>(); - List<Future<Object>> futures = new ArrayList<Future<Object>>(); - - for (ReadCommand command: commands) - { - Callable<Object> callable = new weakReadLocalCallable(command); - futures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable)); - } - for (Future<Object> future : futures) - { - Row row; - try - { - row = (Row) future.get(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - rows.add(row); - } - return rows; - } public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException