Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/net/MessagingService.java

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a73e392 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a73e392 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a73e392 Branch: refs/heads/cassandra-2.1 Commit: 3a73e392fa424bff5378d4bb72117cfa28f9b0b7 Parents: 1c2a812 2890cc5 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Apr 22 19:47:42 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Apr 22 19:47:42 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/BatchlogManager.java | 2 +- .../cassandra/db/HintedHandOffManager.java | 4 +-- .../apache/cassandra/net/MessagingService.java | 27 ++++++++++++++------ .../apache/cassandra/net/WriteCallbackInfo.java | 16 +++++++++--- .../apache/cassandra/service/StorageProxy.java | 7 ++++- 6 files changed, 41 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 9b73c89,74ddcfd..dbed949 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -8,61 -9,10 +8,62 @@@ Merged from 1.2 * Fix batchlog to account for CF truncation records (CASSANDRA-6999) * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018) * Require nodetool rebuild_index to specify index names (CASSANDRA-7038) + * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058) -1.2.16 +2.0.7 + * Put nodes in hibernate when join_ring is false (CASSANDRA-6961) + * Avoid early loading of non-system keyspaces before compaction-leftovers + cleanup at startup (CASSANDRA-6913) + * Restrict Windows to parallel repairs (CASSANDRA-6907) + * (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436) + * Fix NPE in 
MeteredFlusher (CASSANDRA-6820) + * Fix race processing range scan responses (CASSANDRA-6820) + * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821) + * Add uuid() function (CASSANDRA-6473) + * Omit tombstones from schema digests (CASSANDRA-6862) + * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884) + * Lower chances for losing new SSTables during nodetool refresh and + ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514) + * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708) + * Update hadoop_cql3_word_count example (CASSANDRA-6793) + * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788) + * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865) + * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864) + * Fix schema concurrency exceptions (CASSANDRA-6841) + * Fix leaking validator FH in StreamWriter (CASSANDRA-6832) + * Fix saving triggers to schema (CASSANDRA-6789) + * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790) + * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838) + * Fix static counter columns (CASSANDRA-6827) + * Restore expiring->deleted (cell) compaction optimization (CASSANDRA-6844) + * Fix CompactionManager.needsCleanup (CASSANDRA-6845) + * Correctly compare BooleanType values other than 0 and 1 (CASSANDRA-6779) + * Read message id as string from earlier versions (CASSANDRA-6840) + * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837) + * Add paranoid disk failure option (CASSANDRA-6646) + * Improve PerRowSecondaryIndex performance (CASSANDRA-6876) + * Extend triggers to support CAS updates (CASSANDRA-6882) + * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873) + * Fix paging with SELECT DISTINCT (CASSANDRA-6857) + * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923) + * Improve MeteredFlusher handling of MF-unaffected column 
families + (CASSANDRA-6867) + * Add CqlRecordReader using native pagination (CASSANDRA-6311) + * Add QueryHandler interface (CASSANDRA-6659) + * Track liveRatio per-memtable, not per-CF (CASSANDRA-6945) + * Make sure upgradesstables keeps sstable level (CASSANDRA-6958) + * Fix LIMIT with static columns (CASSANDRA-6956) + * Fix clash with CQL column name in thrift validation (CASSANDRA-6892) + * Fix error with super columns in mixed 1.2-2.0 clusters (CASSANDRA-6966) + * Fix bad skip of sstables on slice query with composite start/finish (CASSANDRA-6825) + * Fix unintended update with conditional statement (CASSANDRA-6893) + * Fix map element access in IF (CASSANDRA-6914) + * Avoid costly range calculations for range queries on system keyspaces + (CASSANDRA-6906) + * Fix SSTable not released if stream session fails (CASSANDRA-6818) + * Avoid build failure due to ANTLR timeout (CASSANDRA-6991) +Merged from 1.2: * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816) * add extra SSL cipher suites (CASSANDRA-6613) * fix nodetool getsstables for blob PK (CASSANDRA-6803) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java index 5770994,02af9d3..5aea736 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@@ -323,7 -328,7 +323,7 @@@ public class BatchlogManager implement } }; WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback); - MessagingService.instance().sendRR(mutation.createMessage(), ep, handler); - MessagingService.instance().sendUnhintableMutation(mutation, ep, handler); ++ MessagingService.instance().sendRR(mutation.createMessage(), ep, handler, false); handlers.add(handler); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java index 942707e,a7a3e06..13d1bb0 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@@ -450,8 -399,8 +450,8 @@@ public class HintedHandOffManager imple deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); } }; - WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback); - MessagingService.instance().sendRR(message, endpoint, responseHandler); + WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback); - MessagingService.instance().sendUnhintableMutation(rm, endpoint, responseHandler); ++ MessagingService.instance().sendRR(message, endpoint, responseHandler, false); responseHandlers.add(responseHandler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index cc5dae5,3f90d7f..cccf698 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -537,21 -527,18 +537,33 @@@ public final class MessagingService imp return verbHandlers.get(type); } - public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout) + public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout) { - String messageId = nextId(); - CallbackInfo previous; - - // If HH is enabled and this is a mutation message => store the message to track for potential hints. 
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION) - previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout); - else - previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel + int messageId = nextId(); + CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous); + return messageId; + } - public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation> message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel) - assert previous == null; ++ public int addCallback(IAsyncCallback cb, ++ MessageOut<? extends IMutation> message, ++ InetAddress to, ++ long timeout, ++ ConsistencyLevel consistencyLevel, ++ boolean allowHints) + { + assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION; + int messageId = nextId(); - CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout); ++ CallbackInfo previous = callbacks.put(messageId, ++ new WriteCallbackInfo(to, ++ cb, ++ message, ++ callbackDeserializers.get(message.verb), ++ consistencyLevel, ++ allowHints), ++ timeout); + assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous); return messageId; } @@@ -568,24 -559,14 +580,21 @@@ } /** - * A special version of sendRR that doesn't trigger a hint for the mutation on a timeout. - * Used by BatchlogManager and HintedHandOffManager. + * Send a non-mutation message to a given endpoint. This method specifies a callback + * which is invoked with the actual response. 
- * Also holds the message (only mutation messages) to determine if it - * needs to trigger a hint (uses StorageProxy for that). + * + * @param message message to be sent. + * @param to endpoint to which the message needs to be sent + * @param cb callback interface which is used to pass the responses or + * suggest that a timeout occurred to the invoker of the send(). - * suggest that a timeout occurred to the invoker of the send(). + * @param timeout the timeout used for expiration + * @return an reference to message id used to match with the result */ - public void sendUnhintableMutation(RowMutation mutation, InetAddress to, IMessageCallback cb) + public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout) { - String id = nextId(); - callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), DatabaseDescriptor.getWriteRpcTimeout()); - sendOneWay(mutation.createMessage(), id, to); + int id = addCallback(cb, message, to, timeout); + sendOneWay(message, id, to); + return id; } /** @@@ -596,14 -577,24 +605,16 @@@ * * @param message message to be sent. * @param to endpoint to which the message needs to be sent - * @param cb callback interface which is used to pass the responses or + * @param handler callback interface which is used to pass the responses or * suggest that a timeout occurred to the invoker of the send(). - * suggest that a timeout occurred to the invoker of the send(). - * @param timeout the timeout used for expiration * @return an reference to message id used to match with the result */ - public int sendRR(MessageOut<? extends IMutation> message, InetAddress to, AbstractWriteResponseHandler handler) - public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout) ++ public int sendRR(MessageOut<? 
extends IMutation> message, ++ InetAddress to, ++ AbstractWriteResponseHandler handler, ++ boolean allowHints) { - int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel); - String id = addCallback(cb, message, to, timeout); - - if (cb instanceof AbstractWriteResponseHandler) - { - PBSPredictor.instance().startWriteOperation(id); - } - else if (cb instanceof ReadCallback) - { - PBSPredictor.instance().startReadOperation(id); - } - ++ int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints); sendOneWay(message, id, to); return id; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java index be7b668,0000000..987ec15 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java +++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java @@@ -1,46 -1,0 +1,54 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.service.StorageProxy; + +public class WriteCallbackInfo extends CallbackInfo +{ + public final MessageOut sentMessage; + private final ConsistencyLevel consistencyLevel; ++ private final boolean allowHints; + - public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel) ++ public WriteCallbackInfo(InetAddress target, ++ IAsyncCallback callback, ++ MessageOut message, ++ IVersionedSerializer<?> serializer, ++ ConsistencyLevel consistencyLevel, ++ boolean allowHints) + { + super(target, callback, serializer); + assert message != null; + this.sentMessage = message; + this.consistencyLevel = consistencyLevel; ++ this.allowHints = allowHints; + } + + public boolean shouldHint() + { - return sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION - && consistencyLevel != ConsistencyLevel.ANY - && StorageProxy.shouldHint(target); ++ return allowHints ++ && sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION ++ && consistencyLevel != ConsistencyLevel.ANY ++ && StorageProxy.shouldHint(target); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 033ce8e,7ef3d72..fc6ee3a --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -917,29 -616,35 +917,34 @@@ public class StorageProxy implements St Iterator<InetAddress> iter = targets.iterator(); InetAddress target = iter.next(); - // direct writes to local DC or old Cassandra versions - if (localDC || 
MessagingService.instance().getVersion(target) < MessagingService.VERSION_12) + // Add the other destinations of the same message as a FORWARD_HEADER entry + DataOutputBuffer out = new DataOutputBuffer(); + try { - // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid - // creating a second iterator since we already have a perfectly good one - MessagingService.instance().sendRR(message, target, handler); + out.writeInt(targets.size() - 1); while (iter.hasNext()) { - target = iter.next(); - MessagingService.instance().sendRR(message, target, handler); + InetAddress destination = iter.next(); + CompactEndpointSerializationHelper.serialize(destination, out); - int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel); ++ int id = MessagingService.instance().addCallback(handler, ++ message, ++ destination, ++ message.getTimeout(), ++ handler.consistencyLevel, ++ true); + out.writeInt(id); + logger.trace("Adding FWD message to {}@{}", id, destination); } - return; + message = message.withParameter(RowMutation.FORWARD_TO, out.getData()); + // send the combined message + forward headers + int id = MessagingService.instance().sendRR(message, target, handler); + logger.trace("Sending message to {}@{}", id, target); } - - // Add all the other destinations of the same message as a FORWARD_HEADER entry - FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - dos.writeInt(targets.size() - 1); - while (iter.hasNext()) + catch (IOException e) { - InetAddress destination = iter.next(); - CompactEndpointSerializationHelper.serialize(destination, dos); - String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); - dos.writeUTF(id); + // DataOutputBuffer is in-memory, doesn't throw IOException + throw new AssertionError(e); } - message = 
message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray()); - // send the combined message + forward headers - Tracing.trace("Enqueuing message to {}", target); - MessagingService.instance().sendRR(message, target, handler); } private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)