http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
new file mode 100644
index 0000000..573ae5c
--- /dev/null
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReadAheadEntryReader.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import 
com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
+import com.twitter.distributedlog.exceptions.DLIllegalStateException;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test case for {@link ReadAheadEntryReader}.
+ */
+public class TestReadAheadEntryReader extends TestDistributedLogBase {
+
+    private static final int MAX_CACHED_ENTRIES = 5;
+    private static final int NUM_PREFETCH_ENTRIES = 10;
+
+    @Rule
+    public TestName runtime = new TestName();
+    private DistributedLogConfiguration baseConf;
+    private OrderedScheduler scheduler;
+    private BookKeeperClient bkc;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        baseConf = new DistributedLogConfiguration();
+        baseConf.addConfiguration(conf);
+        baseConf.setOutputBufferSize(0);
+        baseConf.setPeriodicFlushFrequencyMilliSeconds(0);
+        baseConf.setImmediateFlushEnabled(false);
+        baseConf.setReadAheadMaxRecords(MAX_CACHED_ENTRIES);
+        baseConf.setNumPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
+        baseConf.setMaxPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
+        bkc = BookKeeperClientBuilder.newBuilder()
+                .name("test-bk")
+                .dlConfig(conf)
+                .ledgersPath("/ledgers")
+                .zkServers(bkutil.getZkServers())
+                .build();
+        scheduler = OrderedScheduler.newBuilder()
+                .name("test-read-ahead-entry-reader")
+                .corePoolSize(1)
+                .build();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != bkc) {
+            bkc.close();
+        }
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
+        super.teardown();
+    }
+
+    private ReadAheadEntryReader createEntryReader(String streamName,
+                                                   DLSN fromDLSN,
+                                                   BKDistributedLogManager dlm,
+                                                   DistributedLogConfiguration 
conf)
+            throws Exception {
+        BKLogReadHandler readHandler = dlm.createReadHandler(
+                Optional.<String>absent(),
+                true);
+        LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
+                conf,
+                bkc.get(),
+                scheduler,
+                NullStatsLogger.INSTANCE,
+                AsyncFailureInjector.NULL);
+        return new ReadAheadEntryReader(
+                streamName,
+                fromDLSN,
+                conf,
+                readHandler,
+                entryStore,
+                scheduler,
+                Ticker.systemTicker(),
+                new AlertStatsLogger(NullStatsLogger.INSTANCE, "test-alert"));
+    }
+
+    private void ensureOrderSchedulerEmpty(String streamName) throws Exception 
{
+        final Promise<Void> promise = new Promise<Void>();
+        scheduler.submit(streamName, new Runnable() {
+            @Override
+            public void run() {
+                FutureUtils.setValue(promise, null);
+            }
+        });
+        FutureUtils.result(promise);
+    }
+
+    void generateCompletedLogSegments(DistributedLogManager dlm,
+                                      long numCompletedSegments,
+                                      long segmentSize) throws Exception {
+        generateCompletedLogSegments(dlm, numCompletedSegments, segmentSize, 
1L);
+    }
+
+    void generateCompletedLogSegments(DistributedLogManager dlm,
+                                      long numCompletedSegments,
+                                      long segmentSize,
+                                      long startTxId) throws Exception {
+
+        long txid = startTxId;
+        for (long i = 0; i < numCompletedSegments; i++) {
+            AsyncLogWriter writer = 
FutureUtils.result(dlm.openAsyncLogWriter());
+            for (long j = 1; j <= segmentSize; j++) {
+                
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
+                ctrlRecord.setControl();
+                FutureUtils.result(writer.write(ctrlRecord));
+            }
+            Utils.close(writer);
+        }
+    }
+
+    AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
+                                              DistributedLogConfiguration conf,
+                                              long segmentSize) throws 
Exception {
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        for (long i = 1L; i <= segmentSize; i++) {
+            
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
+            ctrlRecord.setControl();
+            FutureUtils.result(writer.write(ctrlRecord));
+        }
+        return writer;
+    }
+
+    void expectAlreadyTruncatedTransactionException(ReadAheadEntryReader 
reader,
+                                                    String errMsg)
+            throws Exception {
+        try {
+            reader.checkLastException();
+            fail(errMsg);
+        } catch (AlreadyTruncatedTransactionException atte) {
+            // expected
+        }
+    }
+
+    void expectIllegalStateException(ReadAheadEntryReader reader,
+                                     String errMsg)
+            throws Exception {
+        try {
+            reader.checkLastException();
+            fail(errMsg);
+        } catch (DLIllegalStateException le) {
+            // expected
+        }
+    }
+
+    void expectNoException(ReadAheadEntryReader reader) throws Exception {
+        reader.checkLastException();
+    }
+
+    //
+    // Test Positioning
+    //
+
+    @Test(timeout = 60000)
+    public void testStartWithEmptySegmentList() throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
+
+        readAheadEntryReader.start(Lists.<LogSegmentMetadata>newArrayList());
+
+        ensureOrderSchedulerEmpty(streamName);
+        assertFalse("ReadAhead should not be initialized with empty segment 
list",
+                readAheadEntryReader.isInitialized());
+        assertTrue("ReadAhead should be empty when it isn't initialized",
+                readAheadEntryReader.isCacheEmpty());
+        assertFalse("ReadAhead should not be marked as caught up when it isn't 
initialized",
+                readAheadEntryReader.isReadAheadCaughtUp());
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 1, MAX_CACHED_ENTRIES / 2 + 1);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+        assertEquals(segments.size() + " log segments found, expected to be 
only one",
+                1, segments.size());
+
+        // notify the readahead reader with the new segment list
+        readAheadEntryReader.onSegmentsUpdated(segments);
+
+        // check the reader state after initialization
+        ensureOrderSchedulerEmpty(streamName);
+        assertTrue("ReadAhead should be initialized with non-empty segment 
list",
+                readAheadEntryReader.isInitialized());
+        assertNotNull("current segment reader should be initialized",
+                readAheadEntryReader.getCurrentSegmentReader());
+        assertEquals("current segment sequence number should be " + 
segments.get(0).getLogSegmentSequenceNumber(),
+                segments.get(0).getLogSegmentSequenceNumber(), 
readAheadEntryReader.getCurrentSegmentSequenceNumber());
+        assertNull("there should be no next segment reader",
+                readAheadEntryReader.getNextSegmentReader());
+        assertTrue("there should be no remaining segment readers",
+                readAheadEntryReader.getSegmentReaders().isEmpty());
+
+        Utils.close(readAheadEntryReader);
+        dlm.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testInitializeMultipleClosedLogSegments0() throws Exception {
+        // 5 completed log segments, start from the beginning
+        testInitializeMultipleClosedLogSegments(5, DLSN.InitialDLSN, 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testInitializeMultipleClosedLogSegments1() throws Exception {
+        // 5 completed log segments, start from the 4th segment; it should 
skip the first 3 log segments
+        testInitializeMultipleClosedLogSegments(5, new DLSN(4L, 0L, 0L), 3);
+    }
+
+    private void testInitializeMultipleClosedLogSegments(
+            int numLogSegments,
+            DLSN fromDLSN,
+            int expectedCurrentSegmentIdx
+    ) throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 1, MAX_CACHED_ENTRIES / 2 + 1, 1L);
+        generateCompletedLogSegments(dlm, numLogSegments - 1, 1, 
MAX_CACHED_ENTRIES + 2);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+        assertEquals(segments.size() + " log segments found, expected to be " 
+ numLogSegments,
+                numLogSegments, segments.size());
+
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, fromDLSN, dlm, baseConf);
+        readAheadEntryReader.start(segments);
+
+        ensureOrderSchedulerEmpty(streamName);
+        assertTrue("ReadAhead should be initialized with non-empty segment 
list",
+                readAheadEntryReader.isInitialized());
+        assertNotNull("current segment reader should be initialized",
+                readAheadEntryReader.getCurrentSegmentReader());
+        assertTrue("current segment reader should be open and started",
+                readAheadEntryReader.getCurrentSegmentReader().isReaderOpen()
+                        && 
readAheadEntryReader.getCurrentSegmentReader().isReaderStarted());
+        assertEquals("current segment reader should read " + 
segments.get(expectedCurrentSegmentIdx),
+                segments.get(expectedCurrentSegmentIdx),
+                readAheadEntryReader.getCurrentSegmentReader().getSegment());
+        assertEquals("current segment sequence number should be "
+                + 
segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                
segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                readAheadEntryReader.getCurrentSegmentSequenceNumber());
+        assertNull("next segment reader should not be initialized since it is 
a closed log segment",
+                readAheadEntryReader.getNextSegmentReader());
+        assertEquals("there should be " + (numLogSegments - 
(expectedCurrentSegmentIdx + 1))
+                + " remaining segment readers",
+                numLogSegments - (expectedCurrentSegmentIdx + 1),
+                readAheadEntryReader.getSegmentReaders().size());
+        int segmentIdx = expectedCurrentSegmentIdx + 1;
+        for (ReadAheadEntryReader.SegmentReader reader : 
readAheadEntryReader.getSegmentReaders()) {
+            LogSegmentMetadata expectedSegment = segments.get(segmentIdx);
+            assertEquals("Segment should " + expectedSegment,
+                    expectedSegment, reader.getSegment());
+            assertTrue("Segment reader for " + expectedSegment + " should be 
open",
+                    reader.isReaderOpen());
+            assertFalse("Segment reader for " + expectedSegment + " should not 
be started",
+                    reader.isReaderStarted());
+            ++segmentIdx;
+        }
+
+        Utils.close(readAheadEntryReader);
+        dlm.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testPositioningAtInvalidLogSegment() throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 3, 2);
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+
+        // positioning on a truncated log segment (segment 1)
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
+        readAheadEntryReader.start(segments);
+        // wait for initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectAlreadyTruncatedTransactionException(readAheadEntryReader,
+                "should fail on positioning to a truncated log segment");
+
+        // positioning on a partially truncated log segment (segment 2) before 
min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 
0L), dlm, baseConf);
+        readAheadEntryReader.start(segments);
+        // wait for initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectAlreadyTruncatedTransactionException(readAheadEntryReader,
+                "should fail on positioning to a partially truncated log 
segment");
+
+        // positioning on a partially truncated log segment (segment 2) after 
min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 
0L), dlm, baseConf);
+        readAheadEntryReader.start(segments);
+        // wait for initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+    }
+
+    @Test(timeout = 60000)
+    public void testPositioningIgnoreTruncationStatus() throws Exception {
+        DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
+        confLocal.addConfiguration(baseConf);
+        confLocal.setIgnoreTruncationStatus(true);
+
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(confLocal, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 3, 2);
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+
+        // positioning on a truncated log segment (segment 1)
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, 
confLocal);
+        readAheadEntryReader.start(segments);
+        // wait for initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+
+        // positioning on a partially truncated log segment (segment 2) before 
min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 
0L), dlm, confLocal);
+        readAheadEntryReader.start(segments);
+        // wait for initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+
+        // positioning on a partially truncated log segment (segment 2) after 
min active dlsn
+        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 
0L), dlm, confLocal);
+        readAheadEntryReader.start(segments);
+        // wait for initialization to complete
+        ensureOrderSchedulerEmpty(streamName);
+        expectNoException(readAheadEntryReader);
+    }
+
+    //
+    // Test Reinitialization
+    //
+
+    @Test(timeout = 60000)
+    public void testLogSegmentSequenceNumberGap() throws Exception {
+        String streamName = runtime.getMethodName();
+        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
+
+        // generate list of log segments
+        generateCompletedLogSegments(dlm, 3, 2);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+
+        ReadAheadEntryReader readAheadEntryReader =
+                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
+        readAheadEntryReader.start(segments.subList(0, 1));
+        int expectedCurrentSegmentIdx = 0;
+        ensureOrderSchedulerEmpty(streamName);
+        assertTrue("ReadAhead should be initialized with non-empty segment 
list",
+                readAheadEntryReader.isInitialized());
+        assertNotNull("current segment reader should be initialized",
+                readAheadEntryReader.getCurrentSegmentReader());
+        assertTrue("current segment reader should be open and started",
+                readAheadEntryReader.getCurrentSegmentReader().isReaderOpen()
+                        && 
readAheadEntryReader.getCurrentSegmentReader().isReaderStarted());
+        assertEquals("current segment reader should read " + 
segments.get(expectedCurrentSegmentIdx),
+                segments.get(expectedCurrentSegmentIdx),
+                readAheadEntryReader.getCurrentSegmentReader().getSegment());
+        assertEquals("current segment sequence number should be "
+                + 
segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                
segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
+                readAheadEntryReader.getCurrentSegmentSequenceNumber());
+        assertNull("next segment reader should not be initialized since it is 
a closed log segment",
+                readAheadEntryReader.getNextSegmentReader());
+
+        readAheadEntryReader.onSegmentsUpdated(segments.subList(2, 3));
+        ensureOrderSchedulerEmpty(streamName);
+        expectIllegalStateException(readAheadEntryReader,
+                "inconsistent log segment found");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
index 3c8669b..1739b47 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog;
 
+import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import org.slf4j.Logger;
@@ -121,6 +122,9 @@ public class TestReader implements 
FutureEventListener<LogRecordWithDLSN> {
                     }
                     nextDLSN = dlsn;
                     LOG.info("Positioned reader {} at {}", readerName, dlsn);
+                    if (null != TestReader.this.reader) {
+                        Utils.close(TestReader.this.reader);
+                    }
                     TestReader.this.reader = reader;
                     readNext();
                     readyLatch.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
index 027b012..ee53362 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import com.twitter.distributedlog.feature.CoreFeatureKeys;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryReader;
 import com.twitter.distributedlog.util.FailpointUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Utils;
@@ -318,11 +319,11 @@ public class TestRollLogSegments extends 
TestDistributedLogBase {
         }
         assertEquals(expectedWriterPosition, 
getLedgerHandle(writer).getLastAddConfirmed());
         assertEquals(expectedLac, inspector.readLastConfirmed());
-        LedgerReadPosition readPosition = 
reader.bkLedgerManager.readAheadWorker.getNextReadAheadPosition();
+        EntryPosition readPosition = 
reader.getReadAheadReader().getNextEntryPosition();
         logger.info("ReadAhead moved read position {} : ", readPosition);
         while (readPosition.getEntryId() < expectedReaderPosition) {
             Thread.sleep(1000);
-            readPosition = 
reader.bkLedgerManager.readAheadWorker.getNextReadAheadPosition();
+            readPosition = reader.getReadAheadReader().getNextEntryPosition();
             logger.info("ReadAhead moved read position {} : ", readPosition);
         }
         assertEquals(expectedReaderPosition, readPosition.getEntryId());
@@ -386,7 +387,12 @@ public class TestRollLogSegments extends 
TestDistributedLogBase {
         // Writer moved to lac = 11, while reader knows lac = 10 and moving to 
wait on 11
         checkAndWaitWriterReaderPosition(perStreamWriter, 11, reader, 11, 
readLh, 10);
 
-        while (null == 
reader.bkLedgerManager.readAheadWorker.getMetadataNotification()) {
+        while (true) {
+            BKLogSegmentEntryReader entryReader =
+                    (BKLogSegmentEntryReader) 
reader.getReadAheadReader().getCurrentSegmentReader().getEntryReader();
+            if (null != entryReader && null != 
entryReader.getOutstandingLongPoll()) {
+                break;
+            }
             Thread.sleep(1000);
         }
         logger.info("Waiting for long poll getting interrupted with metadata 
changed");

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java
index 98d2020..011fc70 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestTruncate.java
@@ -329,7 +329,6 @@ public class TestTruncate extends TestDistributedLogBase {
         LogRecord r = reader.readNext(false);
         while (null != r) {
             DLMTestUtil.verifyLogRecord(r);
-            LOG.trace("Read entry {}.", r.getTransactionId());
             assertEquals(txid++, r.getTransactionId());
             ++numRead;
             r = reader.readNext(false);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
index 9a194c4..88840a0 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -32,24 +32,22 @@ import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.TestDistributedLogBase;
 import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
 import com.twitter.distributedlog.exceptions.ReadCancelledException;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Future;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.*;
 
 /**
@@ -92,25 +90,9 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
                                               long startEntryId,
                                               DistributedLogConfiguration conf)
             throws Exception {
-        LedgerHandle lh;
-        if (segment.isInProgress()) {
-            lh = bkc.get().openLedgerNoRecovery(
-                    segment.getLedgerId(),
-                    BookKeeper.DigestType.CRC32,
-                    conf.getBKDigestPW().getBytes(UTF_8));
-        } else {
-            lh = bkc.get().openLedger(
-                    segment.getLedgerId(),
-                    BookKeeper.DigestType.CRC32,
-                    conf.getBKDigestPW().getBytes(UTF_8));
-        }
-        return new BKLogSegmentEntryReader(
-                segment,
-                lh,
-                startEntryId,
-                bkc.get(),
-                scheduler,
-                conf);
+        LogSegmentEntryStore store = new BKLogSegmentEntryStore(
+                conf, bkc.get(), scheduler, NullStatsLogger.INSTANCE, 
AsyncFailureInjector.NULL);
+        return (BKLogSegmentEntryReader) 
FutureUtils.result(store.openReader(segment, startEntryId));
     }
 
     void generateCompletedLogSegments(DistributedLogManager dlm,
@@ -186,6 +168,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
             ++entryId;
         }
         assertEquals(21, txId);
+        assertFalse(reader.hasCaughtUpOnInprogress());
         Utils.close(reader);
     }
 
@@ -216,6 +199,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
                 // expected
             }
         }
+        assertFalse(reader.hasCaughtUpOnInprogress());
         assertTrue("Reader should be closed yet", reader.isClosed());
     }
 
@@ -245,9 +229,9 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         long txId = 1L;
         long entryId = 0L;
 
-
         assertEquals(10, reader.readAheadEntries.size());
         assertEquals(10, reader.getNextEntryId());
+        assertFalse(reader.hasCaughtUpOnInprogress());
         // read first entry
         Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
         LogRecordWithDLSN record = entryReader.nextRecord();
@@ -271,6 +255,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
 
         assertEquals(10, reader.readAheadEntries.size());
         assertEquals(11, reader.getNextEntryId());
+        assertFalse(reader.hasCaughtUpOnInprogress());
 
         Utils.close(reader);
     }
@@ -326,6 +311,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
 
         assertEquals(5, reader.readAheadEntries.size());
         assertEquals(6, reader.getNextEntryId());
+        assertFalse(reader.hasCaughtUpOnInprogress());
 
         Utils.close(reader);
     }
@@ -376,6 +362,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         assertEquals(2L, txId);
         assertEquals(reader.getLastAddConfirmed(), 
reader.readAheadEntries.size());
         assertEquals((reader.getLastAddConfirmed() + 1), 
reader.getNextEntryId());
+        assertFalse(reader.hasCaughtUpOnInprogress());
 
         Utils.close(reader);
     }
@@ -434,6 +421,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         // the long poll will be satisfied
         List<Entry.Reader> nextReadEntries = 
FutureUtils.result(nextReadFuture);
         assertEquals(1, nextReadEntries.size());
+        assertTrue(reader.hasCaughtUpOnInprogress());
         Entry.Reader entryReader = nextReadEntries.get(0);
         LogRecordWithDLSN record = entryReader.nextRecord();
         assertNotNull(record);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java
 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java
new file mode 100644
index 0000000..2b56550
--- /dev/null
+++ 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/LogSegmentIsTruncatedException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.exceptions;
+
+import com.twitter.distributedlog.thrift.service.StatusCode;
+
+/**
+ * Exception thrown when reading data from a truncated log segment.
+ */
+public class LogSegmentIsTruncatedException extends DLException {
+
+    private static final long serialVersionUID = -218506870918498062L;
+
+    public LogSegmentIsTruncatedException(String logSegmentName) {
+        super(StatusCode.LOG_SEGMENT_IS_TRUNCATED, "Log Segment '"
+                + logSegmentName + "'" + " is already truncated");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-protocol/src/main/thrift/service.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/thrift/service.thrift 
b/distributedlog-protocol/src/main/thrift/service.thrift
index a67664c..c2f77f3 100644
--- a/distributedlog-protocol/src/main/thrift/service.thrift
+++ b/distributedlog-protocol/src/main/thrift/service.thrift
@@ -98,6 +98,8 @@ enum StatusCode {
     LOG_SEGMENT_NOT_FOUND = 525,
     // End of Log Segment
     END_OF_LOG_SEGMENT = 526,
+    // Log Segment Is Truncated
+    LOG_SEGMENT_IS_TRUNCATED = 527,
 
     /* 6xx: unexpected */
 

Reply via email to