Fix streaming to catch exceptions so that retry does not fail

patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc

Branch: refs/heads/cassandra-2.2
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 16 ++++++++++++++--
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
  * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

Reply via email to