Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 b7231ff8a -> e6c8034b1
  refs/heads/cassandra-2.1 5baa72f7f -> 874a34174
  refs/heads/trunk 6838790f8 -> 5f6e780d8


Fix SSTable not released if stream fails before it starts

patch by yukim; reviewed by Richard Low for CASSANDRA-6818


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35d4b5de
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35d4b5de
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35d4b5de

Branch: refs/heads/cassandra-2.0
Commit: 35d4b5de8f3ee18ec98b01f3aa0951df0e11e8d2
Parents: b7bb2fb
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Mar 24 07:44:19 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Mar 24 07:44:19 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                     | 1 +
 .../org/apache/cassandra/streaming/AbstractStreamSession.java   | 2 --
 src/java/org/apache/cassandra/streaming/StreamInSession.java    | 5 +++++
 src/java/org/apache/cassandra/streaming/StreamOutSession.java   | 5 +++++
 4 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 960b0e9..fa46c2e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@
  * Avoid NPEs when receiving table changes for an unknown keyspace 
(CASSANDRA-5631)
  * Fix bootstrapping when there is no schema (CASSANDRA-6685)
  * Fix truncating compression metadata (CASSANDRA-6791)
+ * Fix SSTable not released if stream session fails before it starts 
(CASSANDRA-6818)
 
 
 1.2.15

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java 
b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
index 89fbf5f..f8de827 100644
--- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
@@ -44,8 +44,6 @@ public abstract class AbstractStreamSession implements 
IEndpointStateChangeSubsc
         this.sessionId = sessionId;
         this.table = table;
         this.callback = callback;
-        Gossiper.instance.register(this);
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
     public UUID getSessionId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java 
b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e83a5b6..f9cdc31 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -24,6 +24,7 @@ import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -61,6 +62,8 @@ public class StreamInSession extends AbstractStreamSession
     public static StreamInSession create(InetAddress host, IStreamCallback 
callback)
     {
         StreamInSession session = new StreamInSession(host, 
UUIDGen.getTimeUUID(), callback);
+        Gossiper.instance.register(session);
+        
FailureDetector.instance.registerFailureDetectionEventListener(session);
         sessions.put(session.getSessionId(), session);
         return session;
     }
@@ -71,6 +74,8 @@ public class StreamInSession extends AbstractStreamSession
         if (session == null)
         {
             StreamInSession possibleNew = new StreamInSession(host, sessionId, 
null);
+            Gossiper.instance.register(possibleNew);
+            
FailureDetector.instance.registerFailureDetectionEventListener(possibleNew);
             if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == 
null)
                 session = possibleNew;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java 
b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index edc07ca..c4d7695 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -154,6 +156,9 @@ public class StreamOutSession extends AbstractStreamSession
 
     public void begin()
     {
+        Gossiper.instance.register(this);
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
+
         PendingFile first = files.isEmpty() ? null : 
files.values().iterator().next();
         currentFile = first == null ? null : first.getFilename();
         StreamHeader header = new StreamHeader(table, getSessionId(), first, 
files.values());

Reply via email to