Updated Branches: refs/heads/cassandra-2.0 cc01b3165 -> 01370bb6c
Fix AbstractRowResolver and RowDigestResolver for speculative retries

patch by Jonathan Ellis; reviewed by Aleksey Yeschenko for CASSANDRA-6194

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01370bb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01370bb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01370bb6
Branch: refs/heads/cassandra-2.0
Commit: 01370bb6c7d7fa816c7162a379bee4dc710a5556
Parents: cc01b31
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Wed Oct 23 23:40:57 2013 +0800
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Wed Oct 23 23:43:36 2013 +0800
----------------------------------------------------------------------
 CHANGES.txt | 2 +-
 .../cassandra/service/AbstractReadExecutor.java | 9 +++---
 .../cassandra/service/AbstractRowResolver.java | 20 +-----------
 .../cassandra/service/IResponseResolver.java | 2 +-
 .../service/RangeSliceResponseResolver.java | 3 +-
 .../apache/cassandra/service/ReadCallback.java | 11 ++-----
 .../cassandra/service/RowDigestResolver.java | 32 ++++++--------------
 .../apache/cassandra/service/StorageProxy.java | 7 ++++-
 8 files changed, 27 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b55e0cf..32c74aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,7 +6,7 @@ * Add configurable metrics reporting (CASSANDRA-4430) * drop queries exceeding a configurable number of tombstones (CASSANDRA-6117) * Track and persist sstable read activity (CASSANDRA-5515) - * Fixes for speculative retry (CASSANDRA-5932) + * Fixes for speculative retry (CASSANDRA-5932, CASSANDRA-6194) * Improve memory usage of metadata min/max column names (CASSANDRA-6077) * Fix thrift validation 
refusing row markers on CQL3 tables (CASSANDRA-6081) * Fix insertion of collections with CAS (CASSANDRA-6069) http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index c56975c..3f57e73 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service; import java.net.InetAddress; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; @@ -122,7 +123,7 @@ public abstract class AbstractReadExecutor * * @return target replicas + the extra replica, *IF* we speculated. */ - public abstract Iterable<InetAddress> getContactedReplicas(); + public abstract Collection<InetAddress> getContactedReplicas(); /** * send the initial set of requests @@ -216,7 +217,7 @@ public abstract class AbstractReadExecutor // no-op } - public Iterable<InetAddress> getContactedReplicas() + public Collection<InetAddress> getContactedReplicas() { return targetReplicas; } @@ -286,7 +287,7 @@ public abstract class AbstractReadExecutor } } - public Iterable<InetAddress> getContactedReplicas() + public Collection<InetAddress> getContactedReplicas() { return speculated ? 
targetReplicas @@ -312,7 +313,7 @@ public abstract class AbstractReadExecutor // no-op } - public Iterable<InetAddress> getContactedReplicas() + public Collection<InetAddress> getContactedReplicas() { return targetReplicas; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/AbstractRowResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java index 2ebaaf1..47a00da 100644 --- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java +++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java @@ -43,27 +43,9 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo this.keyspaceName = keyspaceName; } - public boolean preprocess(MessageIn<ReadResponse> message) + public void preprocess(MessageIn<ReadResponse> message) { - MessageIn<ReadResponse> toReplace = null; - for (MessageIn<ReadResponse> reply : replies) - { - if (reply.from.equals(message.from)) - { - if (!message.payload.isDigestQuery()) - toReplace = reply; - break; - } - } - // replace old message - if (toReplace != null) - { - replies.remove(toReplace); - replies.add(message); - return false; - } replies.add(message); - return true; } public Iterable<MessageIn<ReadResponse>> getMessages() http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/IResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java index 0c54690..17c8bff 100644 --- a/src/java/org/apache/cassandra/service/IResponseResolver.java +++ b/src/java/org/apache/cassandra/service/IResponseResolver.java @@ -38,6 +38,6 @@ public interface IResponseResolver<TMessage, TResolved> { 
*/ public TResolved getData(); - public boolean preprocess(MessageIn<TMessage> message); + public void preprocess(MessageIn<TMessage> message); public Iterable<MessageIn<TMessage>> getMessages(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java index 72ea69c..640681b 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java +++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java @@ -93,10 +93,9 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR return resolvedRows; } - public boolean preprocess(MessageIn message) + public void preprocess(MessageIn message) { responses.add(message); - return true; } public boolean isDataPresent() http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index b7d5380..d4cc7f5 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -67,7 +67,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ","))); } - private ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints) + public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, 
ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints) { this.command = command; this.keyspace = keyspace; @@ -78,11 +78,6 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag this.endpoints = endpoints; } - public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver) - { - return new ReadCallback<TMessage, TResolved>(newResolver, consistencyLevel, blockfor, command, keyspace, endpoints); - } - public boolean await(long timePastStart, TimeUnit unit) { long time = unit.toNanos(timePastStart) - (System.nanoTime() - start); @@ -111,8 +106,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag public void response(MessageIn<TMessage> message) { - boolean hasAdded = resolver.preprocess(message); - int n = (waitingFor(message) && hasAdded) + resolver.preprocess(message); + int n = waitingFor(message) ? received.incrementAndGet() : received.get(); if (n >= blockfor && resolver.isDataPresent()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/RowDigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java index bc4cf49..ec9f0d3 100644 --- a/src/java/org/apache/cassandra/service/RowDigestResolver.java +++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java @@ -71,37 +71,23 @@ public class RowDigestResolver extends AbstractRowResolver for (MessageIn<ReadResponse> message : replies) { ReadResponse response = message.payload; + + ByteBuffer newDigest; if (response.isDigestQuery()) { - if (digest == null) - { - digest = response.digest(); - } - else - { - ByteBuffer digest2 = response.digest(); - if (!digest.equals(digest2)) - throw new DigestMismatchException(key, digest, 
digest2); - } + newDigest = response.digest(); } else { + // note that this allows for multiple data replies, post-CASSANDRA-5932 data = response.row().cf; + newDigest = ColumnFamily.digest(data); } - } - // Compare digest (only one, since we threw earlier if there were different replies) - // with the data response. If there is a mismatch then throw an exception so that read repair can happen. - // - // It's important to note that we do not consider the possibility of multiple data responses -- - // that can only happen when we're doing the repair post-mismatch, and will be handled by RowDataResolver. - if (digest != null) - { - ByteBuffer digest2 = ColumnFamily.digest(data); - if (!digest.equals(digest2)) - throw new DigestMismatchException(key, digest, digest2); - if (logger.isDebugEnabled()) - logger.debug("digests verified"); + if (digest == null) + digest = newDigest; + else if (!digest.equals(newDigest)) + throw new DigestMismatchException(key, digest, newDigest); } if (logger.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e177eed..6dd702b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1292,7 +1292,12 @@ public class StorageProxy implements StorageProxyMBean // Do a full data read to resolve the correct response (and repair node that need be) RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp); - ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver); + ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver, + ConsistencyLevel.ALL, + 
exec.getContactedReplicas().size(), + exec.command, + Keyspace.open(exec.command.getKeyspace()), + exec.handler.endpoints); if (repairCommands == null) {