Author: jbellis
Date: Thu Oct  6 15:22:04 2011
New Revision: 1179662

URL: http://svn.apache.org/viewvc?rev=1179662&view=rev
Log:
close scrubbed sstable fd before deleting it
patch by jbellis; reviewed by slebresne for CASSANDRA-3318

Modified:
    cassandra/branches/cassandra-1.0.0/CHANGES.txt
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1179662&r1=1179661&r2=1179662&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Thu Oct  6 15:22:04 2011
@@ -1,4 +1,5 @@
 1.0.0-final
+ * close scrubbed sstable fd before deleting it (CASSANDRA-3318)
  * fix bug preventing obsolete commitlog segments from being removed
    (CASSANDRA-3269)
  * tolerate whitespace in seed CDL (CASSANDRA-3263)

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1179662&r1=1179661&r2=1179662&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Oct  6 15:22:04 2011
@@ -483,10 +483,13 @@ public class CompactionManager implement
         // row header (key or data size) is corrupt. (This means our position 
in the index file will be one row
         // "ahead" of the data file.)
         final RandomAccessReader dataFile = sstable.openDataReader(true);
-
-        String indexFilename = 
sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
-        RandomAccessReader indexFile = RandomAccessReader.open(new 
File(indexFilename), true);
+        RandomAccessReader indexFile = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
         ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+        executor.beginCompaction(scrubInfo);
+
+        SSTableWriter writer = null;
+        SSTableReader newSstable = null;
+        int goodRows = 0, badRows = 0, emptyRows = 0;
 
         try
         {
@@ -497,170 +500,155 @@ public class CompactionManager implement
                 assert firstRowPositionFromIndex == 0 : 
firstRowPositionFromIndex;
             }
 
-            SSTableReader newSstable = null;
-
-            // errors when creating the writer may leave empty temp files.
-            SSTableWriter writer = maybeCreateWriter(cfs,
-                                                     compactionFileLocation,
-                                                     expectedBloomFilterSize,
-                                                     null,
-                                                     
Collections.singletonList(sstable));
-
-            int goodRows = 0, badRows = 0, emptyRows = 0;
+            // TODO errors when creating the writer may leave empty temp files.
+            writer = maybeCreateWriter(cfs, compactionFileLocation, 
expectedBloomFilterSize, null, Collections.singletonList(sstable));
 
