Repository: cassandra
Updated Branches:
  refs/heads/trunk d97d95ff1 -> 982ab93a2


Properly close StreamCompressionInputStream to release any ByteBuffer

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13906


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/982ab93a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/982ab93a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/982ab93a

Branch: refs/heads/trunk
Commit: 982ab93a2f8a0f5c56af9378f65d3e9e430000b9
Parents: d97d95f
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Tue Sep 26 15:52:54 2017 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Wed Oct 4 04:21:20 2017 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 11 ++++++-----
 .../streaming/compress/CompressedInputStream.java  |  4 +++-
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++-----------
 .../compress/StreamCompressionInputStream.java     |  7 ++++++-
 5 files changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2498270..5a8ab47 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Properly close StreamCompressionInputStream to release any ByteBuf 
(CASSANDRA-13906)
  * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
  * LCS needlessly checks for L0 STCS candidates multiple times 
(CASSANDRA-12961)
  * Correctly close netty channels when a stream session ends (CASSANDRA-13905)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 590ba5f..f4eb9c4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -106,12 +106,12 @@ public class StreamReader
                      session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(),
                      cfs.getTableName(), pendingRepair);
 
-
-        TrackedDataInputPlus in = new TrackedDataInputPlus(new 
StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION));
-        StreamDeserializer deserializer = new 
StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
+        StreamDeserializer deserializer = null;
         SSTableMultiWriter writer = null;
-        try
+        try (StreamCompressionInputStream streamCompressionInputStream = new 
StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION))
         {
+            TrackedDataInputPlus in = new 
TrackedDataInputPlus(streamCompressionInputStream);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
             writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
             while (in.getBytesRead() < totalSize)
             {
@@ -125,8 +125,9 @@ public class StreamReader
         }
         catch (Throwable e)
         {
+            Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : "";
             logger.warn("[Stream {}] Error while reading partition {} from 
stream on ks='{}' and table='{}'.",
-                        session.planId(), deserializer.partitionKey(), 
cfs.keyspace.getName(), cfs.getTableName(), e);
+                        session.planId(), partitionKey, 
cfs.keyspace.getName(), cfs.getTableName(), e);
             if (writer != null)
             {
                 writer.abort(e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 76f76ea..4b9fc61 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.utils.WrappedRunnable;
  * InputStream which reads data from underlining source with given {@link 
CompressionInfo}. Uses {@link #buffer} as a buffer
  * for uncompressed data (which is read by stream consumers - {@link 
StreamDeserializer} in this case).
  */
-public class CompressedInputStream extends RebufferingInputStream
+public class CompressedInputStream extends RebufferingInputStream implements 
AutoCloseable
 {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CompressedInputStream.class);
@@ -200,6 +200,8 @@ public class CompressedInputStream extends 
RebufferingInputStream
     }
 
     /**
+     * {@inheritDoc}
+     *
      * Releases the resources specific to this instance, but not the {@link 
DataInputPlus} that is used by the {@link Reader}.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index e40788b..bd44209 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -75,14 +75,12 @@ public class CompressedStreamReader extends StreamReader
                      session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(), pendingRepair,
                      cfs.getTableName());
 
-        CompressedInputStream cis = new CompressedInputStream(inputPlus, 
compressionInfo,
-                                                              
ChecksumType.CRC32, cfs::getCrcCheckChance);
-        TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
-
-        StreamDeserializer deserializer = new 
StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
+        StreamDeserializer deserializer = null;
         SSTableMultiWriter writer = null;
-        try
+        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, 
compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
         {
+            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
             writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
             String filename = writer.getFilename();
             int sectionIdx = 0;
@@ -109,8 +107,9 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
+            Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : "";
             logger.warn("[Stream {}] Error while reading partition {} from 
stream on ks='{}' and table='{}'.",
-                        session.planId(), deserializer.partitionKey(), 
cfs.keyspace.getName(), cfs.getTableName());
+                        session.planId(), partitionKey, 
cfs.keyspace.getName(), cfs.getTableName());
             if (writer != null)
             {
                 writer.abort(e);
@@ -119,10 +118,6 @@ public class CompressedStreamReader extends StreamReader
                 throw e;
             throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
 
b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
index 4b1459d..daf6d28 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
 import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
 
-public class StreamCompressionInputStream extends RebufferingInputStream
+public class StreamCompressionInputStream extends RebufferingInputStream 
implements AutoCloseable
 {
     /**
      * The stream which contains buffers of compressed data that came from the 
peer.
@@ -70,6 +70,11 @@ public class StreamCompressionInputStream extends 
RebufferingInputStream
         buffer = currentBuf.nioBuffer(0, currentBuf.readableBytes());
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * Close resources except {@link #dataInputPlus} as that needs to remain 
open for other streaming activity.
+     */
     @Override
     public void close()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to