Switch CRC component to Adler and include it for compressed sstables
patch by Marcus Eriksson, Radovan Zvoncek, and jbellis for CASSANDRA-4165


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9405ce0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9405ce0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9405ce0e

Branch: refs/heads/trunk
Commit: 9405ce0e2d95424544561d088d3167cf328964ec
Parents: da444a6
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Mar 12 10:04:29 2014 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Mar 12 10:04:29 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../io/compress/CompressedSequentialWriter.java | 35 ++++-----
 .../apache/cassandra/io/sstable/Descriptor.java |  3 +
 .../cassandra/io/sstable/SSTableWriter.java     | 19 +++--
 .../io/util/DataIntegrityMetadata.java          | 83 ++++++++------------
 .../cassandra/io/util/SequentialWriter.java     | 35 +++++----
 6 files changed, 79 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61e17e3..7a1fc60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.0-beta2
+ * Switch CRC component to Adler and include it for compressed sstables 
+   (CASSANDRA-4165)
  * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
  * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
  * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 436b6dc..b8a21cc 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -26,20 +26,15 @@ import java.util.zip.Checksum;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 
 public class CompressedSequentialWriter extends SequentialWriter
 {
-    public static SequentialWriter open(String dataFilePath,
-                                        String indexFilePath,
-                                        boolean skipIOCache,
-                                        CompressionParameters parameters,
-                                        MetadataCollector sstableMetadataCollector)
-    {
-        return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
-    }
+    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 
     // holds offset in the file where current chunk should be written
     // changed only by flush() method where data buffer gets compressed and stored to the file
@@ -55,14 +50,12 @@ public class CompressedSequentialWriter extends SequentialWriter
     // holds a number of already written chunks
     private int chunkCount = 0;
 
-    private final Checksum checksum = new Adler32();
-
     private long originalSize = 0, compressedSize = 0;
 
     private final MetadataCollector sstableMetadataCollector;
 
     public CompressedSequentialWriter(File file,
-                                      String indexFilePath,
+                                      String offsetsPath,
                                       boolean skipIOCache,
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
@@ -74,10 +67,11 @@ public class CompressedSequentialWriter extends SequentialWriter
         compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
 
         /* Index File (-CompressionInfo.db component) and it's header */
-        metadataWriter = CompressionMetadata.Writer.open(indexFilePath);
+        metadataWriter = CompressionMetadata.Writer.open(offsetsPath);
         metadataWriter.writeHeader(parameters);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
+        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
     }
 
     @Override
@@ -125,9 +119,6 @@ public class CompressedSequentialWriter extends SequentialWriter
         originalSize += validBufferBytes;
         compressedSize += compressedLength;
 
-        // update checksum
-        checksum.update(compressed.buffer, 0, compressedLength);
-
         try
         {
             // write an offset of the newly written chunk to the index file
@@ -139,16 +130,13 @@ public class CompressedSequentialWriter extends SequentialWriter
             // write data itself
             out.write(compressed.buffer, 0, compressedLength);
             // write corresponding checksum
-            out.writeInt((int) checksum.getValue());
+            crcMetadata.append(compressed.buffer, 0, compressedLength);
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, getPath());
         }
 
-        // reset checksum object to the blank state for re-use
-        checksum.reset();
-
         // next chunk should be written right after current + length of the checksum (int)
         chunkOffset += compressedLength + 4;
     }
@@ -203,6 +191,7 @@ public class CompressedSequentialWriter extends SequentialWriter
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
             }
 
+            Checksum checksum = new Adler32();
             checksum.update(compressed.buffer, 0, chunkSize);
 
             if (out.readInt() != (int) checksum.getValue())
@@ -221,8 +210,6 @@ public class CompressedSequentialWriter extends SequentialWriter
             throw new FSReadError(e, getPath());
         }
 
-        checksum.reset();
-
         // reset buffer
         validBufferBytes = realMark.bufferOffset;
         bufferOffset = current - validBufferBytes;
@@ -270,6 +257,12 @@ public class CompressedSequentialWriter extends SequentialWriter
         }
     }
 
+    @Override
+    public void writeFullChecksum(Descriptor descriptor)
+    {
+        crcMetadata.writeFullChecksum(descriptor);
+    }
+
     /**
      * Class to hold a mark to the position of the file
      */
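
For illustration, here is a minimal standalone sketch (not the committed class) of the chunk layout the writer now produces: each compressed chunk is trailed on disk by the low 32 bits of its Adler32 checksum, and the checksum object is reset between chunks. The ChecksumWriter delegated to above additionally keeps a running full-file checksum; the class and variable names below are invented for the example.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.Adler32;

    public class ChunkChecksumSketch
    {
        public static void main(String[] args) throws IOException
        {
            byte[] chunk = { 1, 2, 3, 4, 5 }; // stand-in for one compressed chunk

            ByteArrayOutputStream file = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(file);

            Adler32 checksum = new Adler32();

            // write data itself
            out.write(chunk, 0, chunk.length);

            // write corresponding checksum (4 bytes) right behind the chunk
            checksum.update(chunk, 0, chunk.length);
            out.writeInt((int) checksum.getValue());
            checksum.reset(); // blank state for the next chunk

            // next chunk starts at chunkOffset + compressedLength + 4
            System.out.println("bytes written: " + file.size()); // prints 9
        }
    }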

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index a2a27d8..4803ae7 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -62,6 +62,7 @@ public class Descriptor
         //             checksum the compressed data
         // ka (2.1.0): new Statistics.db file format
         //             index summaries can be downsampled and the sampling level is persisted
+        //             switch uncompressed checksums to adler32
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -77,6 +78,7 @@ public class Descriptor
         public final boolean hasPostCompressionAdlerChecksums;
         public final boolean hasSamplingLevel;
         public final boolean newStatsFile;
+        public final boolean hasAllAdlerChecksums;
         public final boolean hasRepairedAt;
 
         public Version(String version)
@@ -92,6 +94,7 @@ public class Descriptor
             hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
             hasSamplingLevel = version.compareTo("ka") >= 0;
             newStatsFile = version.compareTo("ka") >= 0;
+            hasAllAdlerChecksums = version.compareTo("ka") >= 0;
             hasRepairedAt = version.compareTo("ka") >= 0;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 2d1858f..a7fd881 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@ -76,7 +75,8 @@ public class SSTableWriter extends SSTable
                                                                          Component.PRIMARY_INDEX,
                                                                          Component.STATS,
                                                                          Component.SUMMARY,
-                                                                         Component.TOC));
+                                                                         Component.TOC,
+                                                                         Component.DIGEST));
 
         if (metadata.getBloomFilterFpChance() < 1.0)
             components.add(Component.FILTER);
@@ -89,7 +89,6 @@ public class SSTableWriter extends SSTable
         {
             // it would feel safer to actually add this component later in maybeWriteDigest(),
             // but the components are unmodifiable after construction
-            components.add(Component.DIGEST);
             components.add(Component.CRC);
         }
         return components;
@@ -112,17 +111,16 @@ public class SSTableWriter extends SSTable
         if (compression)
         {
             dbuilder = SegmentedFile.getCompressedBuilder();
-            dataFile = CompressedSequentialWriter.open(getFilename(),
-                                                       descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                                       !metadata.populateIoCacheOnFlush(),
-                                                       metadata.compressionParameters(),
-                                                       sstableMetadataCollector);
+            dataFile = SequentialWriter.open(getFilename(),
+                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
+                                             !metadata.populateIoCacheOnFlush(),
+                                             metadata.compressionParameters(),
+                                             sstableMetadataCollector);
         }
         else
         {
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-            dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush());
-            dataFile.setDataIntegrityWriter(DataIntegrityMetadata.checksumWriter(descriptor));
+            dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush(), new File(descriptor.filenameFor(Component.CRC)));
         }
 
         this.sstableMetadataCollector = sstableMetadataCollector;
@@ -369,6 +367,7 @@ public class SSTableWriter extends SSTable
 
     private Pair<Descriptor, StatsMetadata> close(long repairedAt)
     {
+        dataFile.writeFullChecksum(descriptor);
         // index and filter
         iwriter.close();
         // main data, close will truncate if necessary
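
The net effect of the component changes: every sstable now gets a DIGEST component (the full-file checksum), while the separate per-chunk CRC component is only written for uncompressed tables, since compressed tables interleave their chunk checksums with the data. A hedged sketch of that selection logic, using plain strings in place of the real Component enum:

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class ComponentSelectionSketch
    {
        static Set<String> components(boolean compressed)
        {
            Set<String> c = new LinkedHashSet<String>();
            c.add("DATA");
            c.add("DIGEST"); // full-file checksum, now unconditional
            if (compressed)
                c.add("COMPRESSION_INFO"); // chunk offsets; chunk checksums live inline in the data file
            else
                c.add("CRC");              // per-chunk checksums for the plain data file
            return c;
        }

        public static void main(String[] args)
        {
            System.out.println("compressed:   " + components(true));
            System.out.println("uncompressed: " + components(false));
        }
    }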

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index f3c6c63..797b964 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -17,19 +17,21 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.BufferedWriter;
 import java.io.Closeable;
+import java.io.DataOutput;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.regex.Pattern;
+import java.nio.file.Files;
+import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
+import com.google.common.base.Charsets;
+
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.PureJavaCrc32;
 
 public class DataIntegrityMetadata
@@ -41,24 +43,23 @@ public class DataIntegrityMetadata
 
     public static class ChecksumValidator implements Closeable
     {
-        private final Checksum checksum = new PureJavaCrc32();
+        private final Checksum checksum;
         private final RandomAccessReader reader;
         private final Descriptor descriptor;
         public final int chunkSize;
 
-        public ChecksumValidator(Descriptor desc) throws IOException
+        public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this.descriptor = desc;
-            reader = RandomAccessReader.open(new File(desc.filenameFor(Component.CRC)));
+            this.descriptor = descriptor;
+            checksum = descriptor.version.hasAllAdlerChecksums ? new Adler32() : new PureJavaCrc32();
+            reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
             chunkSize = reader.readInt();
         }
 
         public void seek(long offset)
         {
             long start = chunkStart(offset);
-            reader.seek(((start / chunkSize) * 4L) + 4); // 8 byte checksum per
-                                                         // chunk + 4 byte
-                                                         // header/chunkLength
+            reader.seek(((start / chunkSize) * 4L) + 4); // 8 byte checksum per chunk + 4 byte header/chunkLength
         }
 
         public long chunkStart(long offset)
@@ -83,38 +84,22 @@ public class DataIntegrityMetadata
         }
     }
 
-    public static ChecksumWriter checksumWriter(Descriptor desc)
-    {
-        return new ChecksumWriter(desc);
-    }
-
-    public static class ChecksumWriter implements Closeable
+    public static class ChecksumWriter
     {
-        private final Checksum checksum = new PureJavaCrc32();
-        private final MessageDigest digest;
-        private final SequentialWriter writer;
-        private final Descriptor descriptor;
+        private final Checksum incrementalChecksum = new Adler32();
+        private final DataOutput incrementalOut;
+        private final Checksum fullChecksum = new Adler32();
 
-        public ChecksumWriter(Descriptor desc)
+        public ChecksumWriter(DataOutput incrementalOut)
         {
-            this.descriptor = desc;
-            writer = SequentialWriter.open(new File(desc.filenameFor(Component.CRC)), true);
-            try
-            {
-                digest = MessageDigest.getInstance("SHA-1");
-            }
-            catch (NoSuchAlgorithmException e)
-            {
-                // SHA-1 is standard in java 6
-                throw new RuntimeException(e);
-            }
+            this.incrementalOut = incrementalOut;
         }
 
         public void writeChunkSize(int length)
         {
             try
             {
-                writer.stream.writeInt(length);
+                incrementalOut.writeInt(length);
             }
             catch (IOException e)
             {
@@ -126,11 +111,11 @@ public class DataIntegrityMetadata
         {
             try
             {
-                checksum.update(buffer, start, end);
-                writer.stream.writeInt((int) checksum.getValue());
-                checksum.reset();
+                incrementalChecksum.update(buffer, start, end);
+                incrementalOut.writeInt((int) incrementalChecksum.getValue());
+                incrementalChecksum.reset();
 
-                digest.update(buffer, start, end);
+                fullChecksum.update(buffer, start, end);
             }
             catch (IOException e)
             {
@@ -138,24 +123,18 @@ public class DataIntegrityMetadata
             }
         }
 
-        public void close()
+        public void writeFullChecksum(Descriptor descriptor)
         {
-            FileUtils.closeQuietly(writer);
-            byte[] bytes = digest.digest();
-            if (bytes == null)
-                return;
-            SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(Component.DIGEST)), true);
-            // Writting output compatible with sha1sum
-            Descriptor newdesc = descriptor.asTemporary(false);
-            String[] tmp = newdesc.filenameFor(Component.DATA).split(Pattern.quote(File.separator));
-            String dataFileName = tmp[tmp.length - 1];
+            File outFile = new File(descriptor.filenameFor(Component.DIGEST));
+            BufferedWriter out = null;
             try
             {
-                out.write(String.format("%s  %s", Hex.bytesToHex(bytes), dataFileName).getBytes());
+                out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8);
+                out.write(String.valueOf(fullChecksum.getValue()));
             }
-            catch (ClosedChannelException e)
+            catch (IOException e)
             {
-                throw new AssertionError(); // can't happen.
+                throw new FSWriteError(e, outFile);
             }
             finally
             {
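
Summarizing the rewritten ChecksumWriter: one Adler32 instance is updated and reset per chunk, with each 32-bit value written behind the chunk (or into the CRC file), while a second Adler32 instance accumulates over the whole file and is finally written to the digest file as a plain decimal string, replacing the old sha1sum-style hex line. A self-contained sketch under those assumptions (file names and class name are illustrative, not the committed code):

    import java.io.BufferedWriter;
    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.nio.charset.Charset;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.zip.Adler32;

    public class ChecksumWriterSketch
    {
        public static void main(String[] args) throws Exception
        {
            Adler32 incremental = new Adler32(); // reset after every chunk
            Adler32 full = new Adler32();        // accumulates across chunks

            byte[] chunk = "some data".getBytes("UTF-8");

            DataOutputStream crcOut = new DataOutputStream(new FileOutputStream("sketch.crc"));
            try
            {
                incremental.update(chunk, 0, chunk.length);
                crcOut.writeInt((int) incremental.getValue()); // per-chunk value
                incremental.reset();

                full.update(chunk, 0, chunk.length);
            }
            finally
            {
                crcOut.close();
            }

            // the digest file holds just the decimal full-file value
            BufferedWriter out = Files.newBufferedWriter(Paths.get("sketch.digest"), Charset.forName("UTF-8"));
            try
            {
                out.write(String.valueOf(full.getValue()));
            }
            finally
            {
                out.close();
            }
        }
    }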

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index a69391e..c4ec653 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,10 @@ import java.nio.channels.ClosedChannelException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CLibrary;
 
 /**
@@ -59,7 +63,6 @@ public class SequentialWriter extends OutputStream
     private int bytesSinceTrickleFsync = 0;
 
     public final DataOutputStream stream;
-    private DataIntegrityMetadata.ChecksumWriter metadata;
 
     public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
     {
@@ -107,6 +110,20 @@ public class SequentialWriter extends OutputStream
         return new SequentialWriter(file, bufferSize, skipIOCache);
     }
 
+    public static ChecksummedSequentialWriter open(File file, boolean skipIOCache, File crcPath)
+    {
+        return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache, crcPath);
+    }
+
+    public static CompressedSequentialWriter open(String dataFilePath,
+                                                  String offsetsPath,
+                                                  boolean skipIOCache,
+                                                  CompressionParameters parameters,
+                                                  MetadataCollector sstableMetadataCollector)
+    {
+        return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, skipIOCache, parameters, sstableMetadataCollector);
+    }
+
     public void write(int value) throws ClosedChannelException
     {
         if (current >= bufferOffset + buffer.length)
@@ -273,9 +290,6 @@ public class SequentialWriter extends OutputStream
         {
             throw new FSWriteError(e, getPath());
         }
-
-        if (metadata != null)
-            metadata.append(buffer, 0, validBufferBytes);
     }
 
     public long getFilePointer()
@@ -401,21 +415,12 @@ public class SequentialWriter extends OutputStream
             throw new FSWriteError(e, getPath());
         }
 
-        FileUtils.closeQuietly(metadata);
         CLibrary.tryCloseFD(directoryFD);
     }
 
-    /**
-     * Turn on digest computation on this writer.
-     * This can only be called before any data is written to this write,
-     * otherwise an IllegalStateException is thrown.
-     */
-    public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
+    // hack to make life easier for subclasses
+    public void writeFullChecksum(Descriptor descriptor)
     {
-        if (current != 0)
-            throw new IllegalStateException();
-        metadata = writer;
-        metadata.writeChunkSize(buffer.length);
     }
 
     /**
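
The empty writeFullChecksum(Descriptor) above is the "hack" its comment admits to: the base writer exposes a no-op so SSTableWriter.close() can call it on any dataFile without caring which subclass it holds, and the checksumming writers override it. A stripped-down sketch of the pattern (using String where the real method takes a Descriptor):

    public class HookSketch
    {
        static class Writer
        {
            // no-op by default; callers invoke it unconditionally
            public void writeFullChecksum(String descriptor) {}
        }

        static class ChecksummedWriter extends Writer
        {
            @Override
            public void writeFullChecksum(String descriptor)
            {
                System.out.println("flushing digest for " + descriptor);
            }
        }

        public static void main(String[] args)
        {
            Writer plain = new Writer();
            Writer checked = new ChecksummedWriter();
            plain.writeFullChecksum("sstable-a");   // does nothing
            checked.writeFullChecksum("sstable-b"); // writes the digest
        }
    }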

Reply via email to