Merge branch cassandra-2.1 into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3ac7937 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3ac7937 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3ac7937 Branch: refs/heads/trunk Commit: b3ac7937edce41a341d1d01c7f3201592e1caa8f Parents: 2e5e11d 34a1d5d Author: Benjamin Lerer <b.le...@gmail.com> Authored: Tue Apr 10 09:51:02 2018 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Tue Apr 10 09:52:18 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compress/CompressedRandomAccessReader.java | 52 ++++++++++---------- 2 files changed, 27 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ac7937/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 527975c,aeb3009..5221b1e --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,16 -1,8 +1,17 @@@ -2.1.21 +2.2.13 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891) + * Fix query pager DEBUG log leak causing hit in paged reads throughput (CASSANDRA-14318) + * Backport circleci yaml (CASSANDRA-14240) +Merged from 2.1: + * Check checksum before decompressing data (CASSANDRA-14284) * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183) -2.1.20 +2.2.12 + * Fix the inspectJvmOptions startup check (CASSANDRA-14112) + * Fix race that prevents submitting compaction for a table when executor is full (CASSANDRA-13801) + * Rely on the JVM to handle OutOfMemoryErrors (CASSANDRA-13006) + * Grab refs during scrub/index redistribution/cleanup (CASSANDRA-13873) +Merged from 2.1: * Protect against overflow of local expiration time (CASSANDRA-14092) * More PEP8 compliance for cqlsh (CASSANDRA-14021) * RPM package spec: fix permissions for installed jars and config files (CASSANDRA-14181) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ac7937/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index ccfa5e7,fe90cc9..0fc96ed --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@@ -99,54 -77,7 +99,54 @@@ public class CompressedRandomAccessRead { try { - decompressChunk(metadata.chunkFor(current)); + long position = current(); + assert position < metadata.dataLength; + + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + + if (compressed.capacity() < chunk.length) + compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType()); + else + compressed.clear(); + compressed.limit(chunk.length); + + if (channel.read(compressed, chunk.offset) != chunk.length) + throw new CorruptBlockException(getPath(), chunk); + compressed.flip(); + buffer.clear(); + ++ if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) ++ { ++ FBUtilities.directCheckSum(checksum, compressed); ++ ++ if (checksum(chunk) != (int) checksum.getValue()) ++ throw new CorruptBlockException(getPath(), chunk); ++ ++ // reset checksum object back to the original (blank) state ++ checksum.reset(); ++ compressed.rewind(); ++ } ++ + try + { + metadata.compressor().uncompress(compressed, buffer); + } + catch (IOException e) + { + throw new CorruptBlockException(getPath(), chunk); + } + finally + { + buffer.flip(); + } + - if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) - { - compressed.rewind(); - FBUtilities.directCheckSum(checksum, compressed); - - if (checksum(chunk) != (int) checksum.getValue()) - throw new CorruptBlockException(getPath(), chunk); - - // reset checksum object back to the original (blank) state - checksum.reset(); - } - + // buffer offset is always aligned + bufferOffset = position & ~(buffer.capacity() - 1); + buffer.position((int) (position - bufferOffset)); + // the length() can be provided at construction time, to override the true (uncompressed) length of the file; + // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length + if (bufferOffset + buffer.limit() > length()) + buffer.limit((int)(length() - bufferOffset)); } catch (CorruptBlockException e) { @@@ -158,76 -89,58 +158,76 @@@ } } - private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException + private void reBufferMmap() { - if (channel.position() != chunk.offset) - channel.position(chunk.offset); + try + { + long position = current(); + assert position < metadata.dataLength; - if (compressed.capacity() < chunk.length) - compressed = ByteBuffer.wrap(new byte[chunk.length]); - else - compressed.clear(); - compressed.limit(chunk.length); + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); - if (channel.read(compressed) != chunk.length) - throw new CorruptBlockException(getPath(), chunk); + Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset); + long segmentOffset = entry.getKey(); + int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset); + ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java - // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes - // in the future this will save a lot of hair-pulling - compressed.flip(); + compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); - // If the checksum is on compressed data we want to check it before uncompressing the data - if (metadata.hasPostCompressionAdlerChecksums) - checkChecksumIfNeeded(chunk, compressed.array(), chunk.length); + buffer.clear(); - try - { - validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0); ++ if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) ++ { ++ FBUtilities.directCheckSum(checksum, compressedChunk); ++ ++ compressedChunk.limit(compressedChunk.capacity()); ++ if (compressedChunk.getInt() != (int) checksum.getValue()) ++ throw new CorruptBlockException(getPath(), chunk); ++ ++ // reset checksum object back to the original (blank) state ++ checksum.reset(); ++ ++ compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); ++ } ++ + try + { + metadata.compressor().uncompress(compressedChunk, buffer); + } + catch (IOException e) + { + throw new CorruptBlockException(getPath(), chunk); + } + finally + { + buffer.flip(); + } + - if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) - { - compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); - - FBUtilities.directCheckSum(checksum, compressedChunk); - - compressedChunk.limit(compressedChunk.capacity()); - if (compressedChunk.getInt() != (int) checksum.getValue()) - throw new CorruptBlockException(getPath(), chunk); - - // reset checksum object back to the original (blank) state - checksum.reset(); - } - + // buffer offset is always aligned + bufferOffset = position & ~(buffer.capacity() - 1); + buffer.position((int) (position - bufferOffset)); + // the length() can be provided at construction time, to override the true (uncompressed) length of the file; + // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length + if (bufferOffset + buffer.limit() > length()) + buffer.limit((int)(length() - bufferOffset)); } - catch (IOException e) + catch (CorruptBlockException e) { - throw new CorruptBlockException(getPath(), chunk, e); + throw new CorruptSSTableException(e, getPath()); } - if (!metadata.hasPostCompressionAdlerChecksums) - checkChecksumIfNeeded(chunk, buffer, validBufferBytes); - - - // buffer offset is always aligned - bufferOffset = current & ~(buffer.length - 1); - // the length() can be provided at construction time, to override the true (uncompressed) length of the file; - // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length - if (bufferOffset + validBufferBytes > length()) - validBufferBytes = (int)(length() - bufferOffset); } - private void checkChecksumIfNeeded(CompressionMetadata.Chunk chunk, byte[] bytes, int length) throws IOException + @Override + protected void reBuffer() { - if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + if (chunkSegments != null) { - checksum.update(bytes, 0, length); - if (checksum(chunk) != (int) checksum.getValue()) - throw new CorruptBlockException(getPath(), chunk); - // reset checksum object back to the original (blank) state - checksum.reset(); + reBufferMmap(); + } + else + { + reBufferStandard(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org