Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/361c4e4c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/361c4e4c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/361c4e4c Branch: refs/heads/trunk Commit: 361c4e4c7000bcfded4ea81f6ebd59eeed42ab31 Parents: d572ab0 ffd10a9 Author: Robert Stupp <sn...@snazy.de> Authored: Tue May 24 09:42:43 2016 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Tue May 24 09:48:03 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/io/util/NIODataInputStream.java | 12 ++++-------- 2 files changed, 5 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/361c4e4c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 69e8c5d,d7ca9e5..ac5321d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,38 -1,8 +1,39 @@@ -2.2.7 +3.0.7 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) - * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) - * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395) +Merged from 2.1: + * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) + + +3.0.6 + * Disallow creating view with a static column (CASSANDRA-11602) + * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593) + * Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618) + * Fix queries with filtering on counter columns (CASSANDRA-11629) + * Improve tombstone printing in sstabledump (CASSANDRA-11655) + * Fix paging for range queries where all clustering columns are specified (CASSANDRA-11669) + * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600) + * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654) + * Ignore all LocalStrategy keyspaces for streaming and other related + operations (CASSANDRA-11627) + * Ensure columnfilter covers indexed columns for thrift 2i queries (CASSANDRA-11523) + * Only open one sstable scanner per sstable (CASSANDRA-11412) + * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410) + * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485) + * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470) + * Notify indexers of expired rows during compaction (CASSANDRA-11329) + * Properly respond with ProtocolError when a v1/v2 native protocol + header is received (CASSANDRA-11464) + * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120) +Merged from 2.2: + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) * Exit JVM if JMX server fails to startup (CASSANDRA-11540) * Produce a heap dump when exiting on OOM (CASSANDRA-9861) http://git-wip-us.apache.org/repos/asf/cassandra/blob/361c4e4c/src/java/org/apache/cassandra/io/util/NIODataInputStream.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/NIODataInputStream.java index e599a69,ebeb8ba..c75d44f --- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java +++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java @@@ -17,8 -17,12 +17,7 @@@ */ package org.apache.cassandra.io.util; --import java.io.Closeable; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; @@@ -37,48 -41,251 +36,45 @@@ import com.google.common.base.Precondit * * NIODataInputStream is not thread safe. */ -public class NIODataInputStream extends InputStream implements DataInput, Closeable +public class NIODataInputStream extends RebufferingInputStream { - private final ReadableByteChannel rbc; - private ByteBuffer buf; + protected final ReadableByteChannel channel; - - public NIODataInputStream(ReadableByteChannel rbc, int bufferSize) - { - Preconditions.checkNotNull(rbc); - Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accomadate a long/double"); - this.rbc = rbc; - buf = ByteBuffer.allocateDirect(bufferSize); - buf.position(0); - buf.limit(0); - } - - @Override - public void readFully(byte[] b) throws IOException - { - readFully(b, 0, b.length); - } - - - @Override - public void readFully(byte[] b, int off, int len) throws IOException - { - int copied = 0; - while (copied < len) - { - int read = read(b, off + copied, len - copied); - if (read < 0) - throw new EOFException(); - copied += read; - } - } - - @Override - public int read(byte b[], int off, int len) throws IOException { - if (b == null) - throw new NullPointerException(); - - // avoid int overflow - if (off < 0 || off > b.length || len < 0 - || len > b.length - off) - throw new IndexOutOfBoundsException(); - - if (len == 0) - return 0; - - int copied = 0; - while (copied < len) - { - if (buf.hasRemaining()) - { - int toCopy = Math.min(len - copied, buf.remaining()); - buf.get(b, off + copied, toCopy); - copied += toCopy; - } - else - { - int read = readNext(); - if (read < 0 && copied == 0) return -1; - if (read <= 0) return copied; - } - } - - return copied; - } - - /* - * Refill the buffer, preserving any unread bytes remaining in the buffer - */ - private int readNext() throws IOException - { - Preconditions.checkState(buf.remaining() != buf.capacity()); - assert(buf.remaining() < 8); - - /* - * If there is data already at the start of the buffer, move the position to the end - * If there is data but not at the start, move it to the start - * Otherwise move the position to 0 so writes start at the beginning of the buffer - * - * We go to the trouble of shuffling the bytes remaining for cases where the buffer isn't fully drained - * while retrieving a multi-byte value while the position is in the middle. - */ - if (buf.position() == 0 && buf.hasRemaining()) - { - buf.position(buf.limit()); - } - else if (buf.hasRemaining()) - { - ByteBuffer dup = buf.duplicate(); - buf.clear(); - buf.put(dup); - } - else - { - buf.position(0); - } - - buf.limit(buf.capacity()); - - int read = 0; - while ((read = rbc.read(buf)) == 0) {} - - buf.flip(); - - return read; - } - - /* - * Read at least minimum bytes and throw EOF if that fails - */ - private void readMinimum(int minimum) throws IOException - { - assert(buf.remaining() < 8); - while (buf.remaining() < minimum) - { - int read = readNext(); - if (read == -1) - { - //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here - buf.position(0); - buf.limit(0); - throw new EOFException(); - } - } - } - - /* - * Ensure the buffer contains the minimum number of readable bytes - */ - private void prepareReadPrimitive(int minimum) throws IOException - { - if (buf.remaining() < minimum) readMinimum(minimum); - } - - @Override - public int skipBytes(int n) throws IOException - { - int skipped = 0; - - while (skipped < n) - { - int skippedThisTime = (int)skip(n - skipped); - if (skippedThisTime <= 0) break; - skipped += skippedThisTime; - } - - return skipped; - } - - @Override - public boolean readBoolean() throws IOException - { - prepareReadPrimitive(1); - return buf.get() != 0; - } - - @Override - public byte readByte() throws IOException - { - prepareReadPrimitive(1); - return buf.get(); - } - - @Override - public int readUnsignedByte() throws IOException - { - prepareReadPrimitive(1); - return buf.get() & 0xff; - } - - @Override - public short readShort() throws IOException - { - prepareReadPrimitive(2); - return buf.getShort(); - } - - @Override - public int readUnsignedShort() throws IOException - { - return readShort() & 0xFFFF; - } - - @Override - public char readChar() throws IOException + private static ByteBuffer makeBuffer(int bufferSize) { - prepareReadPrimitive(2); - return buf.getChar(); - } + ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize); + buffer.position(0); + buffer.limit(0); - @Override - public int readInt() throws IOException - { - prepareReadPrimitive(4); - return buf.getInt(); + return buffer; } - public NIODataInputStream(ReadableByteChannel channel, ByteBuffer buffer) - @Override - public long readLong() throws IOException ++ public NIODataInputStream(ReadableByteChannel channel, int bufferSize) { - super(buffer); - prepareReadPrimitive(8); - return buf.getLong(); - } ++ super(makeBuffer(bufferSize)); - @Override - public float readFloat() throws IOException - { - prepareReadPrimitive(4); - return buf.getFloat(); + Preconditions.checkNotNull(channel); + this.channel = channel; } - public NIODataInputStream(ReadableByteChannel channel, int bufferSize) - { - this(channel, makeBuffer(bufferSize)); - } - @Override - public double readDouble() throws IOException + protected void reBuffer() throws IOException { - prepareReadPrimitive(8); - return buf.getDouble(); - } + Preconditions.checkState(buffer.remaining() == 0); + buffer.clear(); - @Override - public String readLine() throws IOException - { - throw new UnsupportedOperationException(); - } + while ((channel.read(buffer)) == 0) {} - @Override - public String readUTF() throws IOException - { - return DataInputStream.readUTF(this); + buffer.flip(); } @Override public void close() throws IOException { - rbc.close(); - FileUtils.clean(buf); - buf = null; - } - - @Override - public int read() throws IOException - { - return readUnsignedByte(); + channel.close(); + super.close(); ++ FileUtils.clean(buffer); ++ buffer = null; } @Override