Add optional socket timeout for streaming. Patch by vijay2win; reviewed by slebresne for CASSANDRA-3838
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a35f8787 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a35f8787 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a35f8787 Branch: refs/heads/trunk Commit: a35f8787cfbfb2ad60ee7795c104ee05661227dd Parents: 0074d64 Author: Sylvain Lebresne <sylv...@riptano.com> Authored: Sun Feb 5 22:23:51 2012 +0100 Committer: Sylvain Lebresne <sylv...@riptano.com> Committed: Sun Feb 5 22:23:51 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 7 +++++++ conf/cassandra.yaml | 7 +++++++ src/java/org/apache/cassandra/config/Config.java | 2 ++ .../cassandra/config/DatabaseDescriptor.java | 5 +++++ .../apache/cassandra/streaming/FileStreamTask.java | 1 + .../cassandra/streaming/IncomingStreamReader.java | 1 + 7 files changed, 24 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b201961..83eca8a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ readability (CASSANDRA-3726) * synchronize BiMap of bootstrapping tokens (CASSANDRA-3417) * show index options in CLI (CASSANDRA-3809) + * add optional socket timeout for streaming (CASSANDRA-3838) Merged from 0.8: * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case (CASSANDRA-3251) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 2c29606..e36b9e8 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -8,6 +8,13 @@ upgrade, just in case you need to roll back to the previous version. 
(Cassandra version X + 1 will always be able to read data files created by version X, but the inverse is not necessarily the case.) +1.0.8 +===== + +Other +----- + - Allow configuring socket timeout for streaming + 1.0.7 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 2082db0..209bcb8 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -315,6 +315,13 @@ compaction_preheat_key_cache: true # Time to wait for a reply from other nodes before failing the command rpc_timeout_in_ms: 10000 +# Enable socket timeout for streaming operation. +# When a timeout occurs during streaming, streaming is retried from the start +# of the current file. This *can* involve re-streaming an important amount of +# data, so you should avoid setting the value too low. +# Default value is 0, which never timeout streams. +# streaming_socket_timeout_in_ms: 0 + # phi value that must be reached for a host to be marked down. # most users should never need to adjust this. 
# phi_convict_threshold: 8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 9f5480c..7cff37f 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -50,6 +50,8 @@ public class Config public Long rpc_timeout_in_ms = new Long(2000); + public Integer streaming_socket_timeout_in_ms = new Integer(0); + public Integer phi_convict_threshold = 8; public Integer concurrent_reads = 8; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 03b5175..5aa59e4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1008,4 +1008,9 @@ public class DatabaseDescriptor { return conf.commitlog_total_space_in_mb; } + + public static int getStreamingSocketTimeout() + { + return conf.streaming_socket_timeout_in_ms; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index 9411b16..ffb1388 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -197,6 +197,7 @@ public class FileStreamTask extends WrappedRunnable try { socket = 
MessagingService.instance().getConnectionPool(to).newSocket(); + socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); output = socket.getOutputStream(); break; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java index e2a618f..8ade06a 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java @@ -55,6 +55,7 @@ public class IncomingStreamReader public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException { + socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); this.socket = socket; InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress(); session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);