This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e28dc0  Harden resource management on SSTable components to prevent future leaks
8e28dc0 is described below

commit 8e28dc0ebac3d80db43acfe76cfb45c0cb17a5c8
Author: Josh McKenzie <jmcken...@apache.org>
AuthorDate: Mon Nov 29 11:27:17 2021 -0500

    Harden resource management on SSTable components to prevent future leaks
    
    Patch by Josh McKenzie; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-17174
---
 CHANGES.txt                                        |   1 +
 .../io/sstable/format/big/BigTableWriter.java      | 140 +++++++++++++--------
 2 files changed, 90 insertions(+), 51 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5203504..c8cc544 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
  * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082)
  * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992)
  * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 889547d..dc43380 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.stream.Stream;
 
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
@@ -50,6 +51,7 @@ import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@@ -338,32 +340,50 @@ public class BigTableWriter extends SSTableWriter
         if (boundary == null)
             return null;
 
-        StatsMetadata stats = statsMetadata();
-        assert boundary.indexLength > 0 && boundary.dataLength > 0;
-        // open the reader early
-        IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner, boundary);
-        long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
-        int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
-        FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete(boundary.indexLength);
-        if (compression)
-            dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength));
-        int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
-        FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete(boundary.dataLength);
-        invalidateCacheAtBoundary(dfile);
-        SSTableReader sstable = SSTableReader.internalOpen(descriptor,
-                                                           components, metadata,
-                                                           ifile, dfile,
-                                                           indexSummary,
-                                                           iwriter.bf.sharedCopy(),
-                                                           maxDataAge,
-                                                           stats,
-                                                           SSTableReader.OpenReason.EARLY,
-                                                           header);
-
-        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
-        sstable.first = getMinimalKey(first);
-        sstable.last = getMinimalKey(boundary.lastKey);
-        return sstable;
+        IndexSummary indexSummary = null;
+        FileHandle ifile = null;
+        FileHandle dfile = null;
+        SSTableReader sstable = null;
+
+        try
+        {
+            StatsMetadata stats = statsMetadata();
+            assert boundary.indexLength > 0 && boundary.dataLength > 0;
+            // open the reader early
+            indexSummary = iwriter.summary.build(metadata().partitioner, boundary);
+            long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+            int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+            ifile = iwriter.builder.bufferSize(indexBufferSize).complete(boundary.indexLength);
+            if (compression)
+                dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength));
+            int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+            dfile = dbuilder.bufferSize(dataBufferSize).complete(boundary.dataLength);
+            invalidateCacheAtBoundary(dfile);
+            sstable = SSTableReader.internalOpen(descriptor,
+                                                 components, metadata,
+                                                 ifile, dfile,
+                                                 indexSummary,
+                                                 iwriter.bf.sharedCopy(),
+                                                 maxDataAge,
+                                                 stats,
+                                                 SSTableReader.OpenReason.EARLY,
+                                                 header);
+
+            // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+            sstable.first = getMinimalKey(first);
+            sstable.last = getMinimalKey(boundary.lastKey);
+            return sstable;
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            // If we successfully created our sstable, we can rely on its InstanceTidier to clean things up for us
+            if (sstable != null)
+                sstable.selfRef().release();
+            else
+                Stream.of(indexSummary, ifile, dfile).filter(Objects::nonNull).forEach(SharedCloseableImpl::close);
+            throw t;
+        }
     }
 
     void invalidateCacheAtBoundary(FileHandle dfile)
@@ -390,31 +410,49 @@ public class BigTableWriter extends SSTableWriter
         if (maxDataAge < 0)
             maxDataAge = currentTimeMillis();
 
-        StatsMetadata stats = statsMetadata();
-        // finalize in-memory state for the reader
-        IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner);
-        long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
-        int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
-        int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
-        FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete();
-        if (compression)
-            dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
-        FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
-        invalidateCacheAtBoundary(dfile);
-        SSTableReader sstable = SSTableReader.internalOpen(descriptor,
-                                                           components,
-                                                           metadata,
-                                                           ifile,
-                                                           dfile,
-                                                           indexSummary,
-                                                           iwriter.bf.sharedCopy(),
-                                                           maxDataAge,
-                                                           stats,
-                                                           openReason,
-                                                           header);
-        sstable.first = getMinimalKey(first);
-        sstable.last = getMinimalKey(last);
-        return sstable;
+        IndexSummary indexSummary = null;
+        FileHandle ifile = null;
+        FileHandle dfile = null;
+        SSTableReader sstable = null;
+
+        try
+        {
+            StatsMetadata stats = statsMetadata();
+            // finalize in-memory state for the reader
+            indexSummary = iwriter.summary.build(metadata().partitioner);
+            long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+            int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+            int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+            ifile = iwriter.builder.bufferSize(indexBufferSize).complete();
+            if (compression)
+                dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
+            dfile = dbuilder.bufferSize(dataBufferSize).complete();
+            invalidateCacheAtBoundary(dfile);
+            sstable = SSTableReader.internalOpen(descriptor,
+                                                 components,
+                                                 metadata,
+                                                 ifile,
+                                                 dfile,
+                                                 indexSummary,
+                                                 iwriter.bf.sharedCopy(),
+                                                 maxDataAge,
+                                                 stats,
+                                                 openReason,
+                                                 header);
+            sstable.first = getMinimalKey(first);
+            sstable.last = getMinimalKey(last);
+            return sstable;
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            // If we successfully created our sstable, we can rely on its InstanceTidier to clean things up for us
+            if (sstable != null)
+                sstable.selfRef().release();
+            else
+                Stream.of(indexSummary, ifile, dfile).filter(Objects::nonNull).forEach(SharedCloseableImpl::close);
+            throw t;
+        }
     }
 
     protected SSTableWriter.TransactionalProxy txnProxy()
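
For reference, both rewritten methods follow the same defensive shape: each component is acquired into a local variable initialized to null, the composite reader is built from them, and on any Throwable the handler either releases the reader (whose tidier owns the components once construction has succeeded) or closes whichever components were already acquired. Below is a minimal, self-contained sketch of that shape; the Part and Whole types are hypothetical stand-ins for IndexSummary/FileHandle and SSTableReader, not Cassandra's actual classes.

    import java.util.Objects;
    import java.util.stream.Stream;

    public class CleanupOnFailureSketch
    {
        // Stands in for the SharedCloseableImpl components (IndexSummary,
        // FileHandle); close() is unchecked so it works as a method reference.
        interface Part extends AutoCloseable
        {
            @Override
            void close();
        }

        // Stands in for SSTableReader: once constructed it owns its parts,
        // and releasing it closes them (the "InstanceTidier" role).
        record Whole(Part a, Part b)
        {
            void release()
            {
                a.close();
                b.close();
            }
        }

        static Whole openEarly(boolean failBetweenAcquisitions)
        {
            Part a = null;
            Part b = null;
            Whole whole = null;
            try
            {
                a = () -> System.out.println("closed a");
                if (failBetweenAcquisitions)
                    throw new RuntimeException("simulated failure");
                b = () -> System.out.println("closed b");
                whole = new Whole(a, b); // ownership transfers to the composite here
                return whole;
            }
            catch (Throwable t)
            {
                if (whole != null)
                    whole.release(); // composite cleans up its own parts
                else
                    Stream.of(a, b).filter(Objects::nonNull).forEach(Part::close);
                throw t;
            }
        }

        public static void main(String[] args)
        {
            try
            {
                openEarly(true);
            }
            catch (RuntimeException e)
            {
                // prints "closed a" first, then this line; nothing leaks
                System.out.println("caught: " + e.getMessage());
            }
        }
    }

The important property mirrored from the patch is the ownership handoff: once the composite exists, failure handling switches from closing individual components to releasing the composite, so each resource is closed exactly once no matter where the failure lands.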

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
