Merge branch cassandra-3.0 into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1020d62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1020d62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1020d62

Branch: refs/heads/trunk
Commit: c1020d62ed05f7fa5735af6f09915cdc6850dbeb
Parents: b3e9908 73ca0e1
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Tue Apr 10 10:02:36 2018 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Tue Apr 10 10:03:32 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/util/CompressedChunkReader.java          | 65 +++++++++++---------
 2 files changed, 38 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1020d62/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1020d62/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
index 0919c29,0000000..177afb0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@@ -1,229 -1,0 +1,238 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.io.util;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.concurrent.ThreadLocalRandom;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.primitives.Ints;
 +
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.compress.CorruptBlockException;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +
 +public abstract class CompressedChunkReader extends AbstractReaderFileProxy 
implements ChunkReader
 +{
 +    final CompressionMetadata metadata;
 +
 +    protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata 
metadata)
 +    {
 +        super(channel, metadata.dataLength);
 +        this.metadata = metadata;
 +        assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a 
power of two
 +    }
 +
 +    @VisibleForTesting
 +    public double getCrcCheckChance()
 +    {
 +        return metadata.parameters.getCrcCheckChance();
 +    }
 +
++    protected final boolean shouldCheckCrc()
++    {
++        return getCrcCheckChance() >= 1d || getCrcCheckChance() > 
ThreadLocalRandom.current().nextDouble();
++    }
++
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("CompressedChunkReader.%s(%s - %s, chunk length 
%d, data length %d)",
 +                             getClass().getSimpleName(),
 +                             channel.filePath(),
 +                             metadata.compressor().getClass().getSimpleName(),
 +                             metadata.chunkLength(),
 +                             metadata.dataLength);
 +    }
 +
 +    @Override
 +    public int chunkSize()
 +    {
 +        return metadata.chunkLength();
 +    }
 +
 +    @Override
 +    public BufferType preferredBufferType()
 +    {
 +        return metadata.compressor().preferredBufferType();
 +    }
 +
 +    @Override
 +    public Rebufferer instantiateRebufferer()
 +    {
 +        return new BufferManagingRebufferer.Aligned(this);
 +    }
 +
 +    public static class Standard extends CompressedChunkReader
 +    {
 +        // we read the raw compressed bytes into this buffer, then 
uncompressed them into the provided one.
 +        private final ThreadLocal<ByteBuffer> compressedHolder;
 +
 +        public Standard(ChannelProxy channel, CompressionMetadata metadata)
 +        {
 +            super(channel, metadata);
 +            compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
 +        }
 +
 +        public ByteBuffer allocateBuffer()
 +        {
 +            return 
allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
 +        }
 +
 +        public ByteBuffer allocateBuffer(int size)
 +        {
 +            return metadata.compressor().preferredBufferType().allocate(size);
 +        }
 +
 +        @Override
 +        public void readChunk(long position, ByteBuffer uncompressed)
 +        {
 +            try
 +            {
 +                // accesses must always be aligned
 +                assert (position & -uncompressed.capacity()) == position;
 +                assert position <= fileLength;
 +
 +                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +                ByteBuffer compressed = compressedHolder.get();
 +
-                 if (compressed.capacity() < chunk.length)
++                boolean shouldCheckCrc = shouldCheckCrc();
++
++                int length = shouldCheckCrc ? chunk.length + Integer.BYTES : 
chunk.length;
++
++                if (compressed.capacity() < length)
 +                {
-                     compressed = allocateBuffer(chunk.length);
++                    compressed = allocateBuffer(length);
 +                    compressedHolder.set(compressed);
 +                }
 +                else
 +                {
 +                    compressed.clear();
 +                }
 +
-                 compressed.limit(chunk.length);
-                 if (channel.read(compressed, chunk.offset) != chunk.length)
++                compressed.limit(length);
++                if (channel.read(compressed, chunk.offset) != length)
 +                    throw new CorruptBlockException(channel.filePath(), 
chunk);
 +
 +                compressed.flip();
 +                uncompressed.clear();
 +
++                compressed.position(0).limit(chunk.length);
++
++                if (shouldCheckCrc)
++                {
++                    int checksum = (int) metadata.checksumType.of(compressed);
++
++                    compressed.limit(length);
++                    if (compressed.getInt() != checksum)
++                        throw new CorruptBlockException(channel.filePath(), 
chunk);
++
++                    compressed.position(0).limit(chunk.length);
++                }
++
 +                try
 +                {
 +                    metadata.compressor().uncompress(compressed, 
uncompressed);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new CorruptBlockException(channel.filePath(), 
chunk, e);
 +                }
 +                finally
 +                {
 +                    uncompressed.flip();
 +                }
- 
-                 if (getCrcCheckChance() >= 1d ||
-                     getCrcCheckChance() > 
ThreadLocalRandom.current().nextDouble())
-                 {
-                     compressed.rewind();
-                     int checksum = (int) metadata.checksumType.of(compressed);
- 
-                     compressed.clear().limit(Integer.BYTES);
-                     if (channel.read(compressed, chunk.offset + chunk.length) 
!= Integer.BYTES
-                                 || compressed.getInt(0) != checksum)
-                         throw new CorruptBlockException(channel.filePath(), 
chunk);
-                 }
 +            }
 +            catch (CorruptBlockException e)
 +            {
 +                throw new CorruptSSTableException(e, channel.filePath());
 +            }
 +        }
 +    }
 +
 +    public static class Mmap extends CompressedChunkReader
 +    {
 +        protected final MmappedRegions regions;
 +
 +        public Mmap(ChannelProxy channel, CompressionMetadata metadata, 
MmappedRegions regions)
 +        {
 +            super(channel, metadata);
 +            this.regions = regions;
 +        }
 +
 +        @Override
 +        public void readChunk(long position, ByteBuffer uncompressed)
 +        {
 +            try
 +            {
 +                // accesses must always be aligned
 +                assert (position & -uncompressed.capacity()) == position;
 +                assert position <= fileLength;
 +
 +                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +                MmappedRegions.Region region = regions.floor(chunk.offset);
 +                long segmentOffset = region.offset();
 +                int chunkOffset = Ints.checkedCast(chunk.offset - 
segmentOffset);
 +                ByteBuffer compressedChunk = region.buffer();
 +
 +                compressedChunk.position(chunkOffset).limit(chunkOffset + 
chunk.length);
 +
 +                uncompressed.clear();
 +
++                if (shouldCheckCrc())
++                {
++                    int checksum = (int) 
metadata.checksumType.of(compressedChunk);
++
++                    compressedChunk.limit(compressedChunk.capacity());
++                    if (compressedChunk.getInt() != checksum)
++                        throw new CorruptBlockException(channel.filePath(), 
chunk);
++
++                    compressedChunk.position(chunkOffset).limit(chunkOffset + 
chunk.length);
++                }
++
 +                try
 +                {
 +                    metadata.compressor().uncompress(compressedChunk, 
uncompressed);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new CorruptBlockException(channel.filePath(), 
chunk, e);
 +                }
 +                finally
 +                {
 +                    uncompressed.flip();
 +                }
- 
-                 if (getCrcCheckChance() >= 1d ||
-                     getCrcCheckChance() > 
ThreadLocalRandom.current().nextDouble())
-                 {
-                     compressedChunk.position(chunkOffset).limit(chunkOffset + 
chunk.length);
- 
-                     int checksum = (int) 
metadata.checksumType.of(compressedChunk);
- 
-                     compressedChunk.limit(compressedChunk.capacity());
-                     if (compressedChunk.getInt() != checksum)
-                         throw new CorruptBlockException(channel.filePath(), 
chunk);
-                 }
 +            }
 +            catch (CorruptBlockException e)
 +            {
 +                throw new CorruptSSTableException(e, channel.filePath());
 +            }
 +
 +        }
 +
 +        public void close()
 +        {
 +            regions.closeQuietly();
 +            super.close();
 +        }
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to