Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: CHANGES.txt src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java src/java/org/apache/cassandra/io/compress/CompressionMetadata.java src/java/org/apache/cassandra/io/sstable/SSTableWriter.java src/java/org/apache/cassandra/io/util/SequentialWriter.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55750e07 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55750e07 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55750e07 Branch: refs/heads/cassandra-2.1 Commit: 55750e07d20b76bf2c9f07575bfcb9193734bf24 Parents: edf48f8 3679b1b Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Jan 7 13:05:16 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Jan 7 13:05:16 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../io/compress/CompressedSequentialWriter.java | 6 +++ .../io/compress/CompressionMetadata.java | 9 +++++ .../cassandra/io/sstable/SSTableWriter.java | 21 ++++++----- .../io/util/ChecksummedSequentialWriter.java | 6 +++ .../cassandra/io/util/SequentialWriter.java | 39 ++++++++++++++++---- 6 files changed, 67 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 1c1bfe2,7aad4c0..372972d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,51 -1,5 +1,54 @@@ +2.1.3 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512) + * Properly calculate expected write size during compaction (CASSANDRA-8532) + * Invalidate affected prepared statements when a table's columns + are altered (CASSANDRA-7910) + * Stress - user defined writes should populate sequentally (CASSANDRA-8524) + * Fix regression in SSTableRewriter causing some rows to become unreadable + during compaction (CASSANDRA-8429) + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510) + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression + is disabled (CASSANDRA-8288) + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623) + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463) + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507) + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370) + * Make sstablescrub check leveled manifest again (CASSANDRA-8432) + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458) + * Disable mmap on Windows (CASSANDRA-6993) + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253) + * Add auth support to cassandra-stress (CASSANDRA-7985) + * Fix ArrayIndexOutOfBoundsException when generating error message + for some CQL syntax errors (CASSANDRA-8455) + * Scale memtable slab allocation logarithmically (CASSANDRA-7882) + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964) + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926) + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383) + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459) + * Remove tmplink files for offline compactions (CASSANDRA-8321) + * Reduce maxHintsInProgress (CASSANDRA-8415) + * BTree updates may call provided update function twice (CASSANDRA-8018) + * Release sstable references after anticompaction (CASSANDRA-8386) + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320) + * Fix high size calculations for prepared statements (CASSANDRA-8231) + * Centralize shared executors (CASSANDRA-8055) + * Fix filtering for CONTAINS (KEY) relations on frozen collection + clustering columns when the query is restricted to a single + partition (CASSANDRA-8203) + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243) + * Add more log info if readMeter is null (CASSANDRA-8238) + * add check of the system wall clock time at startup (CASSANDRA-8305) + * Support for frozen collections (CASSANDRA-7859) + * Fix overflow on histogram computation (CASSANDRA-8028) + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801) + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291) + * Improve JBOD disk utilization (CASSANDRA-7386) + * Log failed host when preparing incremental repair (CASSANDRA-8228) + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281) +Merged from 2.0: ++======= + 2.0.12: + * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499) * Increase bf true positive count on key cache hit (CASSANDRA-8525) * Move MeteredFlusher to its own thread (CASSANDRA-8485) * Fix non-distinct results in DISTNCT queries on static columns when http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index d3c41fa,909d822..81bb3e9 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@@ -261,12 -271,12 +261,18 @@@ public class CompressedSequentialWrite } } + public void abort() + { + super.abort(); + metadataWriter.abort(); + } + + @Override + public void writeFullChecksum(Descriptor descriptor) + { + crcMetadata.writeFullChecksum(descriptor); + } + /** * Class to hold a mark to the position of the file */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 173722f,5b0154b..f19d502 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@@ -356,25 -360,38 +356,34 @@@ public class CompressionMetadat */ public void resetAndTruncate(int chunkIndex) { + count = chunkIndex; + } + + public void close(long dataLength, int chunks) throws IOException + { + DataOutputStream out = null; try { - seek(dataLengthOffset - + 8 // size reserved for uncompressed data length - + 4 // size reserved for chunk count - + (chunkIndex * 8L)); - getChannel().truncate(getFilePointer()); + out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath))); + assert chunks == count; + writeHeader(out, dataLength, chunks); + for (int i = 0 ; i < count ; i++) + out.writeLong(offsets.getLong(i * 8)); } - catch (IOException e) + finally { - throw new FSWriteError(e, filePath); + FileUtils.closeQuietly(out); } } + - public void close() throws IOException - { - if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not - getChannel().force(true); - super.close(); - } - + public void abort() + { - try - { - super.close(); - } - catch (Throwable t) ++ if (offsets != null) + { - logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t); ++ offsets.unreference(); ++ offsets = null; + } + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 595012d,08e5527..b0365ad --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@@ -339,23 -313,9 +339,19 @@@ public class SSTableWriter extends SSTa */ public void abort() { - assert descriptor.temporary; - iwriter.abort(); - dataFile.abort(); + abort(true); + } + public void abort(boolean closeBf) + { + assert descriptor.type.isTemporary; + if (iwriter == null && dataFile == null) + return; ++ + if (iwriter != null) - { - FileUtils.closeQuietly(iwriter.indexFile); - if (closeBf) - { - iwriter.bf.close(); - } - } ++ iwriter.abort(closeBf); ++ + if (dataFile!= null) - FileUtils.closeQuietly(dataFile); ++ dataFile.abort(); Set<Component> components = SSTable.componentsFor(descriptor); try @@@ -589,7 -431,7 +585,7 @@@ /** * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. */ -- class IndexWriter implements Closeable ++ class IndexWriter { private final SequentialWriter indexFile; public final SegmentedFile.Builder builder; @@@ -633,6 -469,6 +629,13 @@@ builder.addPotentialBoundary(indexPosition); } ++ public void abort(boolean closeBf) ++ { ++ indexFile.abort(); ++ if (closeBf) ++ bf.close(); ++ } ++ /** * Closes the index and bloomfilter, making the public state of this writer valid for consumption. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java index b95bf32,0000000..f4281b2 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java @@@ -1,53 -1,0 +1,59 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.File; + +import org.apache.cassandra.io.sstable.Descriptor; + +public class ChecksummedSequentialWriter extends SequentialWriter +{ + private final SequentialWriter crcWriter; + private final DataIntegrityMetadata.ChecksumWriter crcMetadata; + + public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath) + { + super(file, bufferSize); + crcWriter = new SequentialWriter(crcPath, 8 * 1024); + crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream); + crcMetadata.writeChunkSize(buffer.length); + } + + protected void flushData() + { + super.flushData(); + crcMetadata.append(buffer, 0, validBufferBytes); + } + + public void writeFullChecksum(Descriptor descriptor) + { + crcMetadata.writeFullChecksum(descriptor); + } + + public void close() + { + super.close(); + crcWriter.close(); + } ++ ++ public void abort() ++ { ++ super.abort(); ++ crcWriter.abort(); ++ } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java index 7a7eb63,b980cf1..227c79d --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@@ -18,10 -18,11 +18,13 @@@ package org.apache.cassandra.io.util; import java.io.*; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.nio.channels.WritableByteChannel; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; @@@ -36,17 -32,23 +39,19 @@@ import org.apache.cassandra.utils.CLibr * Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also * fsync incrementally if Config.trickle_fsync is enabled. */ -public class SequentialWriter extends OutputStream +public class SequentialWriter extends OutputStream implements WritableByteChannel { + private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class); + // isDirty - true if this.buffer contains any un-synced bytes protected boolean isDirty = false, syncNeeded = false; // absolute path to the given file private final String filePath; - // so we can use the write(int) path w/o tons of new byte[] allocations - private final byte[] singleByteBuffer = new byte[1]; - protected byte[] buffer; - private final boolean skipIOCache; private final int fd; -- private final int directoryFD; ++ private int directoryFD; // directory should be synced only after first file sync, in other words, only once per file private boolean directorySynced = false; @@@ -437,21 -387,47 +442,39 @@@ buffer = null; - try - { - out.close(); - } - catch (IOException e) - if (skipIOCache && bytesSinceCacheFlush > 0) - CLibrary.trySkipCache(fd, 0, 0); - + cleanup(true); + } + + public void abort() + { + cleanup(false); + } + + private void cleanup(boolean throwExceptions) + { - FileUtils.closeQuietly(metadata); - - try { CLibrary.tryCloseFD(directoryFD); } - catch (Throwable t) { handle(t, throwExceptions); } ++ if (directoryFD >= 0) + { - throw new FSWriteError(e, getPath()); ++ try { CLibrary.tryCloseFD(directoryFD); } ++ catch (Throwable t) { handle(t, throwExceptions); } ++ directoryFD = -1; + } - CLibrary.tryCloseFD(directoryFD); ++ // close is idempotent + try { out.close(); } + catch (Throwable t) { handle(t, throwExceptions); } + } + + private void handle(Throwable t, boolean throwExceptions) + { + if (!throwExceptions) + logger.warn("Suppressing exception thrown while aborting writer", t); + else + throw new FSWriteError(t, getPath()); } - /** - * Turn on digest computation on this writer. - * This can only be called before any data is written to this write, - * otherwise an IllegalStateException is thrown. - */ - public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer) + // hack to make life easier for subclasses + public void writeFullChecksum(Descriptor descriptor) { - if (current != 0) - throw new IllegalStateException(); - metadata = writer; - metadata.writeChunkSize(buffer.length); } /**