Author: jbellis Date: Wed Jan 5 07:18:16 2011 New Revision: 1055326 URL: http://svn.apache.org/viewvc?rev=1055326&view=rev Log: merge from 0.7
Added: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Removed: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 5 07:18:16 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311 -/cassandra/branches/cassandra-0.7:1026516-1055313 +/cassandra/branches/cassandra-0.7:1026516-1055325 
/cassandra/branches/cassandra-0.7.0:1053690-1054631 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3:774578-796573 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Jan 5 07:18:16 2011 @@ -15,6 +15,8 @@ * Make snitches configurable at runtime (CASSANDRA-1374) * retry hadoop split requests on connection failure (CASSANDRA-1927) * implement describeOwnership for BOP, COPP (CASSANDRA-1928) + * make read repair behave as expected for ConsistencyLevel > ONE + (CASSANDRA-982) 0.7.0-rc4 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 5 07:18:16 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055313 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055325 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1054631 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo 
(original) +++ svn:mergeinfo Wed Jan 5 07:18:16 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055313 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055325 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1054631 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 5 07:18:16 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055313 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055325 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1054631 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 5 07:18:16 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055313 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055325 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1054631 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 5 07:18:16 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055313 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055325 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1054631 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Jan 5 07:18:16 2011 @@ -223,15 +223,6 @@ public abstract class AbstractReplicatio return getAddressRanges(temp).get(pendingAddress); } - public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver responseResolver, ConsistencyLevel consistencyLevel) - { - if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM)) - { - return new DatacenterQuorumResponseHandler(responseResolver, consistencyLevel, table); - } - return new QuorumResponseHandler(responseResolver, consistencyLevel, table); - } - public void invalidateCachedTokenEndpointValues() { clearEndpointCache(); Added: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1055326&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Wed Jan 5 07:18:16 2011 @@ -0,0 +1,88 @@ +package org.apache.cassandra.service; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.net.InetAddress; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Datacenter Quorum response handler blocks for a quorum of responses from the local DC + */ +public class DatacenterReadCallback<T> extends ReadCallback<T> +{ + private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress()); + private AtomicInteger localResponses; + + public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table) + { + super(resolver, consistencyLevel, table); + localResponses = new AtomicInteger(blockfor); + } + + @Override + public void response(Message message) + { + resolver.preprocess(message); + + int n; + n = 
localdc.equals(snitch.getDatacenter(message.getFrom())) + ? localResponses.decrementAndGet() + : localResponses.get(); + + if (n == 0 && resolver.isDataPresent()) + { + condition.signal(); + } + } + + @Override + public int determineBlockFor(ConsistencyLevel consistency_level, String table) + { + NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy(); + return (stategy.getReplicationFactor(localdc) / 2) + 1; + } + + @Override + public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException + { + int localEndpoints = 0; + for (InetAddress endpoint : endpoints) + { + if (localdc.equals(snitch.getDatacenter(endpoint))) + localEndpoints++; + } + + if(localEndpoints < blockfor) + throw new UnavailableException(); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Jan 5 07:18:16 2011 @@ -24,7 +24,7 @@ import org.apache.cassandra.net.Message; public interface IResponseResolver<T> { - /* + /** * This Method resolves the responses that are passed in . for example : if * its write response then all we get is true or false return values which * implies if the writes were successful but for reads its more complicated @@ -33,8 +33,14 @@ public interface IResponseResolver<T> { * needs from this interface. 
*/ public T resolve() throws DigestMismatchException, IOException; + public boolean isDataPresent(); + /** + * returns the data response without comparing with any digests + */ + public T getData() throws IOException; + public void preprocess(Message message); public Iterable<Message> getMessages(); public int getMessageCount(); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Wed Jan 5 07:18:16 2011 @@ -54,6 +54,15 @@ public class RangeSliceResponseResolver this.table = table; } + public List<Row> getData() throws IOException + { + Message response = responses.iterator().next(); + RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody()); + return reply.rows; + } + + // Note: this deserializes the response a 2nd time if getData was called first + // (this is not currently an issue since we don't do read repair for range queries.) 
public List<Row> resolve() throws IOException { CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>() Added: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1055326&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Wed Jan 5 07:18:16 2011 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.SimpleCondition; + +public class ReadCallback<T> implements IAsyncCallback +{ + protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); + + public final IResponseResolver<T> resolver; + protected final SimpleCondition condition = new SimpleCondition(); + private final long startTime; + protected final int blockfor; + + /** + * Constructor when response count has to be calculated and blocked for. 
+ */ + public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table) + { + this.blockfor = determineBlockFor(consistencyLevel, table); + this.resolver = resolver; + this.startTime = System.currentTimeMillis(); + + logger.debug("ReadCallback blocking for {} responses", blockfor); + } + + public T get() throws TimeoutException, DigestMismatchException, IOException + { + long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + boolean success; + try + { + success = condition.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + + if (!success) + { + StringBuilder sb = new StringBuilder(""); + for (Message message : resolver.getMessages()) + sb.append(message.getFrom()).append(", "); + throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " ."); + } + + return blockfor == 1 ? 
resolver.getData() : resolver.resolve(); + } + + public void close() + { + for (Message response : resolver.getMessages()) + { + MessagingService.removeRegisteredCallback(response.getMessageId()); + } + } + + public void response(Message message) + { + resolver.preprocess(message); + if (resolver.getMessageCount() < blockfor) + return; + if (resolver.isDataPresent()) + condition.signal(); + } + + public int determineBlockFor(ConsistencyLevel consistencyLevel, String table) + { + switch (consistencyLevel) + { + case ONE: + case ANY: + return 1; + case QUORUM: + return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1; + case ALL: + return Table.open(table).getReplicationStrategy().getReplicationFactor(); + default: + throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel); + } + } + + public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException + { + if (endpoints.size() < blockfor) + throw new UnavailableException(); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Jan 5 07:18:16 2011 @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +48,9 @@ public class ReadResponseResolver implem { private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class); private final String table; - private final Map<Message, 
ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>(); + private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>(); private DecoratedKey key; + private ByteBuffer digest; public ReadResponseResolver(String table, ByteBuffer key) { @@ -56,14 +58,29 @@ public class ReadResponseResolver implem this.key = StorageService.getPartitioner().decorateKey(key); } + public Row getData() throws IOException + { + for (Map.Entry<Message, ReadResponse> entry : results.entrySet()) + { + ReadResponse result = entry.getValue(); + if (!result.isDigestQuery()) + return result.row(); + } + + throw new AssertionError("getData should not be invoked when no data is present"); + } + /* - * This method handles two different scenarios: + * This method handles three different scenarios: * - * 1) we're handling the initial read, of data from the closest replica + digests + * 1a)we're handling the initial read, of data from the closest replica + digests * from the rest. In this case we check the digests against each other, * throw an exception if there is a mismatch, otherwise return the data row. * - * 2) there was a mismatch on the initial read, so we redid the digest requests + * 1b)we're checking additional digests that arrived after the minimum to handle + * the requested ConsistencyLevel, i.e. asynchronous read repair check + * + * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests * as full data reads. In this case we need to compute the most recent version * of each column, and send diffs to out-of-date replicas. */ @@ -75,10 +92,13 @@ public class ReadResponseResolver implem long startTime = System.currentTimeMillis(); List<ColumnFamily> versions = new ArrayList<ColumnFamily>(); List<InetAddress> endpoints = new ArrayList<InetAddress>(); - ByteBuffer digest = null; // validate digests against each other; throw immediately on mismatch. 
// also, collects data results into versions/endpoints lists. + // + // results are cleared as we process them, to avoid unnecessary duplication of work + // when resolve() is called a second time for read repair on responses that were not + // necessary to satisfy ConsistencyLevel. for (Map.Entry<Message, ReadResponse> entry : results.entrySet()) { ReadResponse result = entry.getValue(); @@ -106,6 +126,8 @@ public class ReadResponseResolver implem versions.add(cf); endpoints.add(from); } + + results.remove(message); } if (logger_.isDebugEnabled()) Added: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1055326&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Wed Jan 5 07:18:16 2011 @@ -0,0 +1,55 @@ +package org.apache.cassandra.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.utils.SimpleCondition; + +public class RepairCallback<T> implements IAsyncCallback +{ + public final IResponseResolver<T> resolver; + private final List<InetAddress> endpoints; + protected final SimpleCondition condition = new SimpleCondition(); + private final long startTime; + + public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints) + { + this.resolver = resolver; + this.endpoints = endpoints; + this.startTime = System.currentTimeMillis(); + } + + /** + * The main difference between this and ReadCallback is, ReadCallback has a 
ConsistencyLevel + * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout. + */ + public T get() throws TimeoutException, DigestMismatchException, IOException + { + long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + try + { + condition.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + + return resolver.resolve(); + } + + + public void response(Message message) + { + resolver.preprocess(message); + if (resolver.getMessageCount() == endpoints.size()) + condition.signal(); + } + +} Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 5 07:18:16 2011 @@ -50,7 +50,6 @@ import org.apache.cassandra.gms.Gossiper import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.thrift.*; @@ -65,6 +64,8 @@ public class StorageProxy implements Sto { private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class); + private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this + private static final Random random = new Random(); // mbean stuff private static final LatencyTracker readStats = new LatencyTracker(); @@ -351,15 +352,7 @@ public class StorageProxy implements 
Sto List<Row> rows; try { - if (consistency_level == ConsistencyLevel.ONE) - { - rows = weakRead(commands); - } - else - { - assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue(); - rows = strongRead(commands, consistency_level); - } + rows = fetchRows(commands, consistency_level); } finally { @@ -368,91 +361,23 @@ public class StorageProxy implements Sto return rows; } - private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException - { - List<Row> rows = new ArrayList<Row>(); - - // send off all the commands asynchronously - List<Future<Object>> localFutures = null; - HashMap<ReadCommand, IAsyncResult> remoteResults = null; - for (ReadCommand command: commands) - { - InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key); - if (endPoint.equals(FBUtilities.getLocalAddress())) - { - if (logger.isDebugEnabled()) - logger.debug("weakread reading " + command + " locally"); - - if (localFutures == null) - localFutures = new ArrayList<Future<Object>>(); - Callable<Object> callable = new weakReadLocalCallable(command); - localFutures.add(StageManager.getStage(Stage.READ).submit(callable)); - } - else - { - if (remoteResults == null) - remoteResults = new HashMap<ReadCommand, IAsyncResult>(); - Message message = command.makeReadMessage(); - if (logger.isDebugEnabled()) - logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint); - remoteResults.put(command, MessagingService.instance().sendRR(message, endPoint)); - } - } - - // wait for results - if (localFutures != null) - { - for (Future<Object> future : localFutures) - { - Row row; - try - { - row = (Row) future.get(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - rows.add(row); - } - } - if (remoteResults != null) - { - for (Map.Entry<ReadCommand, IAsyncResult> entry : remoteResults.entrySet()) - { - ReadCommand command = entry.getKey(); - 
IAsyncResult iar = entry.getValue(); - byte[] body; - body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); - ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); - assert response.row() != null; - rows.add(response.row()); - if (randomlyReadRepair(command)) - StorageService.instance.doConsistencyCheck(response.row(), command, iar.getFrom()); - } - } - - return rows; - } - - /* - * This function executes the read protocol. - // 1. Get the N nodes from storage service where the data needs to be - // replicated - // 2. Construct a message for read\write - * 3. Set one of the messages to get the data and the rest to get the digest - // 4. SendRR ( to all the nodes above ) - // 5. Wait for a response from at least X nodes where X <= N and the data node - * 6. If the digest matches return the data. - * 7. else carry out read repair by getting data from all the nodes. - // 5. return success + /** + * This function executes local and remote reads, and blocks for the results: + * + * 1. Get the replica locations, sorted by response time according to the snitch + * 2. Send a data request to the closest replica, and digest requests to either + * a) all the replicas, if read repair is enabled + * b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel + * 3. Wait for a response from R replicas + * 4. If the digests (if any) match the data return the data + * 5. else carry out read repair by getting data from all the nodes. 
*/ - private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException + private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException { - List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>(); + List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>(); List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>(); List<Row> rows = new ArrayList<Row>(); + Set<ReadCommand> repairs = new HashSet<ReadCommand>(); // send out read requests for (ReadCommand command: commands) @@ -468,53 +393,65 @@ public class StorageProxy implements Sto AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy(); ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key); - QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level); handler.assureSufficientLiveNodes(endpoints); - Message messages[] = new Message[endpoints.size()]; + int targets; + if (randomlyReadRepair(command)) + { + targets = endpoints.size(); + if (targets > handler.blockfor) + repairs.add(command); + } + else + { + targets = handler.blockfor; + } + Message[] messages = new Message[targets]; + // data-request message is sent to dataPoint, the node that will actually get // the data for us. The other replicas are only sent a digest query. - int n = 0; - for (InetAddress endpoint : endpoints) + for (int i = 0; i < messages.length; i++) { + InetAddress endpoint = endpoints.get(i); Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly; - messages[n++] = m; + messages[i] = m; if (logger.isDebugEnabled()) - logger.debug("strongread reading " + (m == message ? 
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint); + logger.debug("reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint); } MessagingService.instance().sendRR(messages, endpoints, handler); - quorumResponseHandlers.add(handler); + readCallbacks.add(handler); commandEndpoints.add(endpoints); } // read results and make a second pass for any digest mismatches - List<QuorumResponseHandler<Row>> repairResponseHandlers = null; + List<RepairCallback<Row>> repairResponseHandlers = null; for (int i = 0; i < commands.size(); i++) { - QuorumResponseHandler<Row> quorumResponseHandler = quorumResponseHandlers.get(i); + ReadCallback<Row> readCallback = readCallbacks.get(i); Row row; ReadCommand command = commands.get(i); + List<InetAddress> endpoints = commandEndpoints.get(i); try { long startTime2 = System.currentTimeMillis(); - row = quorumResponseHandler.get(); + row = readCallback.get(); if (row != null) rows.add(row); if (logger.isDebugEnabled()) - logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms."); + logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms."); + + if (repairs.contains(command)) + repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); } catch (DigestMismatchException ex) { - AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy(); - ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key); - QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level); if (logger.isDebugEnabled()) logger.debug("Digest mismatch:", ex); - Message messageRepair = command.makeReadMessage(); - MessagingService.instance().sendRR(messageRepair, commandEndpoints.get(i), handler); + RepairCallback<Row> handler = repair(command, endpoints); if 
(repairResponseHandlers == null) - repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>(); + repairResponseHandlers = new ArrayList<RepairCallback<Row>>(); repairResponseHandlers.add(handler); } } @@ -522,7 +459,7 @@ public class StorageProxy implements Sto // read the results for the digest mismatch retries if (repairResponseHandlers != null) { - for (QuorumResponseHandler<Row> handler : repairResponseHandlers) + for (RepairCallback<Row> handler : repairResponseHandlers) { try { @@ -540,6 +477,26 @@ public class StorageProxy implements Sto return rows; } + static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel) + { + if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM)) + { + return new DatacenterReadCallback(resolver, consistencyLevel, table); + } + return new ReadCallback(resolver, consistencyLevel, table); + } + + // TODO repair resolver shouldn't take consistencylevel (it should repair exactly as many as it receives replies for) + private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints) + throws IOException + { + ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key); + RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints); + Message messageRepair = command.makeReadMessage(); + MessagingService.instance().sendRR(messageRepair, endpoints, handler); + return handler; + } + /* * This function executes the read protocol locally. Consistency checks are performed in the background. 
*/ @@ -590,7 +547,7 @@ public class StorageProxy implements Sto // collect replies and resolve according to consistency level RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints); AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy(); - QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level); // TODO bail early if live endpoints can't satisfy requested consistency level for (InetAddress endpoint : liveEndpoints) { @@ -837,7 +794,7 @@ public class StorageProxy implements Sto // collect replies and resolve according to consistency level RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints); AbstractReplicationStrategy rs = Table.open(keyspace).getReplicationStrategy(); - QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); + ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level); // bail early if live endpoints can't satisfy requested consistency level if(handler.blockfor > liveEndpoints.size()) @@ -889,31 +846,6 @@ public class StorageProxy implements Sto return hintedHandoffEnabled; } - static class weakReadLocalCallable implements Callable<Object> - { - private ReadCommand command; - - weakReadLocalCallable(ReadCommand command) - { - this.command = command; - } - - public Object call() throws IOException - { - if (logger.isDebugEnabled()) - logger.debug("weakreadlocal reading " + command); - - Table table = Table.open(command.table); - Row row = command.getRow(table); - - // Do the consistency checks in the background - if (randomlyReadRepair(command)) - StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress()); - - return row; - } - } - /** * Performs the truncate operatoin, which effectively 
deletes all data from * the column family cfname @@ -959,4 +891,32 @@ public class StorageProxy implements Sto { return !Gossiper.instance.getUnreachableMembers().isEmpty(); } + + private static class RepairRunner extends WrappedRunnable + { + private final IResponseResolver<Row> resolver; + private final ReadCommand command; + private final List<InetAddress> endpoints; + + public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints) + { + this.resolver = resolver; + this.command = command; + this.endpoints = endpoints; + } + + protected void runMayThrow() throws IOException + { + try + { + resolver.resolve(); + } + catch (DigestMismatchException e) + { + if (logger.isDebugEnabled()) + logger.debug("Digest mismatch:", e); + repair(command, endpoints); + } + } + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 5 07:18:16 2011 @@ -454,18 +454,6 @@ public class StorageService implements I } /** - * This method performs the requisite operations to make - * sure that the N replicas are in sync. We do this in the - * background when we do not care much about consistency. - */ - public void doConsistencyCheck(Row row, ReadCommand command, InetAddress dataSource) - { - List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key); - if (endpoints.size() > 1) - consistencyManager_.submit(new ConsistencyChecker(command, row, endpoints, dataSource)); - } - - /** * for a keyspace, return the ranges and corresponding hosts for a given keyspace. 
* @param keyspace * @return Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1055326&r1=1055325&r2=1055326&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Wed Jan 5 07:18:16 2011 @@ -96,7 +96,7 @@ public class ConsistencyLevelTest extend IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c); - QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), c); + ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c); boolean isWriteUnavailable = false; boolean isReadUnavailable = false;