Author: jbellis Date: Thu Aug 11 19:29:38 2011 New Revision: 1156758 URL: http://svn.apache.org/viewvc?rev=1156758&view=rev Log: provide monotonic read consistency. Patch by jbellis; reviewed by slebresne for CASSANDRA-2494
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1156758&r1=1156757&r2=1156758&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Aug 11 19:29:38 2011 @@ -30,6 +30,9 @@ * make column family backed column map pluggable and introduce unsynchronized ArrayList backed one to speedup reads (CASSANDRA-2843) * refactoring of the secondary index api (CASSANDRA-2982) + * make CL > ONE reads wait for digest reconciliation before returning + (CASSANDRA-2494) + 0.8.4 * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972) Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1156758&r1=1156757&r2=1156758&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Aug 11 19:29:38 2011 @@ -22,15 +22,18 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import com.google.common.collect.AbstractIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RangeSliceReply; import org.apache.cassandra.db.Row; +import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.CloseableIterator; @@ -43,9 +46,19 @@ import org.apache.cassandra.utils.MergeI public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Row>> { private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class); + + private static final Comparator<Pair<Row,InetAddress>> pairComparator = new Comparator<Pair<Row, InetAddress>>() + { + public int compare(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> o2) + { + return o1.left.key.compareTo(o2.left.key); + } + }; + private final String table; private final List<InetAddress> sources; protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();; + public final List<IAsyncResult> repairResults = new ArrayList<IAsyncResult>(); public RangeSliceResponseResolver(String table, List<InetAddress> sources) { @@ -73,50 +86,7 @@ public class RangeSliceResponseResolver iters.add(new RowIterator(reply.rows.iterator(), response.getFrom())); } // for each row, compute the combination of all different versions seen, and repair incomplete versions - MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, new Comparator<Pair<Row,InetAddress>>() - { - public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> o2) - { - return o1.left.key.compareTo(o2.left.key); - } - }, new MergeIterator.Reducer<Pair<Row,InetAddress>, Row>() - { - List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size()); - List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size()); - DecoratedKey key; - - public void reduce(Pair<Row,InetAddress> current) - { - 
key = current.left.key; - versions.add(current.left.cf); - versionSources.add(current.right); - } - - protected Row getReduced() - { - ColumnFamily resolved = versions.size() > 1 - ? RowRepairResolver.resolveSuperset(versions) - : versions.get(0); - if (versions.size() < sources.size()) - { - // add placeholder rows for sources that didn't have any data, so maybeScheduleRepairs sees them - for (InetAddress source : sources) - { - if (!versionSources.contains(source)) - { - versions.add(null); - versionSources.add(source); - } - } - } - // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet - if (resolved != null) - RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources); - versions.clear(); - versionSources.clear(); - return new Row(key, resolved); - } - }); + MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, pairComparator, new Reducer()); List<Row> resolvedRows = new ArrayList<Row>(n); while (iter.hasNext()) @@ -163,4 +133,43 @@ public class RangeSliceResponseResolver { throw new UnsupportedOperationException(); } + + private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row> + { + List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size()); + List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size()); + DecoratedKey key; + + public void reduce(Pair<Row,InetAddress> current) + { + key = current.left.key; + versions.add(current.left.cf); + versionSources.add(current.right); + } + + protected Row getReduced() + { + ColumnFamily resolved = versions.size() > 1 + ? 
RowRepairResolver.resolveSuperset(versions) + : versions.get(0); + if (versions.size() < sources.size()) + { + // add placeholder rows for sources that didn't have any data, so maybeScheduleRepairs sees them + for (InetAddress source : sources) + { + if (!versionSources.contains(source)) + { + versions.add(null); + versionSources.add(source); + } + } + } + // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet + if (resolved != null) + repairResults.addAll(RowRepairResolver.scheduleRepairs(resolved, table, key, versions, versionSources)); + versions.clear(); + versionSources.clear(); + return new Row(key, resolved); + } + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1156758&r1=1156757&r2=1156758&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Thu Aug 11 19:29:38 2011 @@ -29,13 +29,14 @@ import java.util.concurrent.TimeoutExcep import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Row; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.SimpleCondition; -public class RepairCallback<T> implements IAsyncCallback +public class RepairCallback implements IAsyncCallback { - private final IResponseResolver<T> resolver; + public final RowRepairResolver resolver; private final List<InetAddress> endpoints; private final SimpleCondition condition = new SimpleCondition(); private final long startTime; @@ -49,14 +50,14 @@ public class RepairCallback<T> implement * mismatch, and we're going to do full-data 
reads from everyone -- that is, this is the final * stage in the read process.) */ - public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints) + public RepairCallback(RowRepairResolver resolver, List<InetAddress> endpoints) { this.resolver = resolver; this.endpoints = endpoints; this.startTime = System.currentTimeMillis(); } - public T get() throws TimeoutException, DigestMismatchException, IOException + public Row get() throws TimeoutException, DigestMismatchException, IOException { long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); try Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1156758&r1=1156757&r2=1156758&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Aug 11 19:29:38 2011 @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -31,13 +32,16 @@ import org.apache.cassandra.db.columnite import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; public class RowRepairResolver extends AbstractRowResolver { protected int maxLiveColumns = 0; + public List<IAsyncResult> repairResults = Collections.emptyList(); public 
RowRepairResolver(String table, ByteBuffer key) { @@ -89,7 +93,7 @@ public class RowRepairResolver extends A logger.debug("versions merged"); // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet if (resolved != null) - maybeScheduleRepairs(resolved, table, key, versions, endpoints); + repairResults = scheduleRepairs(resolved, table, key, versions, endpoints); } else { @@ -106,8 +110,10 @@ public class RowRepairResolver extends A * For each row version, compare with resolved (the superset of all row versions); * if it is missing anything, send a mutation to the endpoint it come from. */ - public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints) + public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints) { + List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size()); + for (int i = 0; i < versions.size(); i++) { ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved); @@ -126,8 +132,10 @@ public class RowRepairResolver extends A { throw new IOError(e); } - MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i)); + results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i))); } + + return results; } static ColumnFamily resolveSuperset(List<ColumnFamily> versions) Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1156758&r1=1156757&r2=1156758&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Aug 11 
19:29:38 2011 @@ -30,8 +30,9 @@ import javax.management.ObjectName; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import org.apache.cassandra.net.CachingMessageProducer; -import org.apache.cassandra.net.MessageProducer; + +import org.apache.cassandra.net.*; + import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -49,9 +50,6 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.*; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.IndexClause; @@ -571,7 +569,7 @@ public class StorageProxy implements Sto repairCommands.clear(); // read results and make a second pass for any digest mismatches - List<RepairCallback<Row>> repairResponseHandlers = null; + List<RepairCallback> repairResponseHandlers = null; for (int i = 0; i < commandsToSend.size(); i++) { ReadCallback<Row> handler = readCallbacks.get(i); @@ -598,7 +596,7 @@ public class StorageProxy implements Sto if (logger.isDebugEnabled()) logger.debug("Digest mismatch: {}", ex.toString()); RowRepairResolver resolver = new RowRepairResolver(command.table, command.key); - RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints); + RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints); if (repairCommands == Collections.EMPTY_LIST) repairCommands = new ArrayList<ReadCommand>(); @@ -608,7 +606,7 @@ public class StorageProxy implements Sto MessagingService.instance().sendRR(command, endpoint, repairHandler); if (repairResponseHandlers == null) - repairResponseHandlers 
= new ArrayList<RepairCallback<Row>>(); + repairResponseHandlers = new ArrayList<RepairCallback>(); repairResponseHandlers.add(repairHandler); } } @@ -622,48 +620,50 @@ public class StorageProxy implements Sto for (int i = 0; i < repairCommands.size(); i++) { ReadCommand command = repairCommands.get(i); - RepairCallback<Row> handler = repairResponseHandlers.get(i); + RepairCallback handler = repairResponseHandlers.get(i); + Row row; try { - Row row = handler.get(); - - if (command instanceof SliceFromReadCommand) - { - // short reads are only possible on SliceFromReadCommand - SliceFromReadCommand sliceCommand = (SliceFromReadCommand)command; - int maxLiveColumns = handler.getMaxLiveColumns(); - int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount() : 0; - - assert maxLiveColumns <= sliceCommand.count; - if ((maxLiveColumns == sliceCommand.count) && (liveColumnsInRow < sliceCommand.count)) - { - if (logger.isDebugEnabled()) - logger.debug("detected short read: expected {} columns, but only resolved {} columns", - sliceCommand.count, liveColumnsInRow); - - int retryCount = sliceCommand.count + sliceCommand.count - liveColumnsInRow; - SliceFromReadCommand retryCommand = new SliceFromReadCommand(command.table, - command.key, - command.queryPath, - sliceCommand.start, - sliceCommand.finish, - sliceCommand.reversed, - retryCount); - if (commandsToRetry == Collections.EMPTY_LIST) - commandsToRetry = new ArrayList<ReadCommand>(); - commandsToRetry.add(retryCommand); - } - else if (row != null) - rows.add(row); - } - else if (row != null) - rows.add(row); + row = handler.get(); } catch (DigestMismatchException e) { throw new AssertionError(e); // full data requested from each node here, no digests should be sent } + // wait for the read-repair writes to be acknowledged; this must follow handler.get(), because resolver.repairResults stays empty until the responses have been resolved + FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); + + // retry short reads, otherwise add the row to our resultset + if (command instanceof SliceFromReadCommand) + { + // short reads are only possible on
SliceFromReadCommand + SliceFromReadCommand sliceCommand = (SliceFromReadCommand) command; + int maxLiveColumns = handler.getMaxLiveColumns(); + int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount() : 0; + + assert maxLiveColumns <= sliceCommand.count; + if ((maxLiveColumns == sliceCommand.count) && (liveColumnsInRow < sliceCommand.count)) + { + if (logger.isDebugEnabled()) + logger.debug("detected short read: expected {} columns, but only resolved {} columns", + sliceCommand.count, liveColumnsInRow); + + int retryCount = sliceCommand.count + sliceCommand.count - liveColumnsInRow; + SliceFromReadCommand retryCommand = new SliceFromReadCommand(command.table, + command.key, + command.queryPath, + sliceCommand.start, + sliceCommand.finish, + sliceCommand.reversed, + retryCount); + if (commandsToRetry == Collections.EMPTY_LIST) + commandsToRetry = new ArrayList<ReadCommand>(); + commandsToRetry.add(retryCommand); + continue; + } + } + if (row != null) rows.add(row); } } } while (!commandsToRetry.isEmpty()); @@ -769,6 +768,7 @@ public class StorageProxy implements Sto rows.add(row); logger.debug("range slices read {}", row.key); } + FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); } catch (TimeoutException ex) { @@ -1035,6 +1035,7 @@ public class StorageProxy implements Sto rows.add(row); logger.debug("read {}", row); } + FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); } catch (TimeoutException ex) { @@ -1044,7 +1045,7 @@ public class StorageProxy implements Sto } catch (DigestMismatchException e) { - throw new RuntimeException(e); + throw new AssertionError(e); } if (rows.size() >= index_clause.count) return rows.subList(0, index_clause.count); Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1156758&r1=1156757&r2=1156758&view=diff
============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Aug 11 19:29:38 2011 @@ -32,6 +32,8 @@ import java.security.NoSuchAlgorithmExce import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -50,6 +52,7 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.PropertyFileSnitch; +import org.apache.cassandra.net.IAsyncResult; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -575,6 +578,12 @@ public class FBUtilities } } + public static void waitOnFutures(List<IAsyncResult> results, long ms) throws TimeoutException + { + for (IAsyncResult result : results) + result.get(ms, TimeUnit.MILLISECONDS); + } + public static IPartitioner newPartitioner(String partitionerClassName) throws ConfigurationException { if (!partitionerClassName.contains("."))