Repository: cassandra Updated Branches: refs/heads/trunk d97d95ff1 -> 982ab93a2
Properly close StreamCompressionInputStream to release any ByteBuffer. Patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13906 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/982ab93a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/982ab93a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/982ab93a Branch: refs/heads/trunk Commit: 982ab93a2f8a0f5c56af9378f65d3e9e430000b9 Parents: d97d95f Author: Jason Brown <jasedbr...@gmail.com> Authored: Tue Sep 26 15:52:54 2017 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Wed Oct 4 04:21:20 2017 +0900 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/streaming/StreamReader.java | 11 ++++++----- .../streaming/compress/CompressedInputStream.java | 4 +++- .../streaming/compress/CompressedStreamReader.java | 17 ++++++----------- .../compress/StreamCompressionInputStream.java | 7 ++++++- 5 files changed, 22 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2498270..5a8ab47 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906) * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925) * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961) * Correctly close netty channels when a stream session ends (CASSANDRA-13905) http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java index 590ba5f..f4eb9c4 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -106,12 +106,12 @@ public class StreamReader session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), cfs.getTableName(), pendingRepair); - - TrackedDataInputPlus in = new TrackedDataInputPlus(new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION)); - StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); + StreamDeserializer deserializer = null; SSTableMultiWriter writer = null; - try + try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION)) { + TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); + deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); while (in.getBytesRead() < totalSize) { @@ -125,8 +125,9 @@ public class StreamReader } catch (Throwable e) { + Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : ""; logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName(), e); + session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e); if (writer != null) { writer.abort(e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 76f76ea..4b9fc61 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -44,7 +44,7 @@ import org.apache.cassandra.utils.WrappedRunnable; * InputStream which reads data from underlining source with given {@link CompressionInfo}. Uses {@link #buffer} as a buffer * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case). */ -public class CompressedInputStream extends RebufferingInputStream +public class CompressedInputStream extends RebufferingInputStream implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class); @@ -200,6 +200,8 @@ public class CompressedInputStream extends RebufferingInputStream } /** + * {@inheritDoc} + * * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}. 
*/ @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index e40788b..bd44209 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -75,14 +75,12 @@ public class CompressedStreamReader extends StreamReader session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair, cfs.getTableName()); - CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, - ChecksumType.CRC32, cfs::getCrcCheckChance); - TrackedDataInputPlus in = new TrackedDataInputPlus(cis); - - StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); + StreamDeserializer deserializer = null; SSTableMultiWriter writer = null; - try + try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance)) { + TrackedDataInputPlus in = new TrackedDataInputPlus(cis); + deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); String filename = writer.getFilename(); int sectionIdx = 0; @@ -109,8 +107,9 @@ public class CompressedStreamReader extends StreamReader } catch (Throwable e) { + Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : ""; logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName()); + session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName()); if (writer != null) { writer.abort(e); @@ -119,10 +118,6 @@ public class CompressedStreamReader extends StreamReader throw e; throw Throwables.propagate(e); } - finally - { - cis.close(); - } } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/982ab93a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java index 4b1459d..daf6d28 100644 --- a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java @@ -31,7 +31,7 @@ import org.apache.cassandra.io.util.RebufferingInputStream; import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; import org.apache.cassandra.streaming.async.StreamCompressionSerializer; -public class StreamCompressionInputStream extends RebufferingInputStream +public class StreamCompressionInputStream extends RebufferingInputStream implements AutoCloseable { /** * The stream which contains buffers of compressed data that came from the peer. @@ -70,6 +70,11 @@ public class StreamCompressionInputStream extends RebufferingInputStream buffer = currentBuf.nioBuffer(0, currentBuf.readableBytes()); } + /** + * {@inheritDoc} + * + * Close resources except {@link #dataInputPlus} as that needs to remain open for other streaming activity. 
+ */ @Override public void close() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org