Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/afe3fe3d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/afe3fe3d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/afe3fe3d

Branch: refs/heads/cassandra-3.1
Commit: afe3fe3df98f3439bca05cbe132a0e97b66945e4
Parents: a68f8bd 9b97766
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Nov 17 18:56:02 2015 +0000
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Nov 17 18:56:02 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                                   | 1 +
 src/java/org/apache/cassandra/io/util/RandomAccessReader.java | 7 ++++---
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/afe3fe3d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fb8f89a,08db386..4cb9275
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Make buffered read size configurable (CASSANDRA-10249)
   * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879)
   * Reject incremental repair with subrange repair (CASSANDRA-10422)
   * Add a nodetool command to refresh size_estimates (CASSANDRA-9579)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/afe3fe3d/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 278f55c,d15fe46..751269b
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@@ -23,22 -24,31 +23,23 @@@ import java.nio.ByteBuffer
  import com.google.common.annotations.VisibleForTesting;
  
  import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  
 -public class RandomAccessReader extends RandomAccessFile implements FileDataInput
 +public class RandomAccessReader extends AbstractDataInput implements FileDataInput
  {
 -    public static final long CACHE_FLUSH_INTERVAL_IN_BYTES = (long) Math.pow(2, 27); // 128mb
 -
      // default buffer size, 64Kb
      public static final int DEFAULT_BUFFER_SIZE = 65536;
+     public static final int BUFFER_SIZE = Integer.getInteger("cassandra.rar_buffer_size", DEFAULT_BUFFER_SIZE);
  
 -    // absolute filesystem path to the file
 -    private final String filePath;
 -
      // buffer which will cache file blocks
 -    protected byte[] buffer;
 +    protected ByteBuffer buffer;
  
 -    // `current` as current position in file
      // `bufferOffset` is the offset of the beginning of the buffer
      // `markedPointer` folds the offset of the last file mark
 -    protected long bufferOffset, current = 0, markedPointer;
 -    // `validBufferBytes` is the number of bytes in the buffer that are actually valid;
 -    //  this will be LESS than buffer capacity if buffer is not full!
 -    protected int validBufferBytes = 0;
 +    protected long bufferOffset, markedPointer;
  
 -    // channel liked with the file, used to retrieve data and force updates.
 -    protected final FileChannel channel;
 +    protected final ChannelProxy channel;
  
      // this can be overridden at construction to a value shorter than the true length of the file;
      // if so, it acts as an imposed limit on reads, rather than a convenience property
@@@ -55,40 -73,39 +56,40 @@@
          if (bufferSize <= 0)
              throw new IllegalArgumentException("bufferSize must be positive");
  
 -        buffer = new byte[bufferSize];
 -
          // we can cache file length in read-only mode
 -        long fileLength = overrideLength;
 -        if (fileLength <= 0)
 -        {
 -            try
 -            {
 -                fileLength = channel.size();
 -            }
 -            catch (IOException e)
 -            {
 -                throw new FSReadError(e, filePath);
 -            }
 -        }
 +        fileLength = overrideLength <= 0 ? channel.size() : overrideLength;
 +
 +        buffer = allocateBuffer(bufferSize, bufferType);
 +        buffer.limit(0);
 +    }
  
 -        this.fileLength = fileLength;
 -        validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
 +    protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType)
 +    {
 +        int size = (int) Math.min(fileLength, bufferSize);
 +        return bufferType.allocate(size);
      }
  
 -    public static RandomAccessReader open(File file, long overrideSize, PoolingSegmentedFile owner)
 +    public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner)
      {
-         return open(channel, DEFAULT_BUFFER_SIZE, overrideSize, owner);
 -        return open(file, BUFFER_SIZE, overrideSize, owner);
++        return open(channel, BUFFER_SIZE, overrideSize, owner);
      }
  
      public static RandomAccessReader open(File file)
      {
 -        return open(file, -1L);
 +        try (ChannelProxy channel = new ChannelProxy(file))
 +        {
 +            return open(channel);
 +        }
      }
  
 -    public static RandomAccessReader open(File file, long overrideSize)
 +    public static RandomAccessReader open(ChannelProxy channel)
      {
 -        return open(file, BUFFER_SIZE, overrideSize, null);
 +        return open(channel, -1L);
 +    }
 +
 +    public static RandomAccessReader open(ChannelProxy channel, long overrideSize)
 +    {
-         return open(channel, DEFAULT_BUFFER_SIZE, overrideSize, null);
++        return open(channel, BUFFER_SIZE, overrideSize, null);
      }
  
      @VisibleForTesting
@@@ -105,15 -129,7 +106,15 @@@
      @VisibleForTesting
      static RandomAccessReader open(SequentialWriter writer)
      {
 -        return open(new File(writer.getPath()), BUFFER_SIZE, null);
 +        try (ChannelProxy channel = new ChannelProxy(writer.getPath()))
 +        {
-             return open(channel, DEFAULT_BUFFER_SIZE, null);
++            return open(channel, BUFFER_SIZE, null);
 +        }
 +    }
 +
 +    public ChannelProxy getChannel()
 +    {
 +        return channel;
      }
  
      /**

Reply via email to