Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08

Branch: refs/heads/cassandra-3.0
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 17 ++++++++++++++---
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.1.12
 +2.2.4
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
          DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
                  writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
              return writer;
 -        }
 -        catch (Throwable e)
 +        } catch (Throwable e)
          {
-             writer.abort();
+             if (writer != null)
+             {
+                 try
+                 {
+                     writer.abort();
+                 }
+                 catch (Throwable e2)
+                 {
+                     // add abort error to original and continue so we can drain unread stream
+                     e.addSuppressed(e2);
+                 }
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
 -                long length = section.right - section.left;
 +                assert cis.getTotalCompressedBytesRead() <= totalSize;
 +                int sectionLength = (int) (section.right - section.left);
 +
                  // skip to beginning of section inside chunk
                  cis.position(section.left);
                  in.reset(0);

Reply via email to