Add failure callbacks for outgoing streams. Patch by Yuki Morishita, reviewed by brandonwilliams for CASSANDRA-4051
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/34c1fc0b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/34c1fc0b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/34c1fc0b Branch: refs/heads/trunk Commit: 34c1fc0b7cbdf568ec7869a564fe614f96dcbe9b Parents: 97aa922 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Apr 11 15:06:45 2012 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Apr 11 15:06:45 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/dht/RangeStreamer.java | 6 ++++- .../apache/cassandra/io/sstable/SSTableLoader.java | 6 ++++- .../apache/cassandra/service/StorageService.java | 19 ++++++++++++-- .../apache/cassandra/streaming/FileStreamTask.java | 7 +++++ .../cassandra/streaming/StreamInSession.java | 12 ++++++++- 5 files changed, 43 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index dac05cf..6f7beb0 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -230,7 +230,11 @@ public class RangeStreamer source, table, opType, latch.getCount())); } - public void onFailure() {} + public void onFailure() + { + logger.warn("Streaming from " + source + " failed"); + onSuccess(); // calling onSuccess for latch countdown + } }; if (logger.isDebugEnabled()) logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", ")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 85b5146..79259ec 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -227,7 +227,11 @@ public class SSTableLoader client.stop(); } - public void onFailure() {} + public void onFailure() + { + outputHandler.output(String.format("Streaming session to %s failed", endpoint)); + onSuccess(); // call onSuccess for latch countdown + } } public interface OutputHandler http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 84c0096..88b9c19 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1502,7 +1502,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } } - public void onFailure() {} + public void onFailure() + { + logger_.warn("Streaming from " + source + " failed"); + onSuccess(); // calling onSuccess to send notification + } }; if (logger_.isDebugEnabled()) logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); @@ -2813,7 +2817,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe latch.countDown(); } } - public void onFailure() {} + + public void onFailure() + { + logger_.warn("Streaming to " + endPointEntry + " failed"); + onSuccess(); // calling onSuccess for latch countdown + } }; StageManager.getStage(Stage.STREAM).execute(new Runnable() @@ -2865,7 +2874,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe latch.countDown(); } - public void onFailure() {} + public void onFailure() + { + logger_.warn("Streaming from " + source + " failed"); + onSuccess(); // calling onSuccess for latch countdown + } }; if (logger_.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index babdc4e..8ff2b83 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -106,6 +106,13 @@ public class FileStreamTask extends WrappedRunnable logger.info("Finished streaming session to {}", to); } } + catch (IOException e) + { + StreamOutSession session = StreamOutSession.get(to, header.sessionId); + if (session != null) + session.close(false); + throw e; + } finally { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/34c1fc0b/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index e662a49..a5e08f0 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -127,12 +127,20 @@ public class StreamInSession extends AbstractStreamSession { logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current), new IllegalStateException("Too many retries for " + remoteFile)); - closeInternal(false); + close(false); return; } StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY); logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this); - sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost()))); + try + { + sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost()))); + } + catch (IOException e) + { + logger.error("Sending retry message failed, closing session.", e); + close(false); + } } public void sendMessage(Message message) throws IOException