Updated Branches:
  refs/heads/trunk 652ae9a64 -> 9ecda7230

clean up SequentialWriter and friends
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-2116


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ecda723
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ecda723
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ecda723

Branch: refs/heads/trunk
Commit: 9ecda7230c719784a203741d5d548b505cae2969
Parents: 844b9c4
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Jul 30 15:40:43 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Jul 30 15:40:43 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java    |    2 +-
 .../db/compaction/LeveledCompactionTask.java       |    3 +-
 .../io/compress/CompressedSequentialWriter.java    |  132 +++++++++++----
 .../cassandra/io/compress/DeflateCompressor.java   |    2 +-
 .../apache/cassandra/io/sstable/SSTableWriter.java |   47 ++---
 .../apache/cassandra/io/util/SequentialWriter.java |   95 ++++++++---
 6 files changed, 189 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
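
The change applies one pattern throughout these files: checked IOExceptions from the underlying file are caught where the failure happens and rethrown as the unchecked FSWriteError or FSReadError (each constructed with the cause and the file path), so writer methods and their callers no longer have to declare "throws IOException". Below is a minimal, self-contained sketch of that pattern; UncheckedWriteError is a stand-in for Cassandra's FSWriteError, and the class, field names, and file handling are illustrative only, not code from this patch.

import java.io.IOException;
import java.io.RandomAccessFile;

// Stand-in for org.apache.cassandra.io.FSWriteError: unchecked, remembers the path.
class UncheckedWriteError extends RuntimeException
{
    final String path;

    UncheckedWriteError(Throwable cause, String path)
    {
        super(cause);
        this.path = path;
    }
}

class TruncatingWriterSketch
{
    private final RandomAccessFile out;
    private final String filePath;

    TruncatingWriterSketch(RandomAccessFile out, String filePath)
    {
        this.out = out;
        this.filePath = filePath;
    }

    // before the cleanup this would have been "public void truncate(long toSize) throws IOException"
    public void truncate(long toSize)
    {
        try
        {
            out.getChannel().truncate(toSize);
        }
        catch (IOException e)
        {
            // wrap the checked exception; callers see an unchecked error carrying the path
            throw new UncheckedWriteError(e, filePath);
        }
    }
}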


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 796d56b..7a88c68 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -234,7 +234,7 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
     {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index ebc91d7..74e8401 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -62,7 +61,7 @@ public class LeveledCompactionTask extends CompactionTask
     }
 
     @Override
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
     {
         return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6d78287..00eb5a7 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,11 +17,15 @@
  */
 package org.apache.cassandra.io.compress;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -77,30 +81,46 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public long getOnDiskFilePointer() throws IOException
+    public long getOnDiskFilePointer()
     {
-        return out.getFilePointer();
+        try
+        {
+            return out.getFilePointer();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
     }
 
     @Override
-    public void sync() throws IOException
+    public void sync()
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public void flush() throws IOException
+    public void flush()
     {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    protected void flushData() throws IOException
+    protected void flushData()
     {
         seekToChunkStart();
 
-        // compressing data with buffer re-use
-        int compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+
+        int compressedLength;
+        try
+        {
+            // compressing data with buffer re-use
+            compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Compression exception", e); // shouldn't happen
+        }
 
         originalSize += validBufferBytes;
         compressedSize += compressedLength;
@@ -108,16 +128,23 @@ public class CompressedSequentialWriter extends SequentialWriter
         // update checksum
         checksum.update(buffer, 0, validBufferBytes);
 
-        // write an offset of the newly written chunk to the index file
-        metadataWriter.writeLong(chunkOffset);
-        chunkCount++;
+        try
+        {
+            // write an offset of the newly written chunk to the index file
+            metadataWriter.writeLong(chunkOffset);
+            chunkCount++;
 
-        assert compressedLength <= compressed.buffer.length;
+            assert compressedLength <= compressed.buffer.length;
 
-        // write data itself
-        out.write(compressed.buffer, 0, compressedLength);
-        // write corresponding checksum
-        out.writeInt((int) checksum.getValue());
+            // write data itself
+            out.write(compressed.buffer, 0, compressedLength);
+            // write corresponding checksum
+            out.writeInt((int) checksum.getValue());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
 
         // reset checksum object to the blank state for re-use
         checksum.reset();
@@ -133,11 +160,11 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public synchronized void resetAndTruncate(FileMark mark) throws IOException
+    public synchronized void resetAndTruncate(FileMark mark)
     {
         assert mark instanceof CompressedFileWriterMark;
 
-        CompressedFileWriterMark realMark = ((CompressedFileWriterMark) mark);
+        CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;
 
         // reset position
         current = realMark.uncDataOffset;
@@ -161,16 +188,39 @@ public class CompressedSequentialWriter extends SequentialWriter
         if (compressed.buffer.length < chunkSize)
             compressed.buffer = new byte[chunkSize];
 
-        out.seek(chunkOffset);
-        out.readFully(compressed.buffer, 0, chunkSize);
-
-        // decompress data chunk and store its length
-        int validBytes = compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
-
-        checksum.update(buffer, 0, validBytes);
-
-        if (out.readInt() != (int) checksum.getValue())
-            throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+        try
+        {
+            out.seek(chunkOffset);
+            out.readFully(compressed.buffer, 0, chunkSize);
+
+            int validBytes;
+            try
+            {
+                // decompress data chunk and store its length
+                validBytes = compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
+            }
+            catch (IOException e)
+            {
+                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+            }
+
+            checksum.update(buffer, 0, validBytes);
+
+            if (out.readInt() != (int) checksum.getValue())
+                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+        }
+        catch (CorruptBlockException e)
+        {
+            throw new CorruptSSTableException(e, getPath());
+        }
+        catch (EOFException e)
+        {
+            throw new CorruptSSTableException(new CorruptBlockException(getPath(), chunkOffset, chunkSize), getPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
 
         checksum.reset();
 
@@ -186,17 +236,24 @@ public class CompressedSequentialWriter extends SequentialWriter
 
     /**
      * Seek to the offset where next compressed data chunk should be stored.
-     *
-     * @throws IOException on any I/O error.
      */
-    private void seekToChunkStart() throws IOException
+    private void seekToChunkStart()
     {
-        if (out.getFilePointer() != chunkOffset)
-            out.seek(chunkOffset);
+        if (getOnDiskFilePointer() != chunkOffset)
+        {
+            try
+            {
+                out.seek(chunkOffset);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(e, getPath());
+            }
+        }
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         if (buffer == null)
             return; // already closed
@@ -204,7 +261,14 @@ public class CompressedSequentialWriter extends SequentialWriter
         super.close();
         sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
         metadataWriter.finalizeHeader(current, chunkCount);
-        metadataWriter.close();
+        try
+        {
+            metadataWriter.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 97f2ccb..125a08f 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -68,7 +68,7 @@ public class DeflateCompressor implements ICompressor
         return chunkLength;
     }
 
-    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset)
     {
         Deflater def = deflater.get();
         def.reset();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e7128f4..2ea71d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.nio.channels.ClosedChannelException;
 import java.util.*;
 import java.util.regex.Pattern;
 
@@ -114,8 +115,7 @@ public class SSTableWriter extends SSTable
         iwriter.mark();
     }
 
-    // NOT necessarily an FS error - not throwing FSWE.
-    public void resetAndTruncate() throws IOException
+    public void resetAndTruncate()
     {
         dataFile.resetAndTruncate(dataMark);
         iwriter.resetAndTruncate();
@@ -149,21 +149,21 @@ public class SSTableWriter extends SSTable
 
     public RowIndexEntry append(AbstractCompactedRow row)
     {
+        long currentPosition = beforeAppend(row.key);
         try
         {
-            long currentPosition = beforeAppend(row.key);
             ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
             long dataStart = dataFile.getFilePointer();
             long dataSize = row.write(dataFile.stream);
             assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
                     : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
-            sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-            return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
+        sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+        return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -190,12 +190,12 @@ public class SSTableWriter extends SSTable
             dataFile.stream.writeInt(builder.writtenAtomCount());
             dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
             afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
-            sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
+        sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
     }
 
     /**
@@ -318,17 +318,8 @@ public class SSTableWriter extends SSTable
     {
         // index and filter
         iwriter.close();
-
-        try
-        {
-            // main data, close will truncate if necessary
-            dataFile.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
-
+        // main data, close will truncate if necessary
+        dataFile.close();
         // write sstable statistics
         SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
         writeMetadata(descriptor, sstableMetadata);
@@ -373,12 +364,12 @@ public class SSTableWriter extends SSTable
         try
         {
             out.write(String.format("%s  %s", Hex.bytesToHex(digest), dataFileName).getBytes());
-            out.close();
         }
-        catch (IOException e)
+        catch (ClosedChannelException e)
         {
-            throw new FSWriteError(e, out.getPath());
+            throw new AssertionError(); // can't happen.
         }
+        out.close();
     }
 
     private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
@@ -387,12 +378,12 @@ public class SSTableWriter extends SSTable
         try
         {
             SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
-            out.close();
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, out.getPath());
         }
+        out.close();
     }
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
@@ -421,7 +412,7 @@ public class SSTableWriter extends SSTable
         return dataFile.getFilePointer();
     }
 
-    public long getOnDiskFilePointer() throws IOException
+    public long getOnDiskFilePointer()
     {
         return dataFile.getOnDiskFilePointer();
     }
@@ -491,17 +482,17 @@ public class SSTableWriter extends SSTable
                 stream.flush();
                 fos.getFD().sync();
                 stream.close();
-
-                // index
-                long position = indexFile.getFilePointer();
-                indexFile.close(); // calls force
-                FileUtils.truncate(indexFile.getPath(), position);
             }
             catch (IOException e)
             {
                 throw new FSWriteError(e, path);
             }
 
+            // index
+            long position = indexFile.getFilePointer();
+            indexFile.close(); // calls force
+            FileUtils.truncate(indexFile.getPath(), position);
+
             // finalize in-memory index state
             summary.complete();
         }
@@ -511,7 +502,7 @@ public class SSTableWriter extends SSTable
             mark = indexFile.mark();
         }
 
-        public void resetAndTruncate() throws IOException
+        public void resetAndTruncate()
         {
             // we can't un-set the bloom filter addition, but extra keys in there are harmless.
             // we can't reset dbuilder either, but that is the last thing called in afterappend so

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 3377226..77d4fcf 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.utils.CLibrary;
 
 public class SequentialWriter extends OutputStream
@@ -106,18 +108,18 @@ public class SequentialWriter extends OutputStream
         return new SequentialWriter(file, bufferSize, skipIOCache);
     }
 
-    public void write(int value) throws IOException
+    public void write(int value) throws ClosedChannelException
     {
         singleByteBuffer[0] = (byte) value;
         write(singleByteBuffer, 0, 1);
     }
 
-    public void write(byte[] buffer) throws IOException
+    public void write(byte[] buffer) throws ClosedChannelException
     {
         write(buffer, 0, buffer.length);
     }
 
-    public void write(byte[] data, int offset, int length) throws IOException
+    public void write(byte[] data, int offset, int length) throws ClosedChannelException
     {
         if (buffer == null)
             throw new ClosedChannelException();
@@ -137,7 +139,7 @@ public class SequentialWriter extends OutputStream
      * return the number of bytes written. caller is responsible for setting
      * isDirty.
      */
-    private int writeAtMost(byte[] data, int offset, int length) throws IOException
+    private int writeAtMost(byte[] data, int offset, int length)
     {
         if (current >= bufferOffset + buffer.length)
             reBuffer();
@@ -162,19 +164,25 @@ public class SequentialWriter extends OutputStream
 
     /**
      * Synchronize file contents with disk.
-     * @throws java.io.IOException on any I/O error.
      */
-    public void sync() throws IOException
+    public void sync()
     {
         syncInternal();
     }
 
-    protected void syncDataOnlyInternal() throws IOException
+    protected void syncDataOnlyInternal()
     {
-        out.getFD().sync();
+        try
+        {
+            out.getFD().sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
     }
 
-    protected void syncInternal() throws IOException
+    protected void syncInternal()
     {
         if (syncNeeded)
         {
@@ -195,16 +203,14 @@ public class SequentialWriter extends OutputStream
      * If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
      *
      * Currently, for implementation reasons, this also invalidates the buffer.
-     *
-     * @throws java.io.IOException on any I/O error.
      */
     @Override
-    public void flush() throws IOException
+    public void flush()
     {
         flushInternal();
     }
 
-    protected void flushInternal() throws IOException
+    protected void flushInternal()
     {
         if (isDirty)
         {
@@ -246,11 +252,19 @@ public class SequentialWriter extends OutputStream
 
     /**
      * Override this method instead of overriding flush()
-     * @throws IOException on any I/O error.
+     * @throws FSWriteError on any I/O error.
      */
-    protected void flushData() throws IOException
+    protected void flushData()
     {
-        out.write(buffer, 0, validBufferBytes);
+        try
+        {
+            out.write(buffer, 0, validBufferBytes);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+
         if (digest != null)
             digest.update(buffer, 0, validBufferBytes);
     }
@@ -267,14 +281,21 @@ public class SequentialWriter extends OutputStream
      * Furthermore, for compressed files, this value refers to compressed data, while the
      * writer getFilePointer() refers to uncompressedFile
      */
-    public long getOnDiskFilePointer() throws IOException
+    public long getOnDiskFilePointer()
     {
         return getFilePointer();
     }
 
-    public long length() throws IOException
+    public long length()
     {
-        return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+        try
+        {
+            return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
     }
 
     public String getPath()
@@ -282,7 +303,7 @@ public class SequentialWriter extends OutputStream
         return filePath;
     }
 
-    protected void reBuffer() throws IOException
+    protected void reBuffer()
     {
         flushInternal();
         resetBuffer();
@@ -304,7 +325,7 @@ public class SequentialWriter extends OutputStream
         return new BufferedFileWriterMark(current);
     }
 
-    public void resetAndTruncate(FileMark mark) throws IOException
+    public void resetAndTruncate(FileMark mark)
     {
         assert mark instanceof BufferedFileWriterMark;
 
@@ -325,18 +346,32 @@ public class SequentialWriter extends OutputStream
         truncate(current);
 
         // reset channel position
-        out.seek(current);
+        try
+        {
+            out.seek(current);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
 
         resetBuffer();
     }
 
-    public void truncate(long toSize) throws IOException
+    public void truncate(long toSize)
     {
-        out.getChannel().truncate(toSize);
+        try
+        {
+            out.getChannel().truncate(toSize);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
         if (buffer == null)
             return; // already closed
@@ -348,7 +383,15 @@ public class SequentialWriter extends OutputStream
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);
 
-        out.close();
+        try
+        {
+            out.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+
         CLibrary.tryCloseFD(directoryFD);
     }
 

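For callers, a hypothetical usage sketch after this cleanup (the file name and payload are invented, and the SequentialWriter.open(File) factory is assumed): only write() keeps a checked exception, the narrowed ClosedChannelException thrown when the writer was already closed, while sync() and close() surface disk failures as the unchecked FSWriteError.

import java.io.File;
import java.nio.channels.ClosedChannelException;

import org.apache.cassandra.io.util.SequentialWriter;

public class WriterUsageSketch
{
    public static void main(String[] args) throws ClosedChannelException
    {
        SequentialWriter writer = SequentialWriter.open(new File("example.db"));
        writer.write(new byte[]{ 1, 2, 3 }); // ClosedChannelException (checked) only if already closed
        writer.sync();                       // disk failure surfaces as unchecked FSWriteError
        writer.close();                      // likewise unchecked on failure
    }
}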