Author: jbellis
Date: Thu Oct 6 15:22:04 2011
New Revision: 1179662

URL: http://svn.apache.org/viewvc?rev=1179662&view=rev
Log:
close scrubbed sstable fd before deleting it
patch by jbellis; reviewed by slebresne for CASSANDRA-3318
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1179662&r1=1179661&r2=1179662&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Thu Oct 6 15:22:04 2011 @@ -1,4 +1,5 @@ 1.0.0-final + * close scrubbed sstable fd before deleting it (CASSANDRA-3318) * fix bug preventing obsolete commitlog segments from being removed (CASSANDRA-3269) * tolerate whitespace in seed CDL (CASSANDRA-3263) Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1179662&r1=1179661&r2=1179662&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Oct 6 15:22:04 2011 @@ -483,10 +483,13 @@ public class CompactionManager implement // row header (key or data size) is corrupt. (This means our position in the index file will be one row // "ahead" of the data file.) 
final RandomAccessReader dataFile = sstable.openDataReader(true); - - String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX); - RandomAccessReader indexFile = RandomAccessReader.open(new File(indexFilename), true); + RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true); ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable); + executor.beginCompaction(scrubInfo); + + SSTableWriter writer = null; + SSTableReader newSstable = null; + int goodRows = 0, badRows = 0, emptyRows = 0; try { @@ -497,170 +500,155 @@ public class CompactionManager implement assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex; } - SSTableReader newSstable = null; - - // errors when creating the writer may leave empty temp files. - SSTableWriter writer = maybeCreateWriter(cfs, - compactionFileLocation, - expectedBloomFilterSize, - null, - Collections.singletonList(sstable)); - - int goodRows = 0, badRows = 0, emptyRows = 0; + // TODO errors when creating the writer may leave empty temp files. + writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable)); - executor.beginCompaction(scrubInfo); - - try + while (!dataFile.isEOF()) { - while (!dataFile.isEOF()) + long rowStart = dataFile.getFilePointer(); + if (logger.isDebugEnabled()) + logger.debug("Reading row at " + rowStart); + + DecoratedKey key = null; + long dataSize = -1; + try { - long rowStart = dataFile.getFilePointer(); + key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile)); + dataSize = sstable.descriptor.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong(); if (logger.isDebugEnabled()) - logger.debug("Reading row at " + rowStart); + logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize)); + } + catch (Throwable th) + { + throwIfFatal(th); + // check for null key below + } - DecoratedKey key = null; - long dataSize = -1; - try - { - key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile)); - dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong(); - if (logger.isDebugEnabled()) - logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize)); - } - catch (Throwable th) - { - throwIfFatal(th); - // check for null key below - } + ByteBuffer currentIndexKey = nextIndexKey; + long nextRowPositionFromIndex; + try + { + nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); + nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong(); + } + catch (Throwable th) + { + logger.warn("Error reading index file", th); + nextIndexKey = null; + nextRowPositionFromIndex = dataFile.length(); + } - ByteBuffer currentIndexKey = nextIndexKey; - long nextRowPositionFromIndex; - try - { - nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); - nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong(); - } - catch (Throwable th) - { - logger.warn("Error reading index file", th); - nextIndexKey = null; - nextRowPositionFromIndex = dataFile.length(); - } - - long dataStart = dataFile.getFilePointer(); - long dataStartFromIndex = currentIndexKey == null - ? -1 - : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 
4 : 8); - long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex; - assert currentIndexKey != null || indexFile.isEOF(); - if (logger.isDebugEnabled() && currentIndexKey != null) - logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex)); + long dataStart = dataFile.getFilePointer(); + long dataStartFromIndex = currentIndexKey == null + ? -1 + : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8); + long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex; + assert currentIndexKey != null || indexFile.isEOF(); + if (logger.isDebugEnabled() && currentIndexKey != null) + logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex)); - writer.mark(); - try + writer.mark(); + try + { + if (key == null) + throw new IOError(new IOException("Unable to read row key from data file")); + if (dataSize > dataFile.length()) + throw new IOError(new IOException("Impossible row size " + dataSize)); + SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true); + AbstractCompactedRow compactedRow = controller.getCompactedRow(row); + if (compactedRow.isEmpty()) { - if (key == null) - throw new IOError(new IOException("Unable to read row key from data file")); - if (dataSize > dataFile.length()) - throw new IOError(new IOException("Impossible row size " + dataSize)); - SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true); - AbstractCompactedRow compactedRow = controller.getCompactedRow(row); - if (compactedRow.isEmpty()) - { - emptyRows++; - } - else - { - writer.append(compactedRow); - goodRows++; - } - if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex) - logger.warn("Index file contained a different key or row size; using key from data file"); + 
emptyRows++; } - catch (Throwable th) + else { - throwIfFatal(th); - logger.warn("Non-fatal error reading row (stacktrace follows)", th); - writer.resetAndTruncate(); - - if (currentIndexKey != null - && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex)) + writer.append(compactedRow); + goodRows++; + } + if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex) + logger.warn("Index file contained a different key or row size; using key from data file"); + } + catch (Throwable th) + { + throwIfFatal(th); + logger.warn("Non-fatal error reading row (stacktrace follows)", th); + writer.resetAndTruncate(); + + if (currentIndexKey != null + && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex)) + { + logger.info(String.format("Retrying from row index; data is %s bytes starting at %s", + dataSizeFromIndex, dataStartFromIndex)); + key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey); + try { - logger.info(String.format("Retrying from row index; data is %s bytes starting at %s", - dataSizeFromIndex, dataStartFromIndex)); - key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey); - try + SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true); + AbstractCompactedRow compactedRow = controller.getCompactedRow(row); + if (compactedRow.isEmpty()) { - SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true); - AbstractCompactedRow compactedRow = controller.getCompactedRow(row); - if (compactedRow.isEmpty()) - { - emptyRows++; - } - else - { - writer.append(compactedRow); - goodRows++; - } + emptyRows++; } - catch (Throwable th2) + else { - throwIfFatal(th2); - // Skipping rows is dangerous for counters (see CASSANDRA-2759) - if 
(isCommutative) - throw new IOError(th2); - - logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2); - writer.resetAndTruncate(); - dataFile.seek(nextRowPositionFromIndex); - badRows++; + writer.append(compactedRow); + goodRows++; } } - else + catch (Throwable th2) { + throwIfFatal(th2); // Skipping rows is dangerous for counters (see CASSANDRA-2759) if (isCommutative) - throw new IOError(th); + throw new IOError(th2); - logger.warn("Row at " + dataStart + " is unreadable; skipping to next"); - if (currentIndexKey != null) - dataFile.seek(nextRowPositionFromIndex); + logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2); + writer.resetAndTruncate(); + dataFile.seek(nextRowPositionFromIndex); badRows++; } } + else + { + // Skipping rows is dangerous for counters (see CASSANDRA-2759) + if (isCommutative) + throw new IOError(th); + + logger.warn("Row at " + dataStart + " is unreadable; skipping to next"); + if (currentIndexKey != null) + dataFile.seek(nextRowPositionFromIndex); + badRows++; + } } - - if (writer.getFilePointer() > 0) - newSstable = writer.closeAndOpenReader(sstable.maxDataAge); - } - finally - { - writer.cleanupIfNecessary(); } - if (newSstable != null) - { - cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable)); - logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped"); - if (badRows > 0) - logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any"); - } - else - { - cfs.markCompacted(Arrays.asList(sstable)); - if (badRows > 0) - logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. 
If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot"); - else - logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned"); - } + if (writer.getFilePointer() > 0) + newSstable = writer.closeAndOpenReader(sstable.maxDataAge); } finally { + if (writer != null) + writer.cleanupIfNecessary(); FileUtils.closeQuietly(dataFile); FileUtils.closeQuietly(indexFile); executor.finishCompaction(scrubInfo); } + + if (newSstable == null) + { + cfs.markCompacted(Arrays.asList(sstable)); + if (badRows > 0) + logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot"); + else + logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned"); + } + else + { + cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable)); + logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped"); + if (badRows > 0) + logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any"); + } } private void throwIfFatal(Throwable th)