Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0dc8440e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0dc8440e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0dc8440e Branch: refs/heads/trunk Commit: 0dc8440efb90a71d3a91374c682dfc4bfc61c672 Parents: a07d7ac c39a9b0 Author: Yuki Morishita <yu...@apache.org> Authored: Thu Nov 10 09:34:28 2016 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Nov 10 09:34:28 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/io/compress/CompressedSequentialWriter.java | 2 +- src/java/org/apache/cassandra/io/util/CompressedChunkReader.java | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dc8440e/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index dd9088b,9598546..e765a9c --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,111 -1,14 +1,112 @@@ -3.0.11 +3.10 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) + * Fix cassandra-stress truncate option (CASSANDRA-12695) + * Fix crossNode value when receiving messages (CASSANDRA-12791) + * Don't load MX4J beans twice (CASSANDRA-12869) + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838) + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845) + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454) + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777) + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) + * Fix CQLSH auto completion for PER PARTITION LIMIT 
(CASSANDRA-12803) + * Use different build directories for Eclipse and Ant (CASSANDRA-12466) + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815) + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812) + * Upgrade commons-codec to 1.9 (CASSANDRA-12790) + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) + * Add duration data type (CASSANDRA-11873) + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) + * Improve sum aggregate functions (CASSANDRA-12417) + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) + * cqlsh fails to format collections when using aliases (CASSANDRA-11534) + * Check for hash conflicts in prepared statements (CASSANDRA-12733) + * Exit query parsing upon first error (CASSANDRA-12598) + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729) + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450) + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199) + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461) + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode 
allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Add row offset support to SASI (CASSANDRA-11990) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) (CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar (CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable 
memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * 
Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) + * Properly report LWT contention (CASSANDRA-12626) +Merged from 3.0: + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889) - * Fix partition count log during compaction (CASSANDRA-12184) - - -3.0.10 * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867) * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788) - * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854) * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296) - * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689) + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854) + * Avoid deadlock due to MV lock contention (CASSANDRA-12689) * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801) * Include SSTable filename in compacting large row message (CASSANDRA-12384) * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dc8440e/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dc8440e/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/CompressedChunkReader.java index 5f8751a,0000000..8f00ce7 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java @@@ -1,227 -1,0 +1,227 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.compress.CorruptBlockException; +import org.apache.cassandra.io.sstable.CorruptSSTableException; + +public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader +{ + final CompressionMetadata metadata; + + protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata) + { + super(channel, metadata.dataLength); + this.metadata = metadata; + assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two + } + + @VisibleForTesting + public double getCrcCheckChance() + { + return metadata.parameters.getCrcCheckChance(); + } + + @Override + public String toString() + { + return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)", + getClass().getSimpleName(), + channel.filePath(), + metadata.compressor().getClass().getSimpleName(), + metadata.chunkLength(), + metadata.dataLength); + 
} + + @Override + public int chunkSize() + { + return metadata.chunkLength(); + } + + @Override + public BufferType preferredBufferType() + { + return metadata.compressor().preferredBufferType(); + } + + @Override + public Rebufferer instantiateRebufferer() + { + return new BufferManagingRebufferer.Aligned(this); + } + + public static class Standard extends CompressedChunkReader + { + // we read the raw compressed bytes into this buffer, then uncompress them into the provided one. + private final ThreadLocal<ByteBuffer> compressedHolder; + + public Standard(ChannelProxy channel, CompressionMetadata metadata) + { + super(channel, metadata); + compressedHolder = ThreadLocal.withInitial(this::allocateBuffer); + } + + public ByteBuffer allocateBuffer() + { + return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())); + } + + public ByteBuffer allocateBuffer(int size) + { + return metadata.compressor().preferredBufferType().allocate(size); + } + + @Override + public void readChunk(long position, ByteBuffer uncompressed) + { + try + { + // accesses must always be aligned + assert (position & -uncompressed.capacity()) == position; + assert position <= fileLength; + + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + ByteBuffer compressed = compressedHolder.get(); + + if (compressed.capacity() < chunk.length) + { + compressed = allocateBuffer(chunk.length); + compressedHolder.set(compressed); + } + else + { + compressed.clear(); + } + + compressed.limit(chunk.length); + if (channel.read(compressed, chunk.offset) != chunk.length) + throw new CorruptBlockException(channel.filePath(), chunk); + + compressed.flip(); + uncompressed.clear(); + + try + { + metadata.compressor().uncompress(compressed, uncompressed); + } + catch (IOException e) + { - throw new CorruptBlockException(channel.filePath(), chunk); ++ throw new CorruptBlockException(channel.filePath(), chunk, e); + } + finally + { + uncompressed.flip(); + } + + if
(getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + { + compressed.rewind(); + int checksum = (int) metadata.checksumType.of(compressed); + + compressed.clear().limit(Integer.BYTES); + if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES + || compressed.getInt(0) != checksum) + throw new CorruptBlockException(channel.filePath(), chunk); + } + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, channel.filePath()); + } + } + } + + public static class Mmap extends CompressedChunkReader + { + protected final MmappedRegions regions; + + public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions) + { + super(channel, metadata); + this.regions = regions; + } + + @Override + public void readChunk(long position, ByteBuffer uncompressed) + { + try + { + // accesses must always be aligned + assert (position & -uncompressed.capacity()) == position; + assert position <= fileLength; + + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + + MmappedRegions.Region region = regions.floor(chunk.offset); + long segmentOffset = region.offset(); + int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset); + ByteBuffer compressedChunk = region.buffer(); + + compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); + + uncompressed.clear(); + + try + { + metadata.compressor().uncompress(compressedChunk, uncompressed); + } + catch (IOException e) + { - throw new CorruptBlockException(channel.filePath(), chunk); ++ throw new CorruptBlockException(channel.filePath(), chunk, e); + } + finally + { + uncompressed.flip(); + } + + if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + { + compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); + + int checksum = (int) metadata.checksumType.of(compressedChunk); + + compressedChunk.limit(compressedChunk.capacity()); + if (compressedChunk.getInt() != checksum) + throw new 
CorruptBlockException(channel.filePath(), chunk); + } + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, channel.filePath()); + } + + } + + public void close() + { + regions.closeQuietly(); + super.close(); + } + } +}