Updated Branches: refs/heads/trunk ced78f3c4 -> e6610e469
split up rpc timeout by operation type patch by Melvin Wang and jbellis; reviewed by vijay for CASSANDRA-2819 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6610e46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6610e46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6610e46 Branch: refs/heads/trunk Commit: e6610e4692800485501e316df3dab53a6e9e34b2 Parents: ced78f3 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed May 16 14:49:56 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Jun 21 13:19:02 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 5 + conf/cassandra.yaml | 13 +++- src/java/org/apache/cassandra/config/Config.java | 10 ++- .../cassandra/config/DatabaseDescriptor.java | 70 +++++++++++++++ .../org/apache/cassandra/db/RangeSliceCommand.java | 10 ++- src/java/org/apache/cassandra/db/ReadCommand.java | 6 ++ .../org/apache/cassandra/dht/BootStrapper.java | 2 +- .../apache/cassandra/net/MessageDeliveryTask.java | 2 +- src/java/org/apache/cassandra/net/MessageIn.java | 6 ++ src/java/org/apache/cassandra/net/MessageOut.java | 6 ++ .../org/apache/cassandra/net/MessagingService.java | 17 +--- .../cassandra/net/OutboundTcpConnection.java | 4 +- .../service/AbstractWriteResponseHandler.java | 3 +- .../org/apache/cassandra/service/IReadCommand.java | 1 + .../org/apache/cassandra/service/ReadCallback.java | 2 +- .../apache/cassandra/service/RepairCallback.java | 2 +- .../org/apache/cassandra/service/StorageProxy.java | 29 ++++--- .../cassandra/service/StorageProxyMBean.java | 8 ++ .../cassandra/service/TruncateResponseHandler.java | 2 +- .../apache/cassandra/thrift/CassandraServer.java | 10 +- 21 files changed, 167 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8da61ba..344c87b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * split up rpc timeout by operation type (CASSANDRA-2819) * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762) * update MS protocol with a version handshake + broadcast address id (CASSANDRA-4311) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 6cad6c2..23814b1 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -36,6 +36,11 @@ Upgrading - Global option hinted_handoff_throttle_delay_in_ms has been removed. hinted_handoff_throttle_in_kb has been added instead. +Features +-------- + - rpc_timeout has been split up to allow finer-grained control + on timeouts for different operation types + 1.1.1 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index cddd491..a584448 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -400,7 +400,18 @@ compaction_preheat_key_cache: true # When unset, the default is 400 Mbps or 50 MB/s. # stream_throughput_outbound_megabits_per_sec: 400 -# Time to wait for a reply from other nodes before failing the command +# How long the coordinator should wait for read operations to complete +read_rpc_timeout_in_ms: 10000 +# How long the coordinator should wait for seq or index scans to complete +range_rpc_timeout_in_ms: 10000 +# How long the coordinator should wait for writes to complete +write_rpc_timeout_in_ms: 10000 +# How long the coordinator should wait for truncates to complete +# (This can be much longer, because we need to flush all CFs +# to make sure we can clear out anythink in the commitlog that could +# cause truncated data to reappear.) +truncate_timeout_in_ms: 300000 +# The default timeout for other, miscellaneous operations rpc_timeout_in_ms: 10000 # Enable socket timeout for streaming operation. http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index e1c9a42..1cda551 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -45,7 +45,15 @@ public class Config /* initial token in the ring */ public String initial_token; - public Long rpc_timeout_in_ms = new Long(2000); + public Long rpc_timeout_in_ms = new Long(10000); + + public Long read_rpc_timeout_in_ms = new Long(10000); + + public Long range_rpc_timeout_in_ms = new Long(10000); + + public Long write_rpc_timeout_in_ms = new Long(10000); + + public Long truncate_rpc_timeout_in_ms = new Long(300000); public Integer streaming_socket_timeout_in_ms = new Integer(0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index d94782c..12f5a4d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -26,6 +26,7 @@ import java.net.URL; import java.net.UnknownHostException; import java.util.*; +import com.google.common.primitives.Longs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +46,12 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.locator.EndpointSnitchInfo; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.SeedProvider; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.CassandraDaemon; import org.apache.cassandra.utils.FBUtilities; import org.yaml.snakeyaml.Loader; @@ -696,6 +699,73 @@ public class DatabaseDescriptor conf.rpc_timeout_in_ms = timeOutInMillis; } + public static long getReadRpcTimeout() + { + return conf.read_rpc_timeout_in_ms; + } + + public static void setReadRpcTimeout(Long timeOutInMillis) + { + conf.read_rpc_timeout_in_ms = timeOutInMillis; + } + + public static long getRangeRpcTimeout() + { + return conf.range_rpc_timeout_in_ms; + } + + public static void setRangeRpcTimeout(Long timeOutInMillis) + { + conf.range_rpc_timeout_in_ms = timeOutInMillis; + } + + public static long getWriteRpcTimeout() + { + return conf.write_rpc_timeout_in_ms; + } + + public static void setWriteRpcTimeout(Long timeOutInMillis) + { + conf.write_rpc_timeout_in_ms = timeOutInMillis; + } + + public static long getTruncateRpcTimeout() + { + return conf.truncate_rpc_timeout_in_ms; + } + + public static void setTruncateRpcTimeout(Long timeOutInMillis) + { + conf.truncate_rpc_timeout_in_ms = timeOutInMillis; + } + + // not part of the Verb enum so we can change timeouts easily via JMX + public static long getTimeout(MessagingService.Verb verb) + { + switch (verb) + { + case READ: + return getReadRpcTimeout(); + case RANGE_SLICE: + return getRangeRpcTimeout(); + case TRUNCATE: + return getTruncateRpcTimeout(); + case READ_REPAIR: + case MUTATION: + return getWriteRpcTimeout(); + default: + return getRpcTimeout(); + } + } + + /** + * @return the minimum configured {read, write, range, truncate, misc} timeout + */ + public static long getMinRpcTimeout() + { + return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout()); + } + public static double getPhiConvictThreshold() { return conf.phi_convict_threshold; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/db/RangeSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java index c0054ea..5511127 100644 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java @@ -43,8 +43,11 @@ import java.util.ArrayList; import java.util.List; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.filter.IFilter; +import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.IVersionedSerializer; @@ -168,6 +171,11 @@ public class RangeSliceCommand implements IReadCommand SlicePredicate pred = RangeSliceCommandSerializer.asSlicePredicate(predicate); return new IndexScanCommand(keyspace, column_family, clause, pred, range); } + + public long getTimeout() + { + return DatabaseDescriptor.getRangeRpcTimeout(); + } } class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand> http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 8749c46..74d8fba 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.IVersionedSerializer; @@ -99,6 +100,11 @@ public abstract class ReadCommand implements IReadCommand { // noop } + + public long getTimeout() + { + return DatabaseDescriptor.getReadRpcTimeout(); + } } class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 7b14586..8abc48d 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -155,7 +155,7 @@ public class BootStrapper { MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN); int retries = 5; - long timeout = Math.max(MessagingService.getDefaultCallbackTimeout(), BOOTSTRAP_TIMEOUT); + long timeout = Math.max(DatabaseDescriptor.getRpcTimeout(), BOOTSTRAP_TIMEOUT); while (retries > 0) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index a5fd614..dc50ef8 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -41,7 +41,7 @@ public class MessageDeliveryTask implements Runnable { MessagingService.Verb verb = message.verb; if (MessagingService.DROPPABLE_VERBS.contains(verb) - && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout()) + && System.currentTimeMillis() > constructionTime + message.getTimeout()) { MessagingService.instance().incrementDroppedMessages(verb); return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index f3b712e..38e376f 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -26,6 +26,7 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.FileUtils; @@ -99,6 +100,11 @@ public class MessageIn<T> return MessagingService.verbStages.get(verb); } + public long getTimeout() + { + return DatabaseDescriptor.getTimeout(verb); + } + public String toString() { StringBuilder sbuf = new StringBuilder(""); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index fe09c2e..78546c8 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -27,6 +27,7 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.utils.FBUtilities; @@ -87,6 +88,11 @@ public class MessageOut<T> return MessagingService.verbStages.get(verb); } + public long getTimeout() + { + return DatabaseDescriptor.getTimeout(verb); + } + public String toString() { StringBuilder sbuf = new StringBuilder(""); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c0af377..18f3baa 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -285,7 +285,6 @@ public final class MessagingService implements MessagingServiceMBean private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>(); private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>(); private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>(); - private static final long DEFAULT_CALLBACK_TIMEOUT = DatabaseDescriptor.getRpcTimeout(); // protocol versions of the other nodes in the cluster private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>(); @@ -324,7 +323,7 @@ public final class MessagingService implements MessagingServiceMBean public Object apply(Pair<String, CallbackInfo> pair) { CallbackInfo expiredCallbackInfo = pair.right; - maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) DatabaseDescriptor.getRpcTimeout()); + maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) expiredCallbackInfo.sentMessage.getTimeout()); totalTimeouts++; String ip = expiredCallbackInfo.target.getHostAddress(); AtomicLong c = timeoutsPerHost.get(ip); @@ -350,7 +349,7 @@ public final class MessagingService implements MessagingServiceMBean } }; - callbacks = new ExpiringMap<String, CallbackInfo>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter); + callbacks = new ExpiringMap<String, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try @@ -488,11 +487,6 @@ public final class MessagingService implements MessagingServiceMBean return verbHandlers.get(type); } - public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to) - { - return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT); - } - public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout) { String messageId = nextId(); @@ -520,7 +514,7 @@ public final class MessagingService implements MessagingServiceMBean */ public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb) { - return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT); + return sendRR(message, to, cb, message.getTimeout()); } /** @@ -914,11 +908,6 @@ public final class MessagingService implements MessagingServiceMBean return completedTasks; } - public static long getDefaultCallbackTimeout() - { - return DEFAULT_CALLBACK_TIMEOUT; - } - public Map<String, Integer> getDroppedMessages() { Map<String, Integer> map = new HashMap<String, Integer>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index a5d8181..bdf8223 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -131,7 +131,7 @@ public class OutboundTcpConnection extends Thread disconnect(); continue; } - if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout()) + if (entry.timestamp < System.currentTimeMillis() - m.getTimeout()) dropped.incrementAndGet(); else if (socket != null || connect()) writeConnected(m, id); @@ -312,7 +312,7 @@ public class OutboundTcpConnection extends Thread while (true) { Entry entry = backlog.peek(); - if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout()) + if (entry == null || entry.timestamp >= System.currentTimeMillis() - entry.message.getTimeout()) break; Entry entry2 = backlog.poll(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index a6c38ae..db85a47 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -44,7 +44,8 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand public void get() throws TimeoutException { - long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime); + boolean success; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/IReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IReadCommand.java b/src/java/org/apache/cassandra/service/IReadCommand.java index 743c80b..c6a129e 100644 --- a/src/java/org/apache/cassandra/service/IReadCommand.java +++ b/src/java/org/apache/cassandra/service/IReadCommand.java @@ -20,4 +20,5 @@ package org.apache.cassandra.service; public interface IReadCommand { public String getKeyspace(); + public long getTimeout(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 6608b3e..a3d273c 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -128,7 +128,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag public TResolved get() throws TimeoutException, DigestMismatchException, IOException { - long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime); boolean success; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/RepairCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java index 97f3079..8ded3e0 100644 --- a/src/java/org/apache/cassandra/service/RepairCallback.java +++ b/src/java/org/apache/cassandra/service/RepairCallback.java @@ -55,7 +55,7 @@ public class RepairCallback implements IAsyncCallback public Row get() throws TimeoutException, DigestMismatchException, IOException { - long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime); try { condition.await(timeout, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 51f5fe2..92a3256 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -430,7 +430,7 @@ public class StorageProxy implements StorageProxyMBean { InetAddress destination = iter.next(); CompactEndpointSerializationHelper.serialize(destination, dos); - String id = MessagingService.instance().addCallback(handler, message, destination); + String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); dos.writeUTF(id); if (logger.isDebugEnabled()) logger.debug("Adding FWD message to: " + destination + " with ID " + id); @@ -763,7 +763,7 @@ public class StorageProxy implements StorageProxyMBean RepairCallback handler = repairResponseHandlers.get(i); // wait for the repair writes to be acknowledged, to minimize impact on any replica that's // behind on writes in case the out-of-sync row is read multiple times in quick succession - FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); + FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); Row row; try @@ -903,7 +903,7 @@ public class StorageProxy implements StorageProxyMBean columnsCount += row.getLiveColumnCount(); logger.debug("range slices read {}", row.key); } - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); + FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); } catch (TimeoutException ex) { @@ -1237,7 +1237,7 @@ public class StorageProxy implements StorageProxyMBean public final void run() { - if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout()) + if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(verb)) { MessagingService.instance().incrementDroppedMessages(verb); return; @@ -1282,13 +1282,18 @@ public class StorageProxy implements StorageProxyMBean logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report"); } - public Long getRpcTimeout() - { - return DatabaseDescriptor.getRpcTimeout(); - } + public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); } + public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); } - public void setRpcTimeout(Long timeoutInMillis) - { - DatabaseDescriptor.setRpcTimeout(timeoutInMillis); - } + public Long getReadRpcTimeout() { return DatabaseDescriptor.getReadRpcTimeout(); } + public void setReadRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setReadRpcTimeout(timeoutInMillis); } + + public Long getWriteRpcTimeout() { return DatabaseDescriptor.getWriteRpcTimeout(); } + public void setWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis); } + + public Long getRangeRpcTimeout() { return DatabaseDescriptor.getRangeRpcTimeout(); } + public void setRangeRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRangeRpcTimeout(timeoutInMillis); } + + public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); } + public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index f1fb25d..dd1541a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -48,4 +48,12 @@ public interface StorageProxyMBean public Long getRpcTimeout(); public void setRpcTimeout(Long timeoutInMillis); + public Long getReadRpcTimeout(); + public void setReadRpcTimeout(Long timeoutInMillis); + public Long getWriteRpcTimeout(); + public void setWriteRpcTimeout(Long timeoutInMillis); + public Long getRangeRpcTimeout(); + public void setRangeRpcTimeout(Long timeoutInMillis); + public Long getTruncateRpcTimeout(); + public void setTruncateRpcTimeout(Long timeoutInMillis); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/TruncateResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java index e5dc93d..e17211c 100644 --- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java @@ -49,7 +49,7 @@ public class TruncateResponseHandler implements IAsyncCallback public void get() throws TimeoutException { - long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + long timeout = DatabaseDescriptor.getTruncateRpcTimeout() - (System.currentTimeMillis() - startTime); boolean success; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 54bf927..72a4677 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -104,7 +104,7 @@ public class CassandraServer implements Cassandra.Iface List<Row> rows; try { - schedule(DatabaseDescriptor.getRpcTimeout()); + schedule(DatabaseDescriptor.getReadRpcTimeout()); try { rows = StorageProxy.read(commands, consistency_level); @@ -626,7 +626,7 @@ public class CassandraServer implements Cassandra.Iface return; try { - schedule(DatabaseDescriptor.getRpcTimeout()); + schedule(DatabaseDescriptor.getWriteRpcTimeout()); try { StorageProxy.mutate(mutations, consistency_level); @@ -685,7 +685,7 @@ public class CassandraServer implements Cassandra.Iface { bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p)); } - schedule(DatabaseDescriptor.getRpcTimeout()); + schedule(DatabaseDescriptor.getRangeRpcTimeout()); try { IFilter filter = ThriftValidation.asIFilter(predicate, metadata.getComparatorFor(column_parent.super_column)); @@ -745,7 +745,7 @@ public class CassandraServer implements Cassandra.Iface List<Row> rows; try { - schedule(DatabaseDescriptor.getRpcTimeout()); + schedule(DatabaseDescriptor.getRangeRpcTimeout()); try { IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator); @@ -1091,7 +1091,7 @@ public class CassandraServer implements Cassandra.Iface cState.hasColumnFamilyAccess(cfname, Permission.WRITE); try { - schedule(DatabaseDescriptor.getRpcTimeout()); + schedule(DatabaseDescriptor.getTruncateRpcTimeout()); try { StorageProxy.truncateBlocking(cState.getKeyspace(), cfname);