Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e74f014 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e74f014 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e74f014 Branch: refs/heads/trunk Commit: 4e74f01488e03d85516b68514388c32d3c78965c Parents: c169d49 b885e9c Author: Jeff Jirsa <jji...@apple.com> Authored: Wed Dec 6 21:56:22 2017 -0800 Committer: Jeff Jirsa <jji...@apple.com> Committed: Wed Dec 6 21:56:47 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/io/util/CompressedChunkReader.java | 6 ++++-- .../cassandra/streaming/compress/CompressedInputStream.java | 3 ++- 3 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e74f014/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 3c6565c,b275397..1a1a2cf --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,15 -1,7 +1,16 @@@ +3.11.2 + * Remove OpenJDK log warning (CASSANDRA-13916) + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079) + * Cache disk boundaries (CASSANDRA-13215) + * Add asm jar to build.xml for maven builds (CASSANDRA-11193) + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897) + * Update jackson JSON jars (CASSANDRA-13949) + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930) +Merged from 3.0: 3.0.16 + * Optimize CRC check chance probability calculations (CASSANDRA-14094) * Fix cleanup on keyspace with no replicas (CASSANDRA-13526) - * Fix updating base table rows with TTL not removing materialized view entries (CASSANDRA-14071) + * Fix updating base table rows with TTL not removing view entries (CASSANDRA-14071) * Reduce garbage created by DynamicSnitch (CASSANDRA-14091) * More frequent commitlog chained markers (CASSANDRA-13987) * Fix serialized size of DataLimits (CASSANDRA-14057) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e74f014/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/CompressedChunkReader.java index 8f00ce7,0000000..0919c29 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java @@@ -1,227 -1,0 +1,229 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.compress.CorruptBlockException; +import org.apache.cassandra.io.sstable.CorruptSSTableException; + +public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader +{ + final CompressionMetadata metadata; + + protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata) + { + super(channel, metadata.dataLength); + this.metadata = metadata; + assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two + } + + @VisibleForTesting + public double getCrcCheckChance() + { + return metadata.parameters.getCrcCheckChance(); + } + + @Override + public String toString() + { + return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)", + getClass().getSimpleName(), + channel.filePath(), + metadata.compressor().getClass().getSimpleName(), + metadata.chunkLength(), + metadata.dataLength); + } + + @Override + public int chunkSize() + { + return metadata.chunkLength(); + } + + @Override + public BufferType preferredBufferType() + { + return metadata.compressor().preferredBufferType(); + } + + @Override + public Rebufferer instantiateRebufferer() + { + return new BufferManagingRebufferer.Aligned(this); + } + + public static class Standard extends CompressedChunkReader + { + // we read the raw compressed bytes into this buffer, then uncompressed them into the provided one. + private final ThreadLocal<ByteBuffer> compressedHolder; + + public Standard(ChannelProxy channel, CompressionMetadata metadata) + { + super(channel, metadata); + compressedHolder = ThreadLocal.withInitial(this::allocateBuffer); + } + + public ByteBuffer allocateBuffer() + { + return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())); + } + + public ByteBuffer allocateBuffer(int size) + { + return metadata.compressor().preferredBufferType().allocate(size); + } + + @Override + public void readChunk(long position, ByteBuffer uncompressed) + { + try + { + // accesses must always be aligned + assert (position & -uncompressed.capacity()) == position; + assert position <= fileLength; + + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + ByteBuffer compressed = compressedHolder.get(); + + if (compressed.capacity() < chunk.length) + { + compressed = allocateBuffer(chunk.length); + compressedHolder.set(compressed); + } + else + { + compressed.clear(); + } + + compressed.limit(chunk.length); + if (channel.read(compressed, chunk.offset) != chunk.length) + throw new CorruptBlockException(channel.filePath(), chunk); + + compressed.flip(); + uncompressed.clear(); + + try + { + metadata.compressor().uncompress(compressed, uncompressed); + } + catch (IOException e) + { + throw new CorruptBlockException(channel.filePath(), chunk, e); + } + finally + { + uncompressed.flip(); + } + - if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) ++ if (getCrcCheckChance() >= 1d || ++ getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + { + compressed.rewind(); + int checksum = (int) metadata.checksumType.of(compressed); + + compressed.clear().limit(Integer.BYTES); + if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES + || compressed.getInt(0) != checksum) + throw new CorruptBlockException(channel.filePath(), chunk); + } + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, channel.filePath()); + } + } + } + + public static class Mmap extends CompressedChunkReader + { + protected final MmappedRegions regions; + + public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions) + { + super(channel, metadata); + this.regions = regions; + } + + @Override + public void readChunk(long position, ByteBuffer uncompressed) + { + try + { + // accesses must always be aligned + assert (position & -uncompressed.capacity()) == position; + assert position <= fileLength; + + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + + MmappedRegions.Region region = regions.floor(chunk.offset); + long segmentOffset = region.offset(); + int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset); + ByteBuffer compressedChunk = region.buffer(); + + compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); + + uncompressed.clear(); + + try + { + metadata.compressor().uncompress(compressedChunk, uncompressed); + } + catch (IOException e) + { + throw new CorruptBlockException(channel.filePath(), chunk, e); + } + finally + { + uncompressed.flip(); + } + - if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) ++ if (getCrcCheckChance() >= 1d || ++ getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + { + compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); + + int checksum = (int) metadata.checksumType.of(compressedChunk); + + compressedChunk.limit(compressedChunk.capacity()); + if (compressedChunk.getInt() != checksum) + throw new CorruptBlockException(channel.filePath(), chunk); + } + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, channel.filePath()); + } + + } + + public void close() + { + regions.closeQuietly(); + super.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e74f014/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 81abefa,e3d698e..8a32d7a --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@@ -164,13 -136,17 +164,14 @@@ public class CompressedInputStream exte totalCompressedBytesRead += compressed.length; // validate crc randomly - if (this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) + if (this.crcCheckChanceSupplier.get() >= 1d || + this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) { - checksum.update(compressed, 0, compressed.length - checksumBytes.length); + int checksum = (int) checksumType.of(compressed, 0, compressed.length - checksumBytes.length); System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length); - if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue()) + if (Ints.fromByteArray(checksumBytes) != checksum) throw new IOException("CRC unmatched"); - - // reset checksum object back to the original (blank) state - checksum.reset(); } // buffer offset is always aligned --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org