-            executor.beginCompaction(scrubInfo);
-
-            try
+            while (!dataFile.isEOF())
             {
-                while (!dataFile.isEOF())
+                long rowStart = dataFile.getFilePointer();
+                if (logger.isDebugEnabled())
+                    logger.debug("Reading row at " + rowStart);
+
+                DecoratedKey key = null;
+                long dataSize = -1;
+                try
                 {
-                    long rowStart = dataFile.getFilePointer();
+                    key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                    dataSize = sstable.descriptor.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong();
                     if (logger.isDebugEnabled())
-                        logger.debug("Reading row at " + rowStart);
+                        logger.debug(String.format("row %s is %s bytes", 
ByteBufferUtil.bytesToHex(key.key), dataSize));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    // check for null key below
+                }
 
-                    DecoratedKey key = null;
-                    long dataSize = -1;
-                    try
-                    {
-                        key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
-                        dataSize = sstable.descriptor.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong();
-                        if (logger.isDebugEnabled())
-                            logger.debug(String.format("row %s is %s bytes", 
ByteBufferUtil.bytesToHex(key.key), dataSize));
-                    }
-                    catch (Throwable th)
-                    {
-                        throwIfFatal(th);
-                        // check for null key below
-                    }
+                ByteBuffer currentIndexKey = nextIndexKey;
+                long nextRowPositionFromIndex;
+                try
+                {
+                    nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
+                    nextRowPositionFromIndex = indexFile.isEOF() ? 
dataFile.length() : indexFile.readLong();
+                }
+                catch (Throwable th)
+                {
+                    logger.warn("Error reading index file", th);
+                    nextIndexKey = null;
+                    nextRowPositionFromIndex = dataFile.length();
+                }
 
-                    ByteBuffer currentIndexKey = nextIndexKey;
-                    long nextRowPositionFromIndex;
-                    try
-                    {
-                        nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
-                        nextRowPositionFromIndex = indexFile.isEOF() ? 
dataFile.length() : indexFile.readLong();
-                    }
-                    catch (Throwable th)
-                    {
-                        logger.warn("Error reading index file", th);
-                        nextIndexKey = null;
-                        nextRowPositionFromIndex = dataFile.length();
-                    }
-
-                    long dataStart = dataFile.getFilePointer();
-                    long dataStartFromIndex = currentIndexKey == null
-                                            ? -1
-                                            : rowStart + 2 + 
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
-                    long dataSizeFromIndex = nextRowPositionFromIndex - 
dataStartFromIndex;
-                    assert currentIndexKey != null || indexFile.isEOF();
-                    if (logger.isDebugEnabled() && currentIndexKey != null)
-                        logger.debug(String.format("Index doublecheck: row %s 
is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+                long dataStart = dataFile.getFilePointer();
+                long dataStartFromIndex = currentIndexKey == null
+                                        ? -1
+                                        : rowStart + 2 + 
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                long dataSizeFromIndex = nextRowPositionFromIndex - 
dataStartFromIndex;
+                assert currentIndexKey != null || indexFile.isEOF();
+                if (logger.isDebugEnabled() && currentIndexKey != null)
+                    logger.debug(String.format("Index doublecheck: row %s is 
%s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
 
-                    writer.mark();
-                    try
+                writer.mark();
+                try
+                {
+                    if (key == null)
+                        throw new IOError(new IOException("Unable to read row 
key from data file"));
+                    if (dataSize > dataFile.length())
+                        throw new IOError(new IOException("Impossible row size 
" + dataSize));
+                    SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                    AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
+                    if (compactedRow.isEmpty())
                     {
-                        if (key == null)
-                            throw new IOError(new IOException("Unable to read 
row key from data file"));
-                        if (dataSize > dataFile.length())
-                            throw new IOError(new IOException("Impossible row 
size " + dataSize));
-                        SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
-                        AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
-                        if (compactedRow.isEmpty())
-                        {
-                            emptyRows++;
-                        }
-                        else
-                        {
-                            writer.append(compactedRow);
-                            goodRows++;
-                        }
-                        if (!key.key.equals(currentIndexKey) || dataStart != 
dataStartFromIndex)
-                            logger.warn("Index file contained a different key 
or row size; using key from data file");
+                        emptyRows++;
                     }
-                    catch (Throwable th)
+                    else
                     {
-                        throwIfFatal(th);
-                        logger.warn("Non-fatal error reading row (stacktrace 
follows)", th);
-                        writer.resetAndTruncate();
-
-                        if (currentIndexKey != null
-                            && (key == null || 
!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize 
!= dataSizeFromIndex))
+                        writer.append(compactedRow);
+                        goodRows++;
+                    }
+                    if (!key.key.equals(currentIndexKey) || dataStart != 
dataStartFromIndex)
+                        logger.warn("Index file contained a different key or 
row size; using key from data file");
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    logger.warn("Non-fatal error reading row (stacktrace 
follows)", th);
+                    writer.resetAndTruncate();
+
+                    if (currentIndexKey != null
+                        && (key == null || !key.key.equals(currentIndexKey) || 
dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                    {
+                        logger.info(String.format("Retrying from row index; 
data is %s bytes starting at %s",
+                                                  dataSizeFromIndex, 
dataStartFromIndex));
+                        key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, currentIndexKey);
+                        try
                         {
-                            logger.info(String.format("Retrying from row 
index; data is %s bytes starting at %s",
-                                                      dataSizeFromIndex, 
dataStartFromIndex));
-                            key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, currentIndexKey);
-                            try
+                            SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, 
dataSizeFromIndex, true);
+                            AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
+                            if (compactedRow.isEmpty())
                             {
-                                SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, 
dataSizeFromIndex, true);
-                                AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
-                                if (compactedRow.isEmpty())
-                                {
-                                    emptyRows++;
-                                }
-                                else
-                                {
-                                    writer.append(compactedRow);
-                                    goodRows++;
-                                }
+                                emptyRows++;
                             }
-                            catch (Throwable th2)
+                            else
                             {
-                                throwIfFatal(th2);
-                                // Skipping rows is dangerous for counters 
(see CASSANDRA-2759)
-                                if (isCommutative)
-                                    throw new IOError(th2);
-
-                                logger.warn("Retry failed too.  Skipping to 
next row (retry's stacktrace follows)", th2);
-                                writer.resetAndTruncate();
-                                dataFile.seek(nextRowPositionFromIndex);
-                                badRows++;
+                                writer.append(compactedRow);
+                                goodRows++;
                             }
                         }
-                        else
+                        catch (Throwable th2)
                         {
+                            throwIfFatal(th2);
                             // Skipping rows is dangerous for counters (see 
CASSANDRA-2759)
                             if (isCommutative)
-                                throw new IOError(th);
+                                throw new IOError(th2);
 
-                            logger.warn("Row at " + dataStart + " is 
unreadable; skipping to next");
-                            if (currentIndexKey != null)
-                                dataFile.seek(nextRowPositionFromIndex);
+                            logger.warn("Retry failed too.  Skipping to next 
row (retry's stacktrace follows)", th2);
+                            writer.resetAndTruncate();
+                            dataFile.seek(nextRowPositionFromIndex);
                             badRows++;
                         }
                     }
+                    else
+                    {
+                        // Skipping rows is dangerous for counters (see 
CASSANDRA-2759)
+                        if (isCommutative)
+                            throw new IOError(th);
+
+                        logger.warn("Row at " + dataStart + " is unreadable; 
skipping to next");
+                        if (currentIndexKey != null)
+                            dataFile.seek(nextRowPositionFromIndex);
+                        badRows++;
+                    }
                 }
-
-                if (writer.getFilePointer() > 0)
-                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
-            }
-            finally
-            {
-                writer.cleanupIfNecessary();
             }
 
-            if (newSstable != null)
-            {
-                cfs.replaceCompactedSSTables(Arrays.asList(sstable), 
Arrays.asList(newSstable));
-                logger.info("Scrub of " + sstable + " complete: " + goodRows + 
" rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
-                if (badRows > 0)
-                    logger.warn("Unable to recover " + badRows + " rows that 
were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  
You can also run nodetool repair to transfer the data from a healthy replica, 
if any");
-            }
-            else
-            {
-                cfs.markCompacted(Arrays.asList(sstable));
-                if (badRows > 0)
-                    logger.warn("No valid rows found while scrubbing " + 
sstable + "; it is marked for deletion now. If you want to attempt manual 
recovery, you can find a copy in the pre-scrub snapshot");
-                else
-                    logger.info("Scrub of " + sstable + " complete; looks like 
all " + emptyRows + " rows were tombstoned");
-            }
+            if (writer.getFilePointer() > 0)
+                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
         }
         finally
         {
+            if (writer != null)
+                writer.cleanupIfNecessary();
             FileUtils.closeQuietly(dataFile);
             FileUtils.closeQuietly(indexFile);
 
             executor.finishCompaction(scrubInfo);
         }
+
+        if (newSstable == null)
+        {
+            cfs.markCompacted(Arrays.asList(sstable));
+            if (badRows > 0)
+                logger.warn("No valid rows found while scrubbing " + sstable + 
"; it is marked for deletion now. If you want to attempt manual recovery, you 
can find a copy in the pre-scrub snapshot");
+            else
+                logger.info("Scrub of " + sstable + " complete; looks like all 
" + emptyRows + " rows were tombstoned");
+        }
+        else
+        {
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), 
Arrays.asList(newSstable));
+            logger.info("Scrub of " + sstable + " complete: " + goodRows + " 
rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+            if (badRows > 0)
+                logger.warn("Unable to recover " + badRows + " rows that were 
skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can 
also run nodetool repair to transfer the data from a healthy replica, if any");
+        }
     }
 
     private void throwIfFatal(Throwable th)


Reply via email to