Updated Branches: refs/heads/trunk 652ae9a64 -> 9ecda7230
clean up SequentialWriter and friends
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-2116

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ecda723
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ecda723
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ecda723

Branch: refs/heads/trunk
Commit: 9ecda7230c719784a203741d5d548b505cae2969
Parents: 844b9c4
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Jul 30 15:40:43 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Jul 30 15:40:43 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java    |    2 +-
 .../db/compaction/LeveledCompactionTask.java       |    3 +-
 .../io/compress/CompressedSequentialWriter.java    |  132 +++++++++++----
 .../cassandra/io/compress/DeflateCompressor.java   |    2 +-
 .../apache/cassandra/io/sstable/SSTableWriter.java |   47 ++---
 .../apache/cassandra/io/util/SequentialWriter.java |   95 ++++++++---
 6 files changed, 189 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 796d56b..7a88c68 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -234,7 +234,7 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
     {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index ebc91d7..74e8401 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -62,7 +61,7 @@ public class LeveledCompactionTask extends CompactionTask
     }
 
     @Override
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
     {
         return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6d78287..00eb5a7 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,11 +17,15 @@
  */
 package org.apache.cassandra.io.compress;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -77,30 +81,46 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public long getOnDiskFilePointer() throws IOException
+    public long getOnDiskFilePointer()
     {
-        return out.getFilePointer();
+        try
+        {
+            return out.getFilePointer();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
     }
 
     @Override
-    public void sync() throws IOException
+    public void sync()
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public void flush() throws IOException
+    public void flush()
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    protected void flushData() throws IOException
+    protected void flushData()
     {
         seekToChunkStart();
-        // compressing data with buffer re-use
-        int compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+
+        int compressedLength;
+        try
+        {
+            // compressing data with buffer re-use
+            compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Compression exception", e); // shouldn't happen
+        }
 
         originalSize += validBufferBytes;
         compressedSize += compressedLength;
@@ -108,16 +128,23 @@ public class CompressedSequentialWriter extends SequentialWriter
         // update checksum
         checksum.update(buffer, 0, validBufferBytes);
 
-        // write an offset of the newly written chunk to the index file
-        metadataWriter.writeLong(chunkOffset);
-        chunkCount++;
+        try
+        {
+            // write an offset of the newly written chunk to the index file
+            metadataWriter.writeLong(chunkOffset);
+            chunkCount++;
 
-        assert compressedLength <= compressed.buffer.length;
+            assert compressedLength <= compressed.buffer.length;
 
-        // write data itself
-        out.write(compressed.buffer, 0, compressedLength);
-        // write corresponding checksum
-        out.writeInt((int) checksum.getValue());
+            // write data itself
+            out.write(compressed.buffer, 0, compressedLength);
+            // write corresponding checksum
+            out.writeInt((int) checksum.getValue());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
 
         // reset checksum object to the blank state for re-use
         checksum.reset();
@@ -133,11 +160,11 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public synchronized void resetAndTruncate(FileMark mark) throws IOException
+    public synchronized void resetAndTruncate(FileMark mark)
     {
         assert mark instanceof CompressedFileWriterMark;
 
-        CompressedFileWriterMark realMark = ((CompressedFileWriterMark) mark);
+        CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;
 
         // reset position
         current = realMark.uncDataOffset;
@@ -161,16 +188,39 @@ public class CompressedSequentialWriter extends SequentialWriter
         if (compressed.buffer.length < chunkSize)
             compressed.buffer = new byte[chunkSize];
 
-        out.seek(chunkOffset);
-        out.readFully(compressed.buffer, 0, chunkSize);
-
-        // decompress data chunk and store its length
-        int validBytes = compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
-
-        checksum.update(buffer, 0, validBytes);
-
-        if (out.readInt() != (int) checksum.getValue())
-            throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+        try
+        {
+            out.seek(chunkOffset);
+            out.readFully(compressed.buffer, 0, chunkSize);
+
+            int validBytes;
+            try
+            {
+                // decompress data chunk and store its length
+                validBytes = compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
+            }
+            catch (IOException e)
+            {
+                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+            }
+
+            checksum.update(buffer, 0, validBytes);
+
+            if (out.readInt() != (int) checksum.getValue())
+                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+        }
+        catch (CorruptBlockException e)
+        {
+            throw new CorruptSSTableException(e, getPath());
+        }
+        catch (EOFException e)
+        {
+            throw new CorruptSSTableException(new CorruptBlockException(getPath(), chunkOffset, chunkSize), getPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
 
         checksum.reset();
 
@@ -186,17 +236,24 @@ public class CompressedSequentialWriter extends SequentialWriter
     /**
      * Seek to the offset where next compressed data chunk should be stored.
-     *
-     * @throws IOException on any I/O error.
      */
-    private void seekToChunkStart() throws IOException
+    private void seekToChunkStart()
     {
-        if (out.getFilePointer() != chunkOffset)
-            out.seek(chunkOffset);
+        if (getOnDiskFilePointer() != chunkOffset)
+        {
+            try
+            {
+                out.seek(chunkOffset);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(e, getPath());
+            }
+        }
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         if (buffer == null)
             return; // already closed
@@ -204,7 +261,14 @@ public class CompressedSequentialWriter extends SequentialWriter
         super.close();
         sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
         metadataWriter.finalizeHeader(current, chunkCount);
-        metadataWriter.close();
+        try
+        {
+            metadataWriter.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 97f2ccb..125a08f 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -68,7 +68,7 @@ public class DeflateCompressor implements ICompressor
         return chunkLength;
     }
 
-    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset)
     {
         Deflater def = deflater.get();
         def.reset();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e7128f4..2ea71d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.nio.channels.ClosedChannelException;
 import java.util.*;
 import java.util.regex.Pattern;
 
@@ -114,8 +115,7 @@ public class SSTableWriter extends SSTable
         iwriter.mark();
     }
 
-    // NOT necessarily an FS error - not throwing FSWE.
-    public void resetAndTruncate() throws IOException
+    public void resetAndTruncate()
    {
         dataFile.resetAndTruncate(dataMark);
         iwriter.resetAndTruncate();
@@ -149,21 +149,21 @@
 
     public RowIndexEntry append(AbstractCompactedRow row)
     {
+        long currentPosition = beforeAppend(row.key);
         try
         {
-            long currentPosition = beforeAppend(row.key);
             ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
             long dataStart = dataFile.getFilePointer();
             long dataSize = row.write(dataFile.stream);
             assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
                 : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
-            sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-            return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
+        sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+        return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -190,12 +190,12 @@
             dataFile.stream.writeInt(builder.writtenAtomCount());
             dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
             afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
-            sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
+        sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
     }
 
     /**
@@ -318,17 +318,8 @@
     {
         // index and filter
         iwriter.close();
-
-        try
-        {
-            // main data, close will truncate if necessary
-            dataFile.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
-
+        // main data, close will truncate if necessary
+        dataFile.close();
         // write sstable statistics
         SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
         writeMetadata(descriptor, sstableMetadata);
@@ -373,12 +364,12 @@
         try
         {
             out.write(String.format("%s %s", Hex.bytesToHex(digest), dataFileName).getBytes());
-            out.close();
         }
-        catch (IOException e)
+        catch (ClosedChannelException e)
         {
-            throw new FSWriteError(e, out.getPath());
+            throw new AssertionError(); // can't happen.
         }
+        out.close();
     }
 
     private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
@@ -387,12 +378,12 @@
     {
         try
         {
             SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
-            out.close();
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, out.getPath());
         }
+        out.close();
     }
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
@@ -421,7 +412,7 @@
         return dataFile.getFilePointer();
     }
 
-    public long getOnDiskFilePointer() throws IOException
+    public long getOnDiskFilePointer()
     {
         return dataFile.getOnDiskFilePointer();
     }
@@ -491,17 +482,17 @@
             stream.flush();
             fos.getFD().sync();
             stream.close();
-
-            // index
-            long position = indexFile.getFilePointer();
-            indexFile.close(); // calls force
-            FileUtils.truncate(indexFile.getPath(), position);
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, path);
         }
+
+            // index
+            long position = indexFile.getFilePointer();
+            indexFile.close(); // calls force
+            FileUtils.truncate(indexFile.getPath(), position);
+
            // finalize in-memory index state
            summary.complete();
        }
@@ -511,7 +502,7 @@
            mark = indexFile.mark();
        }
 
-        public void resetAndTruncate() throws IOException
+        public void resetAndTruncate()
        {
            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
            // we can't reset dbuilder either, but that is the last thing called in afterappend so

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 3377226..77d4fcf 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.utils.CLibrary;
 
 public class SequentialWriter extends OutputStream
@@ -106,18 +108,18 @@
         return new SequentialWriter(file, bufferSize, skipIOCache);
     }
 
-    public void write(int value) throws IOException
+    public void write(int value) throws ClosedChannelException
     {
         singleByteBuffer[0] = (byte) value;
         write(singleByteBuffer, 0, 1);
     }
 
-    public void write(byte[] buffer) throws IOException
+    public void write(byte[] buffer) throws ClosedChannelException
     {
         write(buffer, 0, buffer.length);
     }
 
-    public void write(byte[] data, int offset, int length) throws IOException
+    public void write(byte[] data, int offset, int length) throws ClosedChannelException
     {
         if (buffer == null)
             throw new ClosedChannelException();
@@ -137,7 +139,7 @@
      * return the number of bytes written. caller is responsible for setting
      * isDirty.
      */
-    private int writeAtMost(byte[] data, int offset, int length) throws IOException
+    private int writeAtMost(byte[] data, int offset, int length)
     {
         if (current >= bufferOffset + buffer.length)
             reBuffer();
@@ -162,19 +164,25 @@
 
     /**
      * Synchronize file contents with disk.
-     * @throws java.io.IOException on any I/O error.
     */
-    public void sync() throws IOException
+    public void sync()
     {
         syncInternal();
     }
 
-    protected void syncDataOnlyInternal() throws IOException
+    protected void syncDataOnlyInternal()
     {
-        out.getFD().sync();
+        try
+        {
+            out.getFD().sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
     }
 
-    protected void syncInternal() throws IOException
+    protected void syncInternal()
     {
         if (syncNeeded)
         {
@@ -195,16 +203,14 @@
      * If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
      *
      * Currently, for implementation reasons, this also invalidates the buffer.
-     *
-     * @throws java.io.IOException on any I/O error.
      */
     @Override
-    public void flush() throws IOException
+    public void flush()
     {
         flushInternal();
     }
 
-    protected void flushInternal() throws IOException
+    protected void flushInternal()
     {
         if (isDirty)
         {
@@ -246,11 +252,19 @@
 
     /**
      * Override this method instead of overriding flush()
-     * @throws IOException on any I/O error.
+     * @throws FSWriteError on any I/O error.
     */
-    protected void flushData() throws IOException
+    protected void flushData()
     {
-        out.write(buffer, 0, validBufferBytes);
+        try
+        {
+            out.write(buffer, 0, validBufferBytes);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+
         if (digest != null)
             digest.update(buffer, 0, validBufferBytes);
     }
@@ -267,14 +281,21 @@
      * Furthermore, for compressed files, this value refers to compressed data, while the
      * writer getFilePointer() refers to uncompressedFile
      */
-    public long getOnDiskFilePointer() throws IOException
+    public long getOnDiskFilePointer()
     {
         return getFilePointer();
     }
 
-    public long length() throws IOException
+    public long length()
     {
-        return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+        try
+        {
+            return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
     }
 
     public String getPath()
@@ -282,7 +303,7 @@
         return filePath;
     }
 
-    protected void reBuffer() throws IOException
+    protected void reBuffer()
     {
         flushInternal();
         resetBuffer();
@@ -304,7 +325,7 @@
         return new BufferedFileWriterMark(current);
     }
 
-    public void resetAndTruncate(FileMark mark) throws IOException
+    public void resetAndTruncate(FileMark mark)
     {
         assert mark instanceof BufferedFileWriterMark;
 
@@ -325,18 +346,32 @@
         truncate(current);
 
         // reset channel position
-        out.seek(current);
+        try
+        {
+            out.seek(current);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
 
         resetBuffer();
     }
 
-    public void truncate(long toSize) throws IOException
+    public void truncate(long toSize)
     {
-        out.getChannel().truncate(toSize);
+        try
+        {
+            out.getChannel().truncate(toSize);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         if (buffer == null)
             return; // already closed
@@ -348,7 +383,15 @@
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);
 
-        out.close();
+        try
+        {
+            out.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+
         CLibrary.tryCloseFD(directoryFD);
     }
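
----------------------------------------------------------------------

A note on the recurring pattern above, for readers skimming the diff: every change follows the same idiom. Methods stop declaring checked IOException and instead wrap any IOException in an unchecked, path-carrying FSWriteError or FSReadError at the point of failure. The sketch below is a minimal, self-contained illustration of that idiom, not code from this commit: the simplified FSWriteError is a stand-in for Cassandra's real org.apache.cassandra.io.FSWriteError, and UncheckedWriter is a hypothetical class invented for this example.

import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;

// Simplified stand-in for org.apache.cassandra.io.FSWriteError: an unchecked
// exception that remembers which file the failed operation was writing.
class FSWriteError extends RuntimeException
{
    public final String path;

    public FSWriteError(Throwable cause, String path)
    {
        super("Failed to write " + path, cause);
        this.path = path;
    }
}

// Hypothetical writer showing the commit's idiom: the public API declares no
// checked exceptions; each I/O call is wrapped and rethrown unchecked.
class UncheckedWriter implements Closeable
{
    private final RandomAccessFile out;
    private final String path;

    public UncheckedWriter(String path) throws IOException
    {
        this.path = path;
        this.out = new RandomAccessFile(path, "rw");
    }

    public void write(byte[] data) // no "throws IOException" for callers to propagate
    {
        try
        {
            out.write(data);
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, path); // unchecked, carries the path for error handling
        }
    }

    @Override
    public void close() // narrows Closeable.close(), which Java allows
    {
        try
        {
            out.close();
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, path);
        }
    }
}

The payoff is visible throughout the diff: callers such as CompactionTask.newSSTableSegmentThresholdReached and SSTableWriter.getOnDiskFilePointer drop their throws IOException clauses entirely, so checked exceptions that callers cannot meaningfully handle no longer propagate through the compaction code.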