Updated Branches: refs/heads/trunk cb9cf504c -> 639b314d2
Repair should validate checksums before streaming
Patch by Vijay, reviewed by Jason Brown for CASSANDRA-3648

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639b314d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639b314d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639b314d

Branch: refs/heads/trunk
Commit: 639b314d2ef2d95cfef2ffd4bc597540fa83f3cf
Parents: cb9cf50
Author: Vijay Parthasarathy <vijay2...@gmail.com>
Authored: Thu Jan 31 13:49:19 2013 -0800
Committer: Vijay Parthasarathy <vijay2...@gmail.com>
Committed: Thu Jan 31 13:49:19 2013 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/io/sstable/Component.java |    6 +-
 .../org/apache/cassandra/io/sstable/SSTable.java   |    1 +
 .../apache/cassandra/io/sstable/SSTableWriter.java |   34 +---
 .../cassandra/io/util/DataIntegrityMetadata.java   |  167 +++++++++++++++
 .../apache/cassandra/io/util/SequentialWriter.java |   37 +---
 .../apache/cassandra/streaming/FileStreamTask.java |   40 +++-
 .../cassandra/streaming/StreamingTransferTest.java |   42 ++++
 7 files changed, 260 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index cbc12d9..599e0ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -53,6 +53,8 @@ public class Component
         STATS("Statistics.db"),
         // holds sha1 sum of the data file (to be checked by sha1sum)
         DIGEST("Digest.sha1"),
+        // holds the CRC32 for chunks in an uncompressed file.
+        CRC("CRC.db"),
         // holds SSTable Index Summary and Boundaries
         SUMMARY("Summary.db"),
         // table of contents, stores the list of all components for the sstable
@@ -83,6 +85,7 @@ public class Component
     public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
     public final static Component STATS = new Component(Type.STATS);
     public final static Component DIGEST = new Component(Type.DIGEST);
+    public final static Component CRC = new Component(Type.CRC);
     public final static Component SUMMARY = new Component(Type.SUMMARY);
     public final static Component TOC = new Component(Type.TOC);

@@ -134,7 +137,8 @@ public class Component
             case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break;
             case STATS:            component = Component.STATS;            break;
             case DIGEST:           component = Component.DIGEST;           break;
-            case SUMMARY:          component = Component.SUMMARY;          break;
+            case CRC:              component = Component.CRC;              break;
+            case SUMMARY:          component = Component.SUMMARY;          break;
             case TOC:              component = Component.TOC;              break;
             case CUSTOM:           component = new Component(Type.CUSTOM, path.right); break;
             default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index c7486ba..25738d0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -63,6 +63,7 @@ public abstract class SSTable
     public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
     public static final String COMPONENT_STATS = Component.Type.STATS.repr;
     public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
+    public static final String COMPONENT_CRC = Component.Type.CRC.repr;
     public static final String COMPONENT_SUMMARY = Component.Type.SUMMARY.repr;

     public static final String TEMPFILE_MARKER = "tmp";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 279599e..2166808 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,9 +18,7 @@ package org.apache.cassandra.io.sstable;

 import java.io.*;
-import java.nio.channels.ClosedChannelException;
 import java.util.*;
-import java.util.regex.Pattern;

 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -48,6 +46,7 @@ public class SSTableWriter extends SSTable
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
     private final SSTableMetadata.Collector sstableMetadataCollector;
+    private DataIntegrityMetadata.ChecksumWriter integratyWriter;

     public SSTableWriter(String filename, long keyCount)
     {
@@ -70,11 +69,16 @@ public class SSTableWriter extends SSTable
             components.add(Component.FILTER);

         if (metadata.compressionParameters().sstableCompressor != null)
+        {
             components.add(Component.COMPRESSION_INFO);
+        }
         else
+        {
             // it would feel safer to actually add this component later in maybeWriteDigest(),
             // but the components are unmodifiable after construction
             components.add(Component.DIGEST);
+            components.add(Component.CRC);
+        }
         return components;
     }

@@ -104,7 +108,8 @@ public class SSTableWriter extends SSTable
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
             dataFile = SequentialWriter.open(new File(getFilename()), !DatabaseDescriptor.populateIOCacheOnFlush());
-            dataFile.setComputeDigest();
+            integratyWriter = DataIntegrityMetadata.checksumWriter(descriptor);
+            dataFile.setDataIntegratyWriter(integratyWriter);
         }

         this.sstableMetadataCollector = sstableMetadataCollector;
@@ -313,7 +318,6 @@ public class SSTableWriter extends SSTable
         // write sstable statistics
         SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
         writeMetadata(descriptor, sstableMetadata);
-        maybeWriteDigest();

         // save the table of components
         SSTable.appendTOC(descriptor, components);
@@ -343,28 +347,6 @@ public class SSTableWriter extends SSTable
         return sstable;
     }

-    private void maybeWriteDigest()
-    {
-        byte[] digest = dataFile.digest();
-        if (digest == null)
-            return;
-
-        SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
-        // Writting output compatible with sha1sum
-        Descriptor newdesc = descriptor.asTemporary(false);
-        String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
-        String dataFileName = tmp[tmp.length - 1];
-        try
-        {
-            out.write(String.format("%s %s", Hex.bytesToHex(digest), dataFileName).getBytes());
-        }
-        catch (ClosedChannelException e)
-        {
-            throw new AssertionError(); // can't happen.
-        }
-        out.close();
-    }
-
     private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
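Removing maybeWriteDigest() does not drop the Digest.sha1 component: the full-file SHA-1 is still accumulated, only now inside the ChecksumWriter introduced in the new file below, and it is written out when that writer is closed, in the same "<hex digest> <data file name>" layout sha1sum produces. For orientation, a rough self-contained sketch of building such a line with plain JDK classes; the class and method names here are invented, and the real code goes through Cassandra's Hex and SequentialWriter utilities.

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    /** Illustrative helper producing a "<sha1-hex> <file-name>" line like the Digest.sha1 component. */
    public class DigestLine
    {
        public static String of(Path dataFile) throws IOException, NoSuchAlgorithmException
        {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            try (InputStream in = Files.newInputStream(dataFile))
            {
                byte[] buf = new byte[64 * 1024];
                int n;
                while ((n = in.read(buf)) != -1)
                    sha1.update(buf, 0, n);           // digest covers the entire data file
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : sha1.digest())
                hex.append(String.format("%02x", b)); // lower-case hex string
            return hex + " " + dataFile.getFileName(); // same "%s %s" layout the commit writes
        }
    }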
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
new file mode 100644
index 0000000..f334d08
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.zip.Checksum;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+public class DataIntegrityMetadata
+{
+    public static ChecksumValidator checksumValidator(Descriptor desc) throws IOException
+    {
+        return new ChecksumValidator(desc);
+    }
+
+    public static class ChecksumValidator implements Closeable
+    {
+        private final Checksum checksum = new PureJavaCrc32();
+        private final RandomAccessReader reader;
+        private final Descriptor descriptor;
+        public final int chunkSize;
+
+        public ChecksumValidator(Descriptor desc) throws IOException
+        {
+            this.descriptor = desc;
+            reader = RandomAccessReader.open(new File(desc.filenameFor(Component.CRC)));
+            chunkSize = reader.readInt();
+        }
+
+        public void seek(long offset)
+        {
+            long start = chunkStart(offset);
+            reader.seek(((start / chunkSize) * 4L) + 4); // 8 byte checksum per
+                                                         // chunk + 4 byte
+                                                         // header/chunkLength
+        }
+
+        public long chunkStart(long offset)
+        {
+            long startChunk = offset / chunkSize;
+            return startChunk * chunkSize;
+        }
+
+        public void validate(byte[] bytes, int start, int end) throws IOException
+        {
+            checksum.update(bytes, start, end);
+            int current = (int) checksum.getValue();
+            checksum.reset();
+            int actual = reader.readInt();
+            if (current != actual)
+                throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
+        }
+
+        public void close()
+        {
+            reader.close();
+        }
+    }
+
+    public static ChecksumWriter checksumWriter(Descriptor desc)
+    {
+        return new ChecksumWriter(desc);
+    }
+
+    public static class ChecksumWriter implements Closeable
+    {
+        private final Checksum checksum = new PureJavaCrc32();
+        private final MessageDigest digest;
+        private final SequentialWriter writer;
+        private final Descriptor descriptor;
+
+        public ChecksumWriter(Descriptor desc)
+        {
+            this.descriptor = desc;
+            writer = SequentialWriter.open(new File(desc.filenameFor(Component.CRC)), true);
+            try
+            {
+                digest = MessageDigest.getInstance("SHA-1");
+            }
+            catch (NoSuchAlgorithmException e)
+            {
+                // SHA-1 is standard in java 6
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void writeChunkSize(int length)
+        {
+            try
+            {
+                writer.stream.writeInt(length);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public void append(byte[] buffer, int start, int end)
+        {
+            try
+            {
+                checksum.update(buffer, start, end);
+                writer.stream.writeInt((int) checksum.getValue());
+                checksum.reset();
+
+                digest.update(buffer, start, end);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public void close()
+        {
+            FileUtils.closeQuietly(writer);
+            byte[] bytes = digest.digest();
+            if (bytes == null)
+                return;
+            SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
+            // Writing output compatible with sha1sum
+            Descriptor newdesc = descriptor.asTemporary(false);
+            String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
+            String dataFileName = tmp[tmp.length - 1];
+            try
+            {
+                out.write(String.format("%s %s", Hex.bytesToHex(bytes), dataFileName).getBytes());
+            }
+            catch (ClosedChannelException e)
+            {
+                throw new AssertionError(); // can't happen.
+            }
+            finally
+            {
+                FileUtils.closeQuietly(out);
+            }
+        }
+    }
+}
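The new CRC component has a deliberately small layout: a single int recording the chunk length, followed by one 4-byte CRC32 value per chunk of the data file, in the order the chunks were written. A minimal round-trip sketch of that layout, assuming plain JDK streams and java.util.zip.CRC32 (the commit itself uses PureJavaCrc32 together with Cassandra's SequentialWriter and RandomAccessReader, so treat the class below as an illustration, not the project's API):

    import java.io.*;
    import java.util.zip.CRC32;

    /** Illustrative only: writes and re-validates a [chunkSize][crc][crc]... file like the new CRC component. */
    public class ChunkChecksumSketch
    {
        static void writeCrcFile(File data, File crcFile, int chunkSize) throws IOException
        {
            try (DataOutputStream crcOut = new DataOutputStream(new FileOutputStream(crcFile));
                 InputStream in = new BufferedInputStream(new FileInputStream(data)))
            {
                crcOut.writeInt(chunkSize);                 // header: chunk length used for every checksum
                byte[] chunk = new byte[chunkSize];
                int n;
                while ((n = readChunk(in, chunk)) > 0)
                {
                    CRC32 crc = new CRC32();
                    crc.update(chunk, 0, n);                // the last chunk may be short
                    crcOut.writeInt((int) crc.getValue());  // 4 bytes per chunk, as writeInt in the commit
                }
            }
        }

        static void validate(File data, File crcFile) throws IOException
        {
            try (DataInputStream crcIn = new DataInputStream(new FileInputStream(crcFile));
                 InputStream in = new BufferedInputStream(new FileInputStream(data)))
            {
                int chunkSize = crcIn.readInt();
                byte[] chunk = new byte[chunkSize];
                int n;
                while ((n = readChunk(in, chunk)) > 0)
                {
                    CRC32 crc = new CRC32();
                    crc.update(chunk, 0, n);
                    if ((int) crc.getValue() != crcIn.readInt())
                        throw new IOException("Corrupted chunk in " + data);
                }
            }
        }

        // reads up to chunk.length bytes, returning the number actually read (0 at end of file)
        private static int readChunk(InputStream in, byte[] chunk) throws IOException
        {
            int total = 0;
            while (total < chunk.length)
            {
                int n = in.read(chunk, total, chunk.length - total);
                if (n < 0)
                    break;
                total += n;
            }
            return total;
        }
    }

With a fixed chunk length up front, locating the checksum that covers any byte offset in the data file is pure arithmetic, which is what makes the streaming-side validation further down cheap.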
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 77d4fcf..b970c95 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.io.util;

 import java.io.*;
 import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;

 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
@@ -60,7 +58,7 @@ public class SequentialWriter extends OutputStream
     private int bytesSinceTrickleFsync = 0;

     public final DataOutputStream stream;
-    private MessageDigest digest;
+    private DataIntegrityMetadata.ChecksumWriter metadata;

     public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
     {
@@ -265,8 +263,8 @@ public class SequentialWriter extends OutputStream
             throw new FSWriteError(e, getPath());
         }

-        if (digest != null)
-            digest.update(buffer, 0, validBufferBytes);
+        if (metadata != null)
+            metadata.append(buffer, 0, validBufferBytes);
     }

     public long getFilePointer()
@@ -392,6 +390,7 @@ public class SequentialWriter extends OutputStream
             throw new FSWriteError(e, getPath());
         }

+        FileUtils.closeQuietly(metadata);
         CLibrary.tryCloseFD(directoryFD);
     }

@@ -400,34 +399,12 @@ public class SequentialWriter extends OutputStream
      * This can only be called before any data is written to this write,
      * otherwise an IllegalStateException is thrown.
      */
-    public void setComputeDigest()
+    public void setDataIntegratyWriter(DataIntegrityMetadata.ChecksumWriter writer)
     {
         if (current != 0)
             throw new IllegalStateException();
-
-        try
-        {
-            digest = MessageDigest.getInstance("SHA-1");
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            // SHA-1 is standard in java 6
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Return the digest associated to this file or null if no digest was
-     * created.
-     * This can only be called once the file is fully created, i.e. after
-     * close() has been called. Otherwise an IllegalStateException is thrown.
-     */
-    public byte[] digest()
-    {
-        if (buffer != null)
-            throw new IllegalStateException();
-
-        return digest == null ? null : digest.digest();
+        metadata = writer;
+        metadata.writeChunkSize(buffer.length);
     }

     /**
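SequentialWriter no longer hashes anything itself: each time its internal buffer is flushed, the buffer is handed to the attached ChecksumWriter, and writeChunkSize(buffer.length) is recorded up front because exactly one CRC is appended per flushed buffer, so the writer's buffer size becomes the chunk size of the CRC component. A generic sketch of that hook shape, with invented names (this is not Cassandra's SequentialWriter):

    import java.io.IOException;
    import java.io.OutputStream;

    /** Illustration of the "one checksum per flushed buffer" hook; not Cassandra's SequentialWriter. */
    public class ChunkingOutputStream extends OutputStream
    {
        /** Callback invoked once per flushed chunk, mirroring ChecksumWriter.append(). */
        public interface ChunkListener
        {
            void onChunk(byte[] buffer, int offset, int length);
        }

        private final OutputStream out;
        private final byte[] buffer;
        private final ChunkListener listener;
        private int valid;                       // bytes currently buffered

        public ChunkingOutputStream(OutputStream out, int chunkSize, ChunkListener listener)
        {
            this.out = out;
            this.buffer = new byte[chunkSize];   // chunk size fixed at creation, like writeChunkSize()
            this.listener = listener;
        }

        @Override
        public void write(int b) throws IOException
        {
            if (valid == buffer.length)
                flushChunk();
            buffer[valid++] = (byte) b;
        }

        private void flushChunk() throws IOException
        {
            if (valid == 0)
                return;
            listener.onChunk(buffer, 0, valid);  // the checksum (and digest) see exactly what hits disk
            out.write(buffer, 0, valid);
            valid = 0;
        }

        @Override
        public void close() throws IOException
        {
            flushChunk();                        // final, possibly short, chunk
            out.close();
        }
    }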
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 67d5c35..8472d54 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -27,6 +27,10 @@ import org.slf4j.LoggerFactory;
 import com.ning.compress.lzf.LZFOutputStream;

 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.StreamingMetrics;
@@ -41,7 +45,7 @@ public class FileStreamTask extends WrappedRunnable
 {
     private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);

-    public static final int CHUNK_SIZE = 64 * 1024;
+    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
     public static final int MAX_CONNECT_ATTEMPTS = 4;

     protected final StreamHeader header;
@@ -54,7 +58,7 @@ public class FileStreamTask extends WrappedRunnable
     private OutputStream compressedoutput;
     private DataInputStream input;
     // allocate buffer to use for transfers only once
-    private final byte[] transferBuffer = new byte[CHUNK_SIZE];
+    private byte[] transferBuffer;
     // outbound global throughput limiter
     protected final Throttle throttle;
     private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
@@ -140,6 +144,11 @@ public class FileStreamTask extends WrappedRunnable
         // try to skip kernel page cache if possible
         RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true);

+        Descriptor desc = Descriptor.fromFilename(header.file.getFilename());
+        ChecksumValidator metadata = null;
+        if (new File(desc.filenameFor(Component.CRC)).exists())
+            metadata = DataIntegrityMetadata.checksumValidator(desc);
+        transferBuffer = metadata == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[metadata.chunkSize];

         // setting up data compression stream
         compressedoutput = new LZFOutputStream(output);
@@ -151,21 +160,26 @@ public class FileStreamTask extends WrappedRunnable
             // stream each of the required sections of the file
             for (Pair<Long, Long> section : header.file.sections)
             {
+                long start = metadata == null ? section.left : metadata.chunkStart(section.left);
+                int skipBytes = (int) (section.left - start);
                 // seek to the beginning of the section
-                file.seek(section.left);
+                file.seek(start);
+                if (metadata != null)
+                    metadata.seek(start);

-                // length of the section to stream
-                long length = section.right - section.left;
+                // length of the section to read
+                long length = section.right - start;
                 // tracks write progress
                 long bytesTransferred = 0;

                 while (bytesTransferred < length)
                 {
-                    long lastWrite = write(file, length, bytesTransferred);
+                    long lastWrite = write(file, metadata, skipBytes, length, bytesTransferred);
                     bytesTransferred += lastWrite;
                     totalBytesTransferred += lastWrite;
                     // store streaming progress
                     header.file.progress += lastWrite;
+                    skipBytes = 0;
                 }

                 // make sure that current section is send
@@ -203,6 +217,8 @@ public class FileStreamTask extends WrappedRunnable
      * Sequentially read bytes from the file and write them to the output stream
      *
      * @param reader The file reader to read from
+     * @param validator validator to verify data integrity
+     * @param start number of bytes to skip transfer, but include for validation.
      * @param length The full length that should be transferred
      * @param bytesTransferred Number of bytes remaining to transfer
      *
@@ -210,12 +226,16 @@ public class FileStreamTask extends WrappedRunnable
      *
      * @throws IOException on any I/O error
      */
-    protected long write(RandomAccessReader reader, long length, long bytesTransferred) throws IOException
+    protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
     {
-        int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+        int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
+        int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());

-        reader.readFully(transferBuffer, 0, toTransfer);
-        compressedoutput.write(transferBuffer, 0, toTransfer);
+        reader.readFully(transferBuffer, 0, minReadable);
+        if (validator != null)
+            validator.validate(transferBuffer, 0, minReadable);
+
+        compressedoutput.write(transferBuffer, start, (toTransfer - start));
         throttle.throttleDelta(toTransfer);

         return toTransfer;
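The interesting part of the streaming change is offset arithmetic: a requested section rarely begins on a chunk boundary, so the sender rewinds to the enclosing chunk start, validates whole chunks from there, and skips the leading skipBytes when writing to the socket; the matching CRC record sits at chunk index times 4 bytes, after the 4-byte chunk-size header, per the seek math above. A small sketch of that arithmetic, assuming the 4-bytes-per-chunk layout described earlier (hypothetical class, not part of the patch):

    /** Offset math for streaming a section that does not start on a chunk boundary (illustrative only). */
    public class ChunkAlignment
    {
        /** First byte of the chunk containing offset, as in ChecksumValidator.chunkStart(). */
        static long chunkStart(long offset, int chunkSize)
        {
            return (offset / chunkSize) * chunkSize;
        }

        /** Position of that chunk's CRC inside the CRC component: 4-byte header + 4 bytes per preceding chunk. */
        static long crcFilePosition(long offset, int chunkSize)
        {
            return (chunkStart(offset, chunkSize) / chunkSize) * 4L + 4;
        }

        public static void main(String[] args)
        {
            int chunkSize = 64 * 1024;
            long sectionLeft = 200_000;                       // hypothetical section start inside the data file
            long start = chunkStart(sectionLeft, chunkSize);  // 196608: rewind to the chunk boundary
            int skipBytes = (int) (sectionLeft - start);      // 3392: validated but not sent to the peer
            System.out.println("read from " + start + ", skip " + skipBytes
                               + ", crc record at " + crcFilePosition(sectionLeft, chunkSize)); // crc record at 16
        }
    }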
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639b314d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 07b19ae..76d66b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -25,6 +25,7 @@ import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.addMutation;

 import java.net.InetAddress;
+import java.sql.Date;
 import java.util.*;

 import org.apache.cassandra.SchemaLoader;
@@ -117,6 +118,11 @@ public class StreamingTransferTest extends SchemaLoader
         List<Range<Token>> ranges = new ArrayList<Range<Token>>();
         ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+        transfer(table, sstable, ranges);
+    }
+
+    private void transfer(Table table, SSTableReader sstable, List<Range<Token>> ranges) throws Exception
+    {
         StreamOutSession session = StreamOutSession.create(table.getName(), LOCAL, (IStreamCallback)null);
         StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
         session.await();
@@ -313,6 +319,42 @@ public class StreamingTransferTest extends SchemaLoader
         }
     }

+    @Test
+    public void testRandomSSTableTransfer() throws Exception
+    {
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Standard1");
+        Mutator mutator = new Mutator()
+        {
+            public void mutate(String key, String colName, long timestamp) throws Exception
+            {
+                RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
+                ColumnFamily cf = ColumnFamily.create(table.getName(), cfs.name);
+                cf.addColumn(column(colName, "value", timestamp));
+                cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(new Date(timestamp).toString()), timestamp));
+                rm.add(cf);
+                logger.debug("Applying row to transfer " + rm);
+                rm.apply();
+            }
+        };
+        // write a lot more data so the data is spread in more than 1 chunk.
+        for (int i = 1; i <= 6000; i++)
+            mutator.mutate("key" + i, "col" + i, System.currentTimeMillis());
+        cfs.forceBlockingFlush();
+        Util.compactAll(cfs).get();
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        cfs.clearUnsafe();
+
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key1000"))));
+        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key5")), p.getToken(ByteBufferUtil.bytes("key500"))));
+        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key9")), p.getToken(ByteBufferUtil.bytes("key900"))));
+        transfer(table, sstable, ranges);
+        assertEquals(1, cfs.getSSTables().size());
+        assertEquals(7, Util.getRangeSlice(cfs).size());
+    }
+
     public interface Mutator
     {
         public void mutate(String key, String col, long timestamp) throws Exception;