Repository: cassandra Updated Branches: refs/heads/trunk 5dfe24124 -> c18ce589e
Move sstableRandomAccessReader to nio2 patch by Josh McKenzie; reviewed by Benedict Elliott Smith for CASSANDRA-4050 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c18ce589 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c18ce589 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c18ce589 Branch: refs/heads/trunk Commit: c18ce589efdf480ad4623298ffb7038eb4091afb Parents: 5dfe241 Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Apr 7 15:51:35 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Apr 7 15:51:46 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../compress/CompressedRandomAccessReader.java | 104 ++++++----- .../io/compress/CompressedThrottledReader.java | 2 +- .../cassandra/io/util/AbstractDataInput.java | 24 ++- .../cassandra/io/util/MappedFileDataInput.java | 34 ++-- .../cassandra/io/util/MemoryInputStream.java | 19 +- .../cassandra/io/util/RandomAccessReader.java | 177 +++++++++---------- .../cassandra/io/util/ThrottledReader.java | 2 +- .../apache/cassandra/utils/ByteBufferUtil.java | 9 + .../utils/vint/EncodedDataInputStream.java | 18 +- .../io/util/BufferedRandomAccessFileTest.java | 38 ---- 11 files changed, 208 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 64a53b8..8338f1b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.0 + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) * Remove CQL2 (CASSANDRA-5918) * Add Thrift get_multi_slice call (CASSANDRA-6757) * Optimize fetching multiple cells by name (CASSANDRA-6933) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 131a4d6..d71964c 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -79,70 +79,80 @@ public class CompressedRandomAccessReader extends RandomAccessReader compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); } + protected ByteBuffer allocateBuffer(int bufferSize) + { + assert Integer.bitCount(bufferSize) == 1; + return ByteBuffer.allocate(bufferSize); + } + @Override protected void reBuffer() { try { - decompressChunk(metadata.chunkFor(current)); - } - catch (CorruptBlockException e) - { - throw new CorruptSSTableException(e, getPath()); - } - catch (IOException e) - { - throw new FSReadError(e, getPath()); - } - } - - private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException - { - if (channel.position() != chunk.offset) - channel.position(chunk.offset); + long position = current(); + assert position < metadata.dataLength; - if (compressed.capacity() < chunk.length) - compressed = ByteBuffer.wrap(new byte[chunk.length]); - else - compressed.clear(); - compressed.limit(chunk.length); + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); - if (channel.read(compressed) != chunk.length) - throw new CorruptBlockException(getPath(), chunk); + if (channel.position() != chunk.offset) + channel.position(chunk.offset); - // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes - // in the future this will save a lot of hair-pulling - compressed.flip(); - try - { - validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0); - } - catch (IOException e) - { - throw new CorruptBlockException(getPath(), chunk); - } + if (compressed.capacity() < chunk.length) + compressed = ByteBuffer.wrap(new byte[chunk.length]); + else + compressed.clear(); + compressed.limit(chunk.length); - if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble()) - { + if (channel.read(compressed) != chunk.length) + throw new CorruptBlockException(getPath(), chunk); - if (metadata.hasPostCompressionAdlerChecksums) + // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes + // in the future this will save a lot of hair-pulling + compressed.flip(); + buffer.clear(); + int decompressedBytes; + try { - checksum.update(compressed.array(), 0, chunk.length); + decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0); + buffer.limit(decompressedBytes); } - else + catch (IOException e) { - checksum.update(buffer, 0, validBufferBytes); + throw new CorruptBlockException(getPath(), chunk); } - if (checksum(chunk) != (int) checksum.getValue()) - throw new CorruptBlockException(getPath(), chunk); + if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble()) + { - // reset checksum object back to the original (blank) state - checksum.reset(); - } + if (metadata.hasPostCompressionAdlerChecksums) + { + checksum.update(compressed.array(), 0, chunk.length); + } + else + { + checksum.update(buffer.array(), 0, decompressedBytes); + } - // buffer offset is always aligned - bufferOffset = current & ~(buffer.length - 1); + if (checksum(chunk) != (int) checksum.getValue()) + throw new CorruptBlockException(getPath(), chunk); + + // reset checksum object back to the original (blank) state + checksum.reset(); + } + + // buffer offset is always aligned + bufferOffset = position & ~(buffer.capacity() - 1); + buffer.position((int) (position - bufferOffset)); + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, getPath()); + } + catch (IOException e) + { + throw new FSReadError(e, getPath()); + } } private int checksum(CompressionMetadata.Chunk chunk) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java index c5ae795..2495d17 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java @@ -37,7 +37,7 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader protected void reBuffer() { - limiter.acquire(buffer.length); + limiter.acquire(buffer.capacity()); super.reBuffer(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/AbstractDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java index ff8b6b2..2815260 100644 --- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java +++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java @@ -21,12 +21,20 @@ import java.io.*; public abstract class AbstractDataInput extends InputStream implements DataInput { - protected abstract void seekInternal(int position); - protected abstract int getPosition(); + protected abstract void seek(long position) throws IOException; + protected abstract long getPosition(); + protected abstract long getPositionLimit(); - /* - !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile. - */ + public int skipBytes(int n) throws IOException + { + if (n <= 0) + return 0; + long oldPosition = getPosition(); + seek(Math.min(getPositionLimit(), oldPosition + n)); + long skipped = getPosition() - oldPosition; + assert skipped >= 0 && skipped <= n; + return (int) skipped; + } /** * Reads a boolean from the current position in this file. Blocks until one @@ -214,7 +222,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput public final String readLine() throws IOException { StringBuilder line = new StringBuilder(80); // Typical line length boolean foundTerminator = false; - int unreadPosition = 0; + long unreadPosition = -1; while (true) { int nextByte = read(); switch (nextByte) { @@ -222,7 +230,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput return line.length() != 0 ? line.toString() : null; case (byte) '\r': if (foundTerminator) { - seekInternal(unreadPosition); + seek(unreadPosition); return line.toString(); } foundTerminator = true; @@ -233,7 +241,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput return line.toString(); default: if (foundTerminator) { - seekInternal(unreadPosition); + seek(unreadPosition); return line.toString(); } line.append((char) nextByte); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java index f397ddc..0479256 100644 --- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java @@ -24,7 +24,7 @@ import java.nio.channels.FileChannel; import org.apache.cassandra.utils.ByteBufferUtil; -public class MappedFileDataInput extends AbstractDataInput implements FileDataInput +public class MappedFileDataInput extends AbstractDataInput implements FileDataInput, DataInput { private final MappedByteBuffer buffer; private final String filename; @@ -49,12 +49,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn this.position = position; } - // don't make this public, this is only for seeking WITHIN the current mapped segment - protected void seekInternal(int pos) - { - position = pos; - } - // Only use when we know the seek in within the mapped segment. Throws an // IOException otherwise. public void seek(long pos) throws IOException @@ -63,17 +57,22 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn if (inSegmentPos < 0 || inSegmentPos > buffer.capacity()) throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity())); - seekInternal((int) inSegmentPos); + position = (int) inSegmentPos; } public long getFilePointer() { - return segmentOffset + (long)position; + return segmentOffset + position; } - protected int getPosition() + protected long getPosition() { - return position; + return segmentOffset + position; + } + + protected long getPositionLimit() + { + return segmentOffset + buffer.capacity(); } @Override @@ -85,7 +84,7 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn public void reset(FileMark mark) throws IOException { assert mark instanceof MappedFileDataInputMark; - seekInternal(((MappedFileDataInputMark) mark).position); + position = ((MappedFileDataInputMark) mark).position; } public FileMark mark() @@ -162,17 +161,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn throw new UnsupportedOperationException("use readBytes instead"); } - public int skipBytes(int n) throws IOException - { - assert n >= 0 : "skipping negative bytes is illegal: " + n; - if (n == 0) - return 0; - int oldPosition = position; - assert ((long)oldPosition) + n <= Integer.MAX_VALUE; - position = Math.min(buffer.capacity(), position + n); - return position - oldPosition; - } - private static class MappedFileDataInputMark implements FileMark { int position; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/MemoryInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java index eee030a..73ccc1b 100644 --- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java +++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java @@ -17,9 +17,10 @@ */ package org.apache.cassandra.io.util; +import java.io.DataInput; import java.io.IOException; -public class MemoryInputStream extends AbstractDataInput +public class MemoryInputStream extends AbstractDataInput implements DataInput { private final Memory mem; private int position = 0; @@ -40,20 +41,24 @@ public class MemoryInputStream extends AbstractDataInput position += count; } - protected void seekInternal(int pos) + protected void seek(long pos) { - position = pos; + position = (int) pos; } - protected int getPosition() + protected long getPosition() { return position; } - public int skipBytes(int n) throws IOException + protected long getPositionLimit() { - seekInternal(getPosition() + n); - return position; + return mem.size(); + } + + protected long length() + { + return mem.size(); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 8347cd9..e395510 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -20,12 +20,14 @@ package org.apache.cassandra.io.util; import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.utils.ByteBufferUtil; -public class RandomAccessReader extends RandomAccessFile implements FileDataInput +public class RandomAccessReader extends AbstractDataInput implements FileDataInput { public static final long CACHE_FLUSH_INTERVAL_IN_BYTES = (long) Math.pow(2, 27); // 128mb @@ -36,17 +38,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu private final String filePath; // buffer which will cache file blocks - protected byte[] buffer; + protected ByteBuffer buffer; - // `current` as current position in file // `bufferOffset` is the offset of the beginning of the buffer // `markedPointer` folds the offset of the last file mark - protected long bufferOffset, current = 0, markedPointer; - // `validBufferBytes` is the number of bytes in the buffer that are actually valid; - // this will be LESS than buffer capacity if buffer is not full! - protected int validBufferBytes = 0; + protected long bufferOffset, markedPointer; - // channel liked with the file, used to retrieve data and force updates. + // channel linked with the file, used to retrieve data and force updates. protected final FileChannel channel; private final long fileLength; @@ -55,19 +53,23 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException { - super(file, "r"); - this.owner = owner; - channel = super.getChannel(); filePath = file.getAbsolutePath(); + try + { + channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); + } + catch (IOException e) + { + throw new FileNotFoundException(filePath); + } + // allocating required size of the buffer if (bufferSize <= 0) throw new IllegalArgumentException("bufferSize must be positive"); - buffer = new byte[bufferSize]; - // we can cache file length in read-only mode try { @@ -77,7 +79,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu { throw new FSReadError(e, filePath); } - validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations + buffer = allocateBuffer(bufferSize); + buffer.limit(0); + } + + protected ByteBuffer allocateBuffer(int bufferSize) + { + return ByteBuffer.allocate((int) Math.min(fileLength, bufferSize)); } public static RandomAccessReader open(File file, PoolingSegmentedFile owner) @@ -97,7 +105,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu { return new RandomAccessReader(file, bufferSize, owner); } - catch (FileNotFoundException e) + catch (IOException e) { throw new RuntimeException(e); } @@ -109,31 +117,31 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, null); } + // channel extends FileChannel, impl SeekableByteChannel. Safe to cast. + public FileChannel getChannel() + { + return channel; + } + /** * Read data from file starting from current currentOffset to populate buffer. */ protected void reBuffer() { - resetBuffer(); + bufferOffset += buffer.position(); + buffer.clear(); + assert bufferOffset < fileLength; try { - if (bufferOffset >= channel.size()) - return; - channel.position(bufferOffset); // setting channel position - - int read = 0; - - while (read < buffer.length) + while (buffer.hasRemaining()) { - int n = super.read(buffer, read, buffer.length - read); + int n = channel.read(buffer); if (n < 0) break; - read += n; } - - validBufferBytes = read; + buffer.flip(); } catch (IOException e) { @@ -144,7 +152,12 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu @Override public long getFilePointer() { - return current; + return current(); + } + + protected long current() + { + return bufferOffset + (buffer == null ? 0 : buffer.position()); } public String getPath() @@ -154,7 +167,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu public int getTotalBufferSize() { - return buffer.length; + return buffer.capacity(); } public void reset() @@ -164,14 +177,14 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu public long bytesPastMark() { - long bytes = current - markedPointer; + long bytes = current() - markedPointer; assert bytes >= 0; return bytes; } public FileMark mark() { - markedPointer = current; + markedPointer = current(); return new BufferedRandomAccessFileMark(markedPointer); } @@ -184,7 +197,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu public long bytesPastMark(FileMark mark) { assert mark instanceof BufferedRandomAccessFileMark; - long bytes = current - ((BufferedRandomAccessFileMark) mark).pointer; + long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer; assert bytes >= 0; return bytes; } @@ -202,17 +215,6 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu return length() - getFilePointer(); } - protected int bufferCursor() - { - return (int) (current - bufferOffset); - } - - protected void resetBuffer() - { - bufferOffset = current; - validBufferBytes = 0; - } - @Override public void close() { @@ -233,11 +235,12 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu public void deallocate() { + bufferOffset += buffer.position(); buffer = null; // makes sure we don't use this after it's ostensibly closed try { - super.close(); + channel.close(); } catch (IOException e) { @@ -270,17 +273,28 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu if (newPosition < 0) throw new IllegalArgumentException("new position should not be negative"); - if (newPosition > length()) // it is save to call length() in read-only mode - throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode", + if (newPosition >= length()) // it is save to call length() in read-only mode + { + if (newPosition > length()) + throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode", newPosition, getPath(), length())); + buffer.limit(0); + bufferOffset = newPosition; + return; + } - current = newPosition; - - if (newPosition > (bufferOffset + validBufferBytes) || newPosition < bufferOffset) - reBuffer(); + if (newPosition >= bufferOffset && newPosition < bufferOffset + buffer.limit()) + { + buffer.position((int) (newPosition - bufferOffset)); + return; + } + // Set current location to newPosition and clear buffer so reBuffer calculates from newPosition + bufferOffset = newPosition; + buffer.clear(); + reBuffer(); + assert current() == newPosition; } - @Override // -1 will be returned if there is nothing to read; higher-level methods like readInt // or readFully (from RandomAccessFile) will throw EOFException but this should not public int read() @@ -291,12 +305,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu if (isEOF()) return -1; // required by RandomAccessFile - if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + if (!buffer.hasRemaining()) reBuffer(); - assert current >= bufferOffset && current < bufferOffset + validBufferBytes; - - return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff; + return (int)buffer.get() & 0xff; } @Override @@ -319,47 +331,41 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu if (isEOF()) return -1; - if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + if (!buffer.hasRemaining()) reBuffer(); - assert current >= bufferOffset && current < bufferOffset + validBufferBytes - : String.format("File (%s), current offset %d, buffer offset %d, buffer limit %d", - getPath(), - current, - bufferOffset, - validBufferBytes); - - int toCopy = Math.min(length, validBufferBytes - bufferCursor()); - - System.arraycopy(buffer, bufferCursor(), buff, offset, toCopy); - current += toCopy; - + int toCopy = Math.min(length, buffer.remaining()); + buffer.get(buff, offset, toCopy); return toCopy; } public ByteBuffer readBytes(int length) throws EOFException { assert length >= 0 : "buffer length should not be negative: " + length; - - byte[] buff = new byte[length]; - try { - readFully(buff); // reading data buffer + ByteBuffer result = ByteBuffer.allocate(length); + while (result.hasRemaining()) + { + if (isEOF()) + throw new EOFException(); + if (!buffer.hasRemaining()) + reBuffer(); + ByteBufferUtil.put(buffer, result); + } + result.flip(); + return result; } catch (EOFException e) { throw e; } - catch (IOException e) + catch (Exception e) { throw new FSReadError(e, filePath); } - - return ByteBuffer.wrap(buff); } - @Override public long length() { return fileLength; @@ -367,24 +373,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu public long getPosition() { - return current; - } - - @Override - public void write(int value) - { - throw new UnsupportedOperationException(); + return bufferOffset + buffer.position(); } - @Override - public void write(byte[] buffer) - { - throw new UnsupportedOperationException(); - } - - @Override - public void write(byte[] buffer, int offset, int length) + public long getPositionLimit() { - throw new UnsupportedOperationException(); + return length(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/ThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java index b12a8a8..b9b645a 100644 --- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java +++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java @@ -38,7 +38,7 @@ public class ThrottledReader extends RandomAccessReader protected void reBuffer() { - limiter.acquire(buffer.length); + limiter.acquire(buffer.capacity()); super.reBuffer(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index f20a46a..91aa6f7 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -273,6 +273,15 @@ public class ByteBufferUtil FastByteOperations.copy(src, srcPos, dst, dstPos, length); } + public static int put(ByteBuffer src, ByteBuffer trg) + { + int length = Math.min(src.remaining(), trg.remaining()); + arrayCopy(src, src.position(), trg, trg.position(), length); + trg.position(trg.position() + length); + src.position(src.position() + length); + return length; + } + public static void writeWithLength(ByteBuffer bytes, DataOutputPlus out) throws IOException { out.writeInt(bytes.remaining()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java index b35d180..6385e5c 100644 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java +++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java @@ -25,10 +25,10 @@ import org.apache.cassandra.io.util.AbstractDataInput; /** * Borrows idea from * https://developers.google.com/protocol-buffers/docs/encoding#varints - * + * * Should be used with EncodedDataOutputStream */ -public class EncodedDataInputStream extends AbstractDataInput +public class EncodedDataInputStream extends AbstractDataInput implements DataInput { private DataInput input; @@ -47,12 +47,22 @@ public class EncodedDataInputStream extends AbstractDataInput return input.readByte() & 0xFF; } - protected void seekInternal(int position) + protected void seek(long position) + { + throw new UnsupportedOperationException(); + } + + protected long getPosition() + { + throw new UnsupportedOperationException(); + } + + protected long getPositionLimit() { throw new UnsupportedOperationException(); } - protected int getPosition() + protected long length() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index 75de261..8053553 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -547,34 +547,6 @@ public class BufferedRandomAccessFileTest } }, IllegalArgumentException.class); - // Any write() call should fail - expectException(new Callable<Object>() - { - public Object call() throws IOException - { - copy.write(1); - return null; - } - }, UnsupportedOperationException.class); - - expectException(new Callable<Object>() - { - public Object call() throws IOException - { - copy.write(new byte[1]); - return null; - } - }, UnsupportedOperationException.class); - - expectException(new Callable<Object>() - { - public Object call() throws IOException - { - copy.write(new byte[3], 0, 2); - return null; - } - }, UnsupportedOperationException.class); - copy.seek(0); copy.skipBytes(5); @@ -619,16 +591,6 @@ public class BufferedRandomAccessFileTest } } - @Test (expected=IOException.class) - public void testSetLengthDuringReadMode() throws IOException - { - File tmpFile = File.createTempFile("set_length_during_read_mode", "bin"); - try (RandomAccessReader file = RandomAccessReader.open(tmpFile)) - { - file.setLength(4L); - } - } - private SequentialWriter createTempFile(String name) throws IOException { File tempFile = File.createTempFile(name, null);