Repository: cassandra Updated Branches: refs/heads/trunk 6a7985d87 -> 9343bd407
Implement / integrate FileSegmentInputStream.seek() into CommitLogReader Patch by ichaudhry; reviewed by jmckenzie for CASSANDRA-11957 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9343bd40 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9343bd40 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9343bd40 Branch: refs/heads/trunk Commit: 9343bd4070d69f9c1558656deccfd8e3692c2c80 Parents: 6a7985d Author: Imran Chaudhry <imran.chaud...@datastax.com> Authored: Tue Jun 21 13:20:32 2016 -0400 Committer: Josh McKenzie <jmcken...@apache.org> Committed: Tue Jun 21 15:48:00 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLogReader.java | 6 ++- .../EncryptedFileSegmentInputStream.java | 21 ++++++-- .../db/commitlog/SegmentReaderTest.java | 56 ++++++++++++++++++-- 4 files changed, 75 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f3fc2ca..0695654 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.8 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957) * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002) * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966) * Add cross-DC latency metrics (CASSANDRA-11596) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index 6c4bb60..a914cc9 100644 --- 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -199,8 +199,6 @@ public class CommitLogReader statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName()); - // TODO: Since EncryptedFileSegmentInputStream doesn't implement seek(), we cannot pre-emptively seek - // to the desired offset in the syncSegment before reading the section and deserializing the mutations. readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc); if (!statusTracker.shouldContinue()) break; @@ -254,6 +252,10 @@ public class CommitLogReader ReadStatusTracker statusTracker, CommitLogDescriptor desc) throws IOException { + // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment + if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position) + reader.seek(minPosition.position); + while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF()) { long mutationStart = reader.getFilePointer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java index cd7f7cb..9da3d50 100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java @@ -38,7 +38,7 @@ public class EncryptedFileSegmentInputStream extends FileSegmentInputStream impl private final ChunkProvider chunkProvider; /** - * offset the decrypted chunks already processed in this segment. 
+ * Offset representing the decrypted chunks already processed in this segment. */ private int totalChunkOffset; @@ -76,8 +76,23 @@ public class EncryptedFileSegmentInputStream extends FileSegmentInputStream impl public void seek(long position) { - // implement this when we actually need it - throw new UnsupportedOperationException(); + long bufferPos = position - totalChunkOffset - segmentOffset; + while (buffer != null && buffer.limit() > 0 && bufferPos > buffer.limit()) + { + // rebuffer until position lies within the current chunk's readable bytes (up to limit, not capacity); an empty chunk ends the loop + buffer.position(buffer.limit()); + + // increases totalChunkOffset + reBuffer(); + bufferPos = position - totalChunkOffset - segmentOffset; + } + if (buffer == null || bufferPos < 0 || bufferPos > buffer.limit()) + throw new IllegalArgumentException( + String.format("Unable to seek to position %d in %s (%d bytes) in partial mode", + position, + getPath(), + segmentOffset + expectedLength)); + buffer.position((int) bufferPos); } public long bytesPastMark(DataPosition mark) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java index 034566e..88300a1 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java @@ -26,6 +26,8 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Collections; import java.util.Random; +import java.util.function.BiFunction; + import javax.crypto.Cipher; import org.junit.Assert; @@ -106,15 +108,61 @@ public class SegmentReaderTest } } - private ByteBuffer readBytes(DataInput input, int len) throws IOException + private ByteBuffer readBytes(FileDataInput input, int len) { byte[] buf = new byte[len]; -
input.readFully(buf); + try + { + input.readFully(buf); + } + catch (IOException e) + { + throw new RuntimeException(e); + } return ByteBuffer.wrap(buf); } + private ByteBuffer readBytesSeek(FileDataInput input, int len) + { + byte[] buf = new byte[len]; + + // divide the output buffer into 5 slices; each slice is read after an explicit seek to its start + int[] offsets = new int[] { 0, len / 5, 2 * len / 5, 3 * len / 5, 4 * len / 5, len }; + + // file pointer at which the data begins; every seek below is relative to it + long inputStart = input.getFilePointer(); + + for (int i = 0; i < offsets.length - 1; i++) + { + try + { + // seek to the beginning of this slice + input.seek(inputStart + offsets[i]); + // read this slice into its place in the output buffer + input.readFully(buf, offsets[i], offsets[i + 1] - offsets[i]); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + return ByteBuffer.wrap(buf); + } + + @Test + public void encryptedSegmenterRead() throws IOException + { + underlyingEncryptedSegmenterTest((s, t) -> readBytes(s, t)); + } + @Test - public void encryptedSegmenter() throws IOException + public void encryptedSegmenterSeek() throws IOException + { + underlyingEncryptedSegmenterTest((s, t) -> readBytesSeek(s, t)); + } + + public void underlyingEncryptedSegmenterTest(BiFunction<FileDataInput, Integer, ByteBuffer> readFun) + throws IOException { EncryptionContext context = EncryptionContextGenerator.createContext(true); CipherFactory cipherFactory = new CipherFactory(context.getTransparentDataEncryptionOptions()); @@ -140,7 +188,7 @@ public class SegmentReaderTest // EncryptedSegmenter includes the Sync header length in the syncSegment.endPosition (value) Assert.assertEquals(plainTextLength, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE); - ByteBuffer fileBuffer = readBytes(syncSegment.input, plainTextLength); + ByteBuffer fileBuffer = readFun.apply(syncSegment.input, plainTextLength); plainTextBuffer.position(0); Assert.assertEquals(plainTextBuffer, fileBuffer); }