This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 8e28dc0 Harden resource management on SSTable components to prevent future leaks 8e28dc0 is described below commit 8e28dc0ebac3d80db43acfe76cfb45c0cb17a5c8 Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Mon Nov 29 11:27:17 2021 -0500 Harden resource management on SSTable components to prevent future leaks Patch by Josh McKenzie; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-17174 --- CHANGES.txt | 1 + .../io/sstable/format/big/BigTableWriter.java | 140 +++++++++++++-------- 2 files changed, 90 insertions(+), 51 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5203504..c8cc544 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174) * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082) * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992) * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159) diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 889547d..dc43380 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.*; +import java.util.stream.Stream; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; @@ -50,6 +51,7 @@ import org.apache.cassandra.io.util.*; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableMetadataRef; import 
org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.SharedCloseableImpl; import org.apache.cassandra.utils.concurrent.Transactional; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @@ -338,32 +340,50 @@ public class BigTableWriter extends SSTableWriter if (boundary == null) return null; - StatsMetadata stats = statsMetadata(); - assert boundary.indexLength > 0 && boundary.dataLength > 0; - // open the reader early - IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner, boundary); - long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length(); - int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size()); - FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete(boundary.indexLength); - if (compression) - dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength)); - int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); - FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(boundary.dataLength); - invalidateCacheAtBoundary(dfile); - SSTableReader sstable = SSTableReader.internalOpen(descriptor, - components, metadata, - ifile, dfile, - indexSummary, - iwriter.bf.sharedCopy(), - maxDataAge, - stats, - SSTableReader.OpenReason.EARLY, - header); - - // now it's open, find the ACTUAL last readable key (i.e. 
for which the data file has also been flushed) - sstable.first = getMinimalKey(first); - sstable.last = getMinimalKey(boundary.lastKey); - return sstable; + IndexSummary indexSummary = null; + FileHandle ifile = null; + FileHandle dfile = null; + SSTableReader sstable = null; + + try + { + StatsMetadata stats = statsMetadata(); + assert boundary.indexLength > 0 && boundary.dataLength > 0; + // open the reader early + indexSummary = iwriter.summary.build(metadata().partitioner, boundary); + long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length(); + int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size()); + ifile = iwriter.builder.bufferSize(indexBufferSize).complete(boundary.indexLength); + if (compression) + dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength)); + int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); + dfile = dbuilder.bufferSize(dataBufferSize).complete(boundary.dataLength); + invalidateCacheAtBoundary(dfile); + sstable = SSTableReader.internalOpen(descriptor, + components, metadata, + ifile, dfile, + indexSummary, + iwriter.bf.sharedCopy(), + maxDataAge, + stats, + SSTableReader.OpenReason.EARLY, + header); + + // now it's open, find the ACTUAL last readable key (i.e. 
for which the data file has also been flushed) + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(boundary.lastKey); + return sstable; + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + // If we successfully created our sstable, we can rely on its InstanceTidier to clean things up for us + if (sstable != null) + sstable.selfRef().release(); + else + Stream.of(indexSummary, ifile, dfile).filter(Objects::nonNull).forEach(SharedCloseableImpl::close); + throw t; + } } void invalidateCacheAtBoundary(FileHandle dfile) @@ -390,31 +410,49 @@ public class BigTableWriter extends SSTableWriter if (maxDataAge < 0) maxDataAge = currentTimeMillis(); - StatsMetadata stats = statsMetadata(); - // finalize in-memory state for the reader - IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner); - long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length(); - int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); - int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size()); - FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete(); - if (compression) - dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0)); - FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(); - invalidateCacheAtBoundary(dfile); - SSTableReader sstable = SSTableReader.internalOpen(descriptor, - components, - metadata, - ifile, - dfile, - indexSummary, - iwriter.bf.sharedCopy(), - maxDataAge, - stats, - openReason, - header); - sstable.first = getMinimalKey(first); - sstable.last = getMinimalKey(last); - return sstable; + IndexSummary indexSummary = null; + FileHandle ifile = null; + FileHandle dfile = null; + SSTableReader sstable = null; + + try + { + StatsMetadata stats = statsMetadata(); + // finalize in-memory state for the 
reader + indexSummary = iwriter.summary.build(metadata().partitioner); + long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length(); + int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); + int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size()); + ifile = iwriter.builder.bufferSize(indexBufferSize).complete(); + if (compression) + dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0)); + dfile = dbuilder.bufferSize(dataBufferSize).complete(); + invalidateCacheAtBoundary(dfile); + sstable = SSTableReader.internalOpen(descriptor, + components, + metadata, + ifile, + dfile, + indexSummary, + iwriter.bf.sharedCopy(), + maxDataAge, + stats, + openReason, + header); + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(last); + return sstable; + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + // If we successfully created our sstable, we can rely on its InstanceTidier to clean things up for us + if (sstable != null) + sstable.selfRef().release(); + else + Stream.of(indexSummary, ifile, dfile).filter(Objects::nonNull).forEach(SharedCloseableImpl::close); + throw t; + } } protected SSTableWriter.TransactionalProxy txnProxy() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org