Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a56868b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a56868b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a56868b Branch: refs/heads/trunk Commit: 8a56868bcaa7d58c907410a1821e83ada72ee0a9 Parents: 2c58581 353d4a0 Author: Robert Stupp <sn...@snazy.de> Authored: Sun Jun 28 10:27:20 2015 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Sun Jun 28 10:33:59 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/net/MessagingService.java | 2 +- .../cassandra/net/OutboundTcpConnection.java | 2 +- .../cassandra/service/AbstractReadExecutor.java | 12 ++++++++++++ .../apache/cassandra/service/ReadCallback.java | 20 ++++++++++++++++++-- .../cassandra/service/RowDataResolver.java | 2 ++ 6 files changed, 35 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 0b0cf83,6a137a3..3e4fd36 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,11 -1,5 +1,12 @@@ -2.0.17 +2.1.8 + * Fix IndexOutOfBoundsException when inserting tuple with too many + elements using the string literal notation (CASSANDRA-9559) + * Allow JMX over SSL directly from nodetool (CASSANDRA-9090) + * Fix incorrect result for IN queries where column not found (CASSANDRA-9540) + * Enable describe on indices (CASSANDRA-7814) + * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637) +Merged from 2.0 + * Improve trace messages for RR (CASSANDRA-9479) * Fix suboptimal secondary index selection when restricted clustering column is also indexed (CASSANDRA-9631) * (cqlsh) Add min_threshold to DTCS option autocomplete (CASSANDRA-9385) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 0546e27,2f2370d..2d02e34 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@@ -77,7 -81,23 +81,8 @@@ public abstract class AbstractReadExecu protected void makeDataRequests(Iterable<InetAddress> endpoints) { - for (InetAddress endpoint : endpoints) - { - if (isLocalRequest(endpoint)) - { - if (traceState != null) - traceState.trace("reading data locally"); - logger.trace("reading data locally"); - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); - } - else - { - if (traceState != null) - traceState.trace("reading data from {}", endpoint); - logger.trace("reading data from {}", endpoint); - MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); - } - } + makeRequests(command, endpoints); ++ } protected void makeDigestRequests(Iterable<InetAddress> endpoints) @@@ -94,21 -109,18 +99,23 @@@ { if (isLocalRequest(endpoint)) { - if (traceState != null) - traceState.trace("reading digest locally"); - logger.trace("reading digest locally"); - StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); - } - else - { - if (traceState != null) - traceState.trace("reading digest from {}", endpoint); - logger.trace("reading digest from {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, handler); + hasLocalEndpoint = true; + continue; } + ++ if (traceState != null) ++ traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); + logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); + if (message == null) + message = readCommand.createMessage(); + MessagingService.instance().sendRR(message, endpoint, handler); + } + + // We delay the local (potentially blocking) read till the end to avoid stalling remote requests. + if (hasLocalEndpoint) + { + logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data"); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler)); } } @@@ -273,9 -288,14 +283,11 @@@ // Could be waiting on the data, or on enough digests. ReadCommand retryCommand = command; if (resolver.getData() != null) - { - retryCommand = command.copy(); - retryCommand.setDigestQuery(true); - } + retryCommand = command.copy().setIsDigestQuery(true); InetAddress extraReplica = Iterables.getLast(targetReplicas); + if (traceState != null) + traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler); speculated = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ReadCallback.java index 29eaadf,1315102..cf9be55 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@@ -40,8 -40,10 +40,10 @@@ import org.apache.cassandra.net.Message import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.db.ConsistencyLevel; + import org.apache.cassandra.tracing.TraceState; + import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.SimpleCondition; +import org.apache.cassandra.utils.concurrent.SimpleCondition; public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage> { @@@ -100,8 -100,8 +102,8 @@@ if (!await(command.getTimeout(), TimeUnit.MILLISECONDS)) { // Same as for writes, see AbstractWriteResponseHandler - ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent()); + ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent()); - + Tracing.trace("Read timeout: {}", ex.toString()); if (logger.isDebugEnabled()) logger.debug("Read timeout: {}", ex.toString()); throw ex; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/service/RowDataResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/RowDataResolver.java index e92dad7,bf4afbe..394a4c4 --- a/src/java/org/apache/cassandra/service/RowDataResolver.java +++ b/src/java/org/apache/cassandra/service/RowDataResolver.java @@@ -116,12 -116,14 +117,13 @@@ public class RowDataResolver extends Ab if (diffCf == null) // no repair needs to happen continue; - // create and send the row mutation message based on the diff - RowMutation rowMutation = new RowMutation(keyspaceName, key.key, diffCf); - MessageOut repairMessage; + // create and send the mutation message based on the diff + Mutation mutation = new Mutation(keyspaceName, key.getKey(), diffCf); // use a separate verb here because we don't want these to be get the white glove hint- // on-timeout behavior that a "real" mutation gets + Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i)); - repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR); - results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i))); + results.add(MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), + endpoints.get(i))); } return results;