DL-195: ReadAhead Improvement (part 1) - Interface for LogSegmentEntryReader 
and LogSegmentEntryWriter

Create interfaces for the log segment entry reader and writer to abstract
data write/read operations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/27c04f37
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/27c04f37
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/27c04f37

Branch: refs/heads/master
Commit: 27c04f37b1b564a06d9105acb1849a6d98449ca9
Parents: 98a29a5
Author: Sijie Guo <sij...@twitter.com>
Authored: Wed Dec 28 14:51:54 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:11:23 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKLogWriteHandler.java       |   2 +-
 .../DistributedLogConfiguration.java            |  44 ++
 .../impl/BKLogSegmentEntryWriter.java           |  61 --
 .../logsegment/BKLogSegmentEntryReader.java     | 740 +++++++++++++++++++
 .../logsegment/BKLogSegmentEntryWriter.java     |  61 ++
 .../distributedlog/impl/logsegment/BKUtils.java |  72 ++
 .../logsegment/LogSegmentEntryReader.java       |  67 ++
 .../logsegment/LogSegmentEntryStore.java        |  46 ++
 .../com/twitter/distributedlog/DLMTestUtil.java |   2 +-
 .../distributedlog/TestBKLogSegmentWriter.java  |   2 +-
 .../distributedlog/TestDistributedLogBase.java  |   2 +-
 .../logsegment/TestBKLogSegmentEntryReader.java | 553 ++++++++++++++
 .../exceptions/EndOfLogSegmentException.java    |  32 +
 .../src/main/thrift/service.thrift              |   2 +
 14 files changed, 1621 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index f8bc917..16c0111 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -29,7 +29,7 @@ import 
com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.GetLastTxIdFunction;
-import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 6f37a59..6c6017e 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -352,6 +352,10 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
     public static final int 
BKDL_READAHEAD_NOSUCHLEDGER_EXCEPTION_ON_READLAC_ERROR_THRESHOLD_MILLIS_DEFAULT 
= 10000;
     public static final String BKDL_READAHEAD_SKIP_BROKEN_ENTRIES = 
"readAheadSkipBrokenEntries";
     public static final boolean BKDL_READAHEAD_SKIP_BROKEN_ENTRIES_DEFAULT = 
false;
+    // Target number of in-flight entry reads a log segment entry reader tops up to when prefetching.
+    public static final String BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT = "numPrefetchEntriesPerLogSegment";
+    public static final int BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 4;
+    // Upper bound on entries a log segment entry reader holds ahead of the consumer
+    // (prefetched, whether still in flight or already read, but not yet consumed).
+    public static final String BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT = "maxPrefetchEntriesPerLogSegment";
+    public static final int BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT = 32;
 
     // Scan Settings
     public static final String 
BKDL_FIRST_NUM_ENTRIES_PER_READ_LAST_RECORD_SCAN = 
"firstNumEntriesEachPerLastRecordScan";
@@ -2770,6 +2774,46 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
         return this;
     }
 
+    /**
+     * Get the number of entries to prefetch per log segment. Default value is 4.
+     *
+     * <p>When prefetching, a log segment entry reader tops up its outstanding
+     * (in-flight) entry reads to this number.
+     *
+     * @return the number of entries to prefetch per log segment.
+     */
+    public int getNumPrefetchEntriesPerLogSegment() {
+        return getInt(BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT, BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT);
+    }
+
+    /**
+     * Set the number of entries to prefetch per log segment.
+     *
+     * @param numEntries the number of entries to prefetch per log segment.
+     * @return configuration
+     */
+    public DistributedLogConfiguration setNumPrefetchEntriesPerLogSegment(int numEntries) {
+        setProperty(BKDL_NUM_PREFETCH_ENTRIES_PER_LOGSEGMENT, numEntries);
+        return this;
+    }
+
+    /**
+     * Get the maximum number of prefetched entries per log segment. Default value is 32.
+     *
+     * <p>This bounds how many entries a log segment entry reader holds ahead of the
+     * consumer (prefetched but not yet consumed); prefetching pauses once it is reached.
+     *
+     * @return the maximum number of prefetched entries per log segment.
+     */
+    public int getMaxPrefetchEntriesPerLogSegment() {
+        return getInt(BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT, BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT_DEFAULT);
+    }
+
+    /**
+     * Set the maximum number of prefetched entries per log segment.
+     *
+     * @param numEntries the maximum number of prefetched entries per log segment.
+     * @return configuration
+     */
+    public DistributedLogConfiguration setMaxPrefetchEntriesPerLogSegment(int numEntries) {
+        setProperty(BKDL_MAX_PREFETCH_ENTRIES_PER_LOGSEGMENT, numEntries);
+        return this;
+    }
+
     //
     // DL Reader Scan Settings
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java
deleted file mode 100644
index 57b4e69..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-/**
- * Ledger based log segment entry writer.
- */
-public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter {
-
-    private final LedgerHandle lh;
-
-    public BKLogSegmentEntryWriter(LedgerHandle lh) {
-        this.lh = lh;
-    }
-
-    @VisibleForTesting
-    public LedgerHandle getLedgerHandle() {
-        return this.lh;
-    }
-
-    @Override
-    public long getLogSegmentId() {
-        return lh.getId();
-    }
-
-    @Override
-    public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) {
-        lh.asyncClose(callback, ctx);
-    }
-
-    @Override
-    public void asyncAddEntry(byte[] data, int offset, int length,
-                              AsyncCallback.AddCallback callback, Object ctx) {
-        lh.asyncAddEntry(data, offset, length, callback, ctx);
-    }
-
-    @Override
-    public long size() {
-        return lh.getLength();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
new file mode 100644
index 0000000..fd3b63f
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.twitter.distributedlog.exceptions.DLIllegalStateException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
+import com.twitter.distributedlog.exceptions.ReadCancelledException;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * BookKeeper ledger based log segment entry reader.
+ */
+public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader, AsyncCallback.OpenCallback {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BKLogSegmentEntryReader.class);
+
+    private class CacheEntry implements Runnable, AsyncCallback.ReadCallback,
+            AsyncCallback.ReadLastConfirmedAndEntryCallback {
+
+        protected final long entryId;
+        private boolean done;
+        private LedgerEntry entry;
+        private int rc;
+
+        private CacheEntry(long entryId) {
+            this.entryId = entryId;
+            this.entry = null;
+            this.rc = BKException.Code.UnexpectedConditionException;
+            this.done = false;
+        }
+
+        long getEntryId() {
+            return entryId;
+        }
+
+        synchronized boolean isDone() {
+            return done;
+        }
+
+        void setValue(LedgerEntry entry) {
+            synchronized (this) {
+                if (done) {
+                    return;
+                }
+                this.rc = BKException.Code.OK;
+                this.entry = entry;
+            }
+            setDone(true);
+        }
+
+        void setException(int rc) {
+            synchronized (this) {
+                if (done) {
+                    return;
+                }
+                this.rc = rc;
+            }
+            setDone(false);
+        }
+
+        void setDone(boolean success) {
+            synchronized (this) {
+                this.done = true;
+            }
+            onReadEntryDone(success);
+        }
+
+        synchronized boolean isSuccess() {
+            return BKException.Code.OK == rc;
+        }
+
+        synchronized LedgerEntry getEntry() {
+            return this.entry;
+        }
+
+        synchronized int getRc() {
+            return rc;
+        }
+
+        @Override
+        public void readComplete(int rc,
+                                 LedgerHandle lh,
+                                 Enumeration<LedgerEntry> entries,
+                                 Object ctx) {
+            if (isDone()) {
+                return;
+            }
+            if (!checkReturnCodeAndHandleFailure(rc, false)) {
+                return;
+            }
+            LedgerEntry entry = null;
+            while (entries.hasMoreElements()) {
+                // more entries are returned
+                if (null != entry) {
+                    
setException(BKException.Code.UnexpectedConditionException);
+                    return;
+                }
+                entry = entries.nextElement();
+            }
+            if (null == entry || entry.getEntryId() != entryId) {
+                setException(BKException.Code.UnexpectedConditionException);
+                return;
+            }
+            setValue(entry);
+        }
+
+        @Override
+        public void readLastConfirmedAndEntryComplete(int rc,
+                                                      long entryId,
+                                                      LedgerEntry entry,
+                                                      Object ctx) {
+            if (isDone()) {
+                return;
+            }
+            if (!checkReturnCodeAndHandleFailure(rc, true)) {
+                return;
+            }
+            if (null != entry && this.entryId == entryId) {
+                setValue(entry);
+                return;
+            }
+            // the long poll is timeout or interrupted; we will retry it again.
+            issueRead(this);
+        }
+
+        /**
+         * Check return code and retry if needed.
+         *
+         * @param rc the return code
+         * @param isLongPoll is it a long poll request
+         * @return is the request successful or not
+         */
+        boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
+            if (BKException.Code.OK == rc) {
+                numReadErrors.set(0);
+                return true;
+            }
+            if (BKException.Code.BookieHandleNotAvailableException == rc ||
+                    (isLongPoll && 
BKException.Code.NoSuchLedgerExistsException == rc)) {
+                int numErrors = Math.max(1, numReadErrors.incrementAndGet());
+                int nextReadBackoffTime = Math.min(numErrors * 
readAheadWaitTime, maxReadBackoffTime);
+                scheduler.schedule(
+                        this,
+                        nextReadBackoffTime,
+                        TimeUnit.MILLISECONDS);
+            } else {
+                setException(rc);
+            }
+            return false;
+        }
+
+        @Override
+        public void run() {
+            issueRead(this);
+        }
+    }
+
+    /**
+     * A consumer read asking for up to {@code numEntries} entries. Entries taken from the
+     * readahead cache are accumulated here and handed over through {@link #getPromise()}.
+     */
+    private class PendingReadRequest {
+        private final int numEntries;
+        private final List<Entry.Reader> entries;
+        private final Promise<List<Entry.Reader>> promise;
+
+        PendingReadRequest(int numEntries) {
+            this.numEntries = numEntries;
+            // a single-entry read never needs more than one slot
+            this.entries = (1 == numEntries)
+                    ? new ArrayList<Entry.Reader>(1)
+                    : new ArrayList<Entry.Reader>();
+            this.promise = new Promise<List<Entry.Reader>>();
+        }
+
+        Promise<List<Entry.Reader>> getPromise() {
+            return this.promise;
+        }
+
+        /** Fail this request with {@code throwable}. */
+        void setException(Throwable throwable) {
+            FutureUtils.setException(this.promise, throwable);
+        }
+
+        void addEntry(Entry.Reader entry) {
+            this.entries.add(entry);
+        }
+
+        /** Deliver the accumulated entries, then release their readahead cache slots. */
+        void complete() {
+            FutureUtils.setValue(this.promise, this.entries);
+            onEntriesConsumed(this.entries.size());
+        }
+
+        boolean hasReadEntries() {
+            return !this.entries.isEmpty();
+        }
+
+        boolean hasReadEnoughEntries() {
+            return this.numEntries <= this.entries.size();
+        }
+    }
+
+    private final BookKeeper bk;
+    private final DistributedLogConfiguration conf;
+    private final OrderedScheduler scheduler;
+    private final long startEntryId;
+    private final long lssn;
+    private final long startSequenceId;
+    private final boolean envelopeEntries;
+    private final int numPrefetchEntries;
+    private final int maxPrefetchEntries;
+    // state
+    private Promise<Void> closePromise = null;
+    private LogSegmentMetadata metadata;
+    private LedgerHandle lh;
+    private final List<LedgerHandle> openLedgerHandles;
+    private CacheEntry outstandingLongPoll;
+    private long nextEntryId;
+    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null);
+    private final AtomicLong scheduleCount = new AtomicLong(0);
+    // read retries
+    private int readAheadWaitTime;
+    private final int maxReadBackoffTime;
+    private final AtomicInteger numReadErrors = new AtomicInteger(0);
+    // readahead cache: entries prefetched but not yet consumed, and reads still in flight
+    int cachedEntries = 0;
+    int numOutstandingEntries = 0;
+    final LinkedBlockingQueue<CacheEntry> readAheadEntries;
+    // request queue
+    final ConcurrentLinkedQueue<PendingReadRequest> readQueue;
+
+    /**
+     * Create a reader over the log segment described by {@code metadata}, reading through
+     * {@code lh} starting at {@code startEntryId} (negative values start at entry 0).
+     */
+    BKLogSegmentEntryReader(LogSegmentMetadata metadata,
+                            LedgerHandle lh,
+                            long startEntryId,
+                            BookKeeper bk,
+                            OrderedScheduler scheduler,
+                            DistributedLogConfiguration conf) {
+        // log segment identity
+        this.metadata = metadata;
+        this.lssn = metadata.getLogSegmentSequenceNumber();
+        this.startSequenceId = metadata.getStartSequenceId();
+        this.envelopeEntries = metadata.getEnvelopeEntries();
+        // current ledger handle, also tracked alongside any handles re-opened later
+        this.lh = lh;
+        this.openLedgerHandles = Lists.newArrayList(lh);
+        // reading position
+        final long firstEntryId = Math.max(startEntryId, 0);
+        this.startEntryId = firstEntryId;
+        this.nextEntryId = firstEntryId;
+        // collaborators
+        this.bk = bk;
+        this.conf = conf;
+        this.scheduler = scheduler;
+        // prefetch window
+        this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment();
+        this.maxPrefetchEntries = conf.getMaxPrefetchEntriesPerLogSegment();
+        // no long poll read in flight yet; empty readahead and request queues
+        this.outstandingLongPoll = null;
+        this.readAheadEntries = new LinkedBlockingQueue<CacheEntry>();
+        this.readQueue = new ConcurrentLinkedQueue<PendingReadRequest>();
+        // read backoff settings
+        this.readAheadWaitTime = conf.getReadAheadWaitTime();
+        this.maxReadBackoffTime = 4 * conf.getReadAheadWaitTime();
+    }
+
+    /** Ledger handle currently used for reads; replaced when the segment is re-opened as completed. */
+    synchronized LedgerHandle getLh() {
+        return this.lh;
+    }
+
+    /** Latest log segment metadata this reader has switched to. */
+    synchronized LogSegmentMetadata getSegment() {
+        return this.metadata;
+    }
+
+    @VisibleForTesting
+    synchronized long getNextEntryId() {
+        return this.nextEntryId;
+    }
+
+    /** Start the reader by kicking off the initial prefetch. */
+    @Override
+    public void start() {
+        prefetchIfNecessary();
+    }
+
+    //
+    // Process on Log Segment Metadata Updates
+    //
+
+    /**
+     * React to an update of this log segment's metadata.
+     *
+     * <p>Only the in-progress to completed transition matters. In that case the ledger is
+     * re-opened asynchronously and this reader becomes the open callback, with the new
+     * metadata passed as the callback context. The same or an equal-comparing metadata,
+     * and any other transition, is ignored.
+     */
+    @Override
+    public synchronized void onLogSegmentMetadataUpdated(LogSegmentMetadata segment) {
+        if (metadata == segment) {
+            return;
+        }
+        if (LogSegmentMetadata.COMPARATOR.compare(metadata, segment) == 0) {
+            return;
+        }
+        final boolean becameCompleted = metadata.isInProgress() && !segment.isInProgress();
+        if (!becameCompleted) {
+            return;
+        }
+        // segment is closed from inprogress, then re-open the log segment
+        final long ledgerId = segment.getLedgerId();
+        final byte[] digestPassword = conf.getBKDigestPW().getBytes(UTF_8);
+        bk.asyncOpenLedger(ledgerId, BookKeeper.DigestType.CRC32, digestPassword, this, segment);
+    }
+
+    /**
+     * Callback for the {@code asyncOpenLedger} issued by onLogSegmentMetadataUpdated();
+     * {@code ctx} is the completed log segment metadata that triggered the re-open.
+     *
+     * <p>On failure, the error is handed to failOrRetryOpenLedger(). On success:
+     * <ul>
+     *   <li>if the reader was closed meanwhile, the freshly opened handle is closed again;</li>
+     *   <li>otherwise the reader switches to the new metadata and handle, re-issues the
+     *       recorded long poll read (if any) against it, and notifies pending readers.</li>
+     * </ul>
+     *
+     * <p>NOTE(review): outstandingLongPoll is never cleared in the code shown, so this may
+     * re-issue a read for an entry that already completed. Its callbacks then return early
+     * via isDone().
+     */
+    @Override
+    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+        LogSegmentMetadata segment = (LogSegmentMetadata) ctx;
+        if (BKException.Code.OK != rc) {
+            // fail current reader or retry opening the reader
+            failOrRetryOpenLedger(rc, segment);
+            return;
+        }
+        // switch to new ledger handle if the log segment is moved to completed.
+        CacheEntry longPollRead = null;
+        synchronized (this) {
+            if (isClosed()) {
+                // the reader is gone: don't keep the handle we just opened
+                lh.asyncClose(new AsyncCallback.CloseCallback() {
+                    @Override
+                    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+                        logger.debug("Close the open ledger {} since the log segment reader is already closed",
+                                lh.getId());
+                    }
+                }, null);
+                return;
+            }
+            this.metadata = segment;
+            this.lh = lh;
+            this.openLedgerHandles.add(lh);
+            longPollRead = outstandingLongPoll;
+        }
+        // issue reads outside the lock
+        if (null != longPollRead) {
+            // reissue the long poll read when the log segment state is changed
+            issueRead(longPollRead);
+        }
+        // notify readers
+        notifyReaders();
+    }
+
+    /**
+     * Handle a failure to re-open the ledger of a completed log segment.
+     *
+     * <p>Does nothing if the reader is closed. If isBeyondLastAddConfirmed() reports that
+     * the reader has caught up, the reader is failed: it cannot progress without the
+     * latest state of this log segment. Otherwise the re-open is retried after
+     * {@code conf.getZKRetryBackoffStartMillis()} ms.
+     */
+    private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata segment) {
+        if (isClosed()) {
+            return;
+        }
+        if (!isBeyondLastAddConfirmed()) {
+            // the reader is still catching up, retry opening the log segment later
+            Runnable retryOpen = new Runnable() {
+                @Override
+                public void run() {
+                    onLogSegmentMetadataUpdated(segment);
+                }
+            };
+            scheduler.schedule(retryOpen, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
+            return;
+        }
+        // already caught up: fail the reader immediately
+        setException(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc),
+                true);
+    }
+
+    //
+    // Change the state of this reader
+    //
+
+    /**
+     * Check whether the reader has recorded an error and, if so, fail every pending read
+     * request with it.
+     *
+     * <p>NOTE(review): despite the name, only {@code lastException} is inspected here;
+     * isClosed() is not consulted.
+     *
+     * @return true if the reader is in an error state
+     */
+    private boolean checkClosedOrInError() {
+        Throwable cause = lastException.get();
+        if (null == cause) {
+            return false;
+        }
+        cancelAllPendingReads(cause);
+        return true;
+    }
+
+    /**
+     * Put the reader into an error state. Only the first exception is kept; later ones
+     * are ignored.
+     *
+     * @param throwable exception indicating the error
+     * @param isBackground whether the error comes from a background read (readers are then
+     *                     notified here) or from the foreground read path
+     */
+    private void setException(Throwable throwable, boolean isBackground) {
+        lastException.compareAndSet(null, throwable);
+        if (isBackground) {
+            notifyReaders();
+        }
+    }
+
+    /**
+     * Notify the readers with the state change by scheduling processing of pending read
+     * requests.
+     */
+    private void notifyReaders() {
+        processReadRequests();
+    }
+
+    /**
+     * Fail and remove every queued read request.
+     *
+     * @param throwExc the exception to fail the requests with
+     */
+    private void cancelAllPendingReads(Throwable throwExc) {
+        // Drain with poll() rather than iterate-then-clear(). readNext() enqueues
+        // concurrently, and a request added after the (weakly consistent) iteration but
+        // before clear() would have been removed without its promise ever completing.
+        PendingReadRequest request;
+        while (null != (request = readQueue.poll())) {
+            request.setException(throwExc);
+        }
+    }
+
+    //
+    // Background Read Operations
+    //
+
+    /**
+     * Bookkeeping once a cache entry has its result (entry or error).
+     *
+     * <p>There is one fewer read in flight, and readers are woken so they can consume or
+     * fail. Prefetching continues only if the read succeeded.
+     */
+    private void onReadEntryDone(boolean success) {
+        synchronized (this) {
+            numOutstandingEntries--;
+        }
+        notifyReaders();
+        if (!success) {
+            // stop prefetch once a read has failed
+            return;
+        }
+        prefetchIfNecessary();
+    }
+
+    /**
+     * Release {@code numEntries} readahead cache slots after a read request consumed
+     * them, then try to prefetch more.
+     */
+    private void onEntriesConsumed(int numEntries) {
+        synchronized (this) {
+            cachedEntries = cachedEntries - numEntries;
+        }
+        prefetchIfNecessary();
+    }
+
+    /**
+     * Issue reads for more entries if the readahead cache has room.
+     *
+     * <p>Tops up the outstanding reads to {@code numPrefetchEntries}, subject to two limits:
+     * <ul>
+     *   <li>cached entries (prefetched but not yet consumed) must not exceed
+     *       {@code maxPrefetchEntries};</li>
+     *   <li>entries must not go past what the ledger can serve: the last add confirmed for
+     *       a closed ledger, or one entry beyond it (served by a long poll read) for a
+     *       ledger still in progress.</li>
+     * </ul>
+     */
+    private void prefetchIfNecessary() {
+        List<CacheEntry> entriesToFetch;
+        synchronized (this) {
+            if (cachedEntries >= maxPrefetchEntries) {
+                return;
+            }
+            // we don't have enough entries, do prefetch
+            int numEntriesToFetch = numPrefetchEntries - numOutstandingEntries;
+            if (numEntriesToFetch <= 0) {
+                return;
+            }
+            entriesToFetch = new ArrayList<CacheEntry>(numEntriesToFetch);
+            for (int i = 0; i < numEntriesToFetch; i++) {
+                if (cachedEntries >= maxPrefetchEntries) {
+                    break;
+                }
+                if ((isLedgerClosed() && nextEntryId > getLastAddConfirmed()) ||
+                        (!isLedgerClosed() && nextEntryId > getLastAddConfirmed() + 1)) {
+                    break;
+                }
+                CacheEntry entry = new CacheEntry(nextEntryId);
+                // Enqueue under the same lock that assigned the entry id. This method is
+                // called concurrently (start(), read completions, consumed entries).
+                // Enqueueing after the lock was released let a later batch land in the
+                // queue ahead of an earlier one, handing entries to readers out of order.
+                readAheadEntries.add(entry);
+                entriesToFetch.add(entry);
+                ++numOutstandingEntries;
+                ++cachedEntries;
+                ++nextEntryId;
+            }
+        }
+        // issue the reads outside the lock; their completions call back into this reader
+        for (CacheEntry entry : entriesToFetch) {
+            issueRead(entry);
+        }
+    }
+
+
+    private void issueRead(CacheEntry cacheEntry) {
+        if (isClosed()) {
+            return;
+        }
+        if (isLedgerClosed()) {
+            if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) {
+                issueSimpleRead(cacheEntry);
+                return;
+            } else {
+                // Reach the end of stream
+                notifyReaders();
+            }
+        } else { // the ledger is still in progress
+            if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) {
+                issueSimpleRead(cacheEntry);
+            } else {
+                issueLongPollRead(cacheEntry);
+            }
+        }
+    }
+
+    private void issueSimpleRead(CacheEntry cacheEntry) {
+        getLh().asyncReadEntries(cacheEntry.entryId, cacheEntry.entryId, 
cacheEntry, null);
+    }
+
+    private void issueLongPollRead(CacheEntry cacheEntry) {
+        // register the read as outstanding reads
+        synchronized (this) {
+            this.outstandingLongPoll = cacheEntry;
+        }
+        getLh().asyncReadLastConfirmedAndEntry(
+                cacheEntry.entryId,
+                conf.getReadLACLongPollTimeout(),
+                false,
+                cacheEntry,
+                null);
+    }
+
+    //
+    // Foreground Read Operations
+    //
+
+    /**
+     * Wrap a ledger entry read from this log segment in an {@link Entry.Reader}.
+     *
+     * <p>The reader is tagged with this segment's sequence number ({@code lssn}), its start
+     * sequence id, the entry id and the segment's envelope setting.
+     *
+     * @param entry the ledger entry read from BookKeeper
+     * @return a reader over the entry's payload
+     * @throws IOException propagated from building the reader
+     */
+    Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
+        return Entry.newBuilder()
+                .setLogSegmentInfo(lssn, startSequenceId)
+                .setEntryId(entry.getEntryId())
+                .setEnvelopeEntry(envelopeEntries)
+                .deserializeRecordSet(false)
+                .setInputStream(entry.getEntryInputStream())
+                .buildReader();
+    }
+
+    /**
+     * Read the next batch of up to {@code numEntries} entries.
+     *
+     * <p>If the reader is already in an error state, the returned future fails right away
+     * with that error. Otherwise the request is queued. The request completes once at least
+     * one entry is available, so it may return fewer than {@code numEntries} entries.
+     */
+    @Override
+    public Future<List<Entry.Reader>> readNext(int numEntries) {
+        final PendingReadRequest readRequest = new PendingReadRequest(numEntries);
+
+        if (checkClosedOrInError()) {
+            readRequest.setException(lastException.get());
+        } else {
+            boolean wasQueueEmpty;
+            synchronized (readQueue) {
+                wasQueueEmpty = readQueue.isEmpty();
+                readQueue.add(readRequest);
+            }
+            // presumably a non-empty queue means processing was already triggered for an
+            // earlier request, so only the first queued request kicks it off
+            if (wasQueueEmpty) {
+                processReadRequests();
+            }
+        }
+        return readRequest.getPromise();
+    }
+
+    /**
+     * Request a pass of run() over the pending read requests.
+     *
+     * <p>Every call bumps {@code scheduleCount} unless the reader is closed. Only the call
+     * that moves it from 0 submits run() to the scheduler; later calls just leave work
+     * for the pass that is already scheduled.
+     */
+    private void processReadRequests() {
+        if (!isClosed() && 0L == scheduleCount.getAndIncrement()) {
+            scheduler.submit(this);
+        }
+    }
+
+    /**
+     * The core function to propagate fetched entries to pending read requests.
+     *
+     * <p>Executed on {@code scheduler}; processReadRequests() submits it only when
+     * {@code scheduleCount} moves from 0. Each pass looks at the head read request:
+     * <ul>
+     *   <li>if the reader is in error, all pending requests are failed;</li>
+     *   <li>if at least one entry could be handed to the head request, it is completed;</li>
+     *   <li>otherwise one pending notification is consumed from {@code scheduleCount}, and
+     *       the task returns once the count it observed is zero.</li>
+     * </ul>
+     * An empty queue resets {@code scheduleCount} to 0 and returns.
+     *
+     * <p>NOTE(review): the error-state return does not reset {@code scheduleCount}, so
+     * later processReadRequests() calls will not resubmit this task. New requests are
+     * instead failed up front by readNext().
+     */
+    @Override
+    public void run() {
+        long scheduleCountLocal = scheduleCount.get();
+        while (true) {
+            PendingReadRequest nextRequest = null;
+            synchronized (readQueue) {
+                nextRequest = readQueue.peek();
+            }
+
+            // if read queue is empty, nothing to read, return
+            if (null == nextRequest) {
+                scheduleCount.set(0L);
+                return;
+            }
+
+            // if the oldest pending promise is interrupted then we must
+            // mark the reader in error and abort all pending reads since
+            // we don't know the last consumed read
+            if (null == lastException.get()) {
+                if (nextRequest.getPromise().isInterrupted().isDefined()) {
+                    setException(new DLInterruptedException("Interrupted on reading log segment "
+                            + getSegment() + " : " + nextRequest.getPromise().isInterrupted().get()), false);
+                }
+            }
+
+            // if the reader is in error state, stop read
+            if (checkClosedOrInError()) {
+                return;
+            }
+
+            // read entries from readahead cache to satisfy next read request
+            readEntriesFromReadAheadCache(nextRequest);
+
+            // check if we can satisfy the read request
+            if (nextRequest.hasReadEntries()) {
+                PendingReadRequest request;
+                synchronized (readQueue) {
+                    request = readQueue.poll();
+                }
+                // the polled head must be the request we just filled
+                if (null != request && nextRequest == request) {
+                    request.complete();
+                } else {
+                    DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
+                            + getSegment());
+                    nextRequest.setException(ise);
+                    if (null != request) {
+                        request.setException(ise);
+                    }
+                    setException(ise, false);
+                }
+            } else {
+                // no progress possible on this pass: consume one notification, or stop
+                if (0 == scheduleCountLocal) {
+                    return;
+                }
+                scheduleCountLocal = scheduleCount.decrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * Fill <i>nextRequest</i> with entries taken from the head of the
+     * read-ahead cache, stopping once the request has read enough entries.
+     * <p>
+     * The loop returns early, leaving the request incomplete, when the cache is
+     * empty or its head entry has not completed yet; in both cases an
+     * {@link EndOfLogSegmentException} is recorded through {@code setException}
+     * if the end of the log segment has been reached. Any failure (the polled
+     * entry differs from the peeked one, {@code processReadEntry} throws, or the
+     * cached read completed unsuccessfully) is recorded through
+     * {@code setException} and the loop stops.
+     *
+     * @param nextRequest the pending read request to add entries to
+     */
+    private void readEntriesFromReadAheadCache(PendingReadRequest nextRequest) 
{
+        while (!nextRequest.hasReadEnoughEntries()) {
+            // look at the head entry without removing it; it is only removed
+            // once it is known to have completed successfully
+            CacheEntry entry = readAheadEntries.peek();
+            // no entry available in the read ahead cache
+            if (null == entry) {
+                // nothing cached: end of segment when the ledger is closed and
+                // the next entry id to read is beyond the last add confirmed
+                if (isEndOfLogSegment()) {
+                    setException(new 
EndOfLogSegmentException(getSegment().getZNodeName()), false);
+                }
+                return;
+            }
+            // entry is not complete yet.
+            if (!entry.isDone()) {
+                // we already reached end of the log segment
+                if (isEndOfLogSegment(entry.getEntryId())) {
+                    setException(new 
EndOfLogSegmentException(getSegment().getZNodeName()), false);
+                }
+                return;
+            }
+            if (entry.isSuccess()) {
+                // the polled entry must be the one peeked above; anything else
+                // is an unexpected state and puts the reader in error
+                CacheEntry removedEntry = readAheadEntries.poll();
+                if (entry != removedEntry) {
+                    DLIllegalStateException ise = new 
DLIllegalStateException("Unexpected condition at reading from "
+                            + getSegment());
+                    setException(ise, false);
+                    return;
+                }
+                try {
+                    nextRequest.addEntry(processReadEntry(entry.getEntry()));
+                } catch (IOException e) {
+                    setException(e, false);
+                    return;
+                }
+            } else {
+                // the cached read completed with a failure; surface its return code
+                setException(new BKTransmitException("Encountered issue on 
reading entry " + entry.getEntryId()
+                        + " @ log segment " + getSegment(), entry.getRc()), 
false);
+                return;
+            }
+        }
+    }
+
+    //
+    // State Management
+    //
+
+    private synchronized boolean isEndOfLogSegment() {
+        return isEndOfLogSegment(nextEntryId);
+    }
+
+    private boolean isEndOfLogSegment(long entryId) {
+        return isLedgerClosed() && entryId > getLastAddConfirmed();
+    }
+
+    private synchronized boolean isBeyondLastAddConfirmed() {
+        return isBeyondLastAddConfirmed(nextEntryId);
+    }
+
+    private boolean isBeyondLastAddConfirmed(long entryId) {
+        return entryId > getLastAddConfirmed();
+    }
+
+    private synchronized boolean isNotBeyondLastAddConfirmed() {
+        return isNotBeyondLastAddConfirmed(nextEntryId);
+    }
+
+    private boolean isNotBeyondLastAddConfirmed(long entryId) {
+        return entryId <= getLastAddConfirmed();
+    }
+
+    private boolean isLedgerClosed() {
+        return getLh().isClosed();
+    }
+
+    /**
+     * @return the last add confirmed entry id reported by the ledger handle
+     *         returned by {@code getLh()}.
+     */
+    @Override
+    public long getLastAddConfirmed() {
+        final long lac = getLh().getLastAddConfirmed();
+        return lac;
+    }
+
+    /**
+     * @return whether a close promise exists; {@link #asyncClose()} creates it
+     *         on its first call.
+     */
+    synchronized boolean isClosed() {
+        return closePromise != null;
+    }
+
+    /**
+     * Close the reader.
+     * <p>
+     * The first call puts the reader in error with a
+     * {@link ReadCancelledException}, fails the pending reads with it, and
+     * closes every ledger handle the reader has open. Later calls return the
+     * same close future.
+     *
+     * @return future satisfied once all open ledger handles are closed
+     */
+    @Override
+    public Future<Void> asyncClose() {
+        final Promise<Void> closeFuture;
+        final ReadCancelledException cancelException;
+        final LedgerHandle[] ledgersToClose;
+        synchronized (this) {
+            if (closePromise != null) {
+                // already closing or closed: share the existing result
+                return closePromise;
+            }
+            closePromise = new Promise<Void>();
+            closeFuture = closePromise;
+            ledgersToClose = openLedgerHandles.toArray(new LedgerHandle[openLedgerHandles.size()]);
+            // recording the exception makes pending and subsequent reads fail
+            cancelException = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed");
+            setException(cancelException, false);
+        }
+
+        // fail the reads that were already queued, outside of the lock
+        cancelAllPendingReads(cancelException);
+
+        // close every open ledger handle and forward the combined result
+        Future<Void> closeResult = BKUtils.closeLedgers(ledgersToClose);
+        closeResult.proxyTo(closeFuture);
+        return closeFuture;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
new file mode 100644
index 0000000..34fe1c3
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+/**
+ * A {@link LogSegmentEntryWriter} backed by a BookKeeper ledger: every
+ * operation is delegated to the wrapped {@link LedgerHandle}.
+ */
+public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter {
+
+    private final LedgerHandle ledger;
+
+    public BKLogSegmentEntryWriter(LedgerHandle lh) {
+        this.ledger = lh;
+    }
+
+    /** @return the wrapped ledger handle; exposed for tests only. */
+    @VisibleForTesting
+    public LedgerHandle getLedgerHandle() {
+        return ledger;
+    }
+
+    /** The log segment id is the id of the backing ledger. */
+    @Override
+    public long getLogSegmentId() {
+        return ledger.getId();
+    }
+
+    /** Asynchronously appends {@code length} bytes of {@code data} starting at {@code offset}. */
+    @Override
+    public void asyncAddEntry(byte[] data, int offset, int length,
+                              AsyncCallback.AddCallback callback, Object ctx) {
+        ledger.asyncAddEntry(data, offset, length, callback, ctx);
+    }
+
+    /** @return the ledger length reported by {@link LedgerHandle#getLength()}. */
+    @Override
+    public long size() {
+        return ledger.getLength();
+    }
+
+    /** Asynchronously closes the backing ledger. */
+    @Override
+    public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) {
+        ledger.asyncClose(callback, ctx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
new file mode 100644
index 0000000..c71c67e
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.function.VoidFunctions;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+import java.util.List;
+
+/**
+ * BookKeeper utility functions.
+ */
+public class BKUtils {
+
+    private BKUtils() {
+        // static utility holder; not meant to be instantiated
+    }
+
+    /**
+     * Close a ledger <i>lh</i> asynchronously.
+     *
+     * @param lh ledger handle
+     * @return future satisfied when the ledger is closed; it fails with the
+     *         {@link BKException} created from the close return code otherwise.
+     */
+    public static Future<Void> closeLedger(LedgerHandle lh) {
+        final Promise<Void> closePromise = new Promise<Void>();
+        lh.asyncClose(new AsyncCallback.CloseCallback() {
+            @Override
+            public void closeComplete(int rc, LedgerHandle closedLh, Object ctx) {
+                if (BKException.Code.OK != rc) {
+                    FutureUtils.setException(closePromise, BKException.create(rc));
+                } else {
+                    FutureUtils.setValue(closePromise, null);
+                }
+            }
+        }, null);
+        return closePromise;
+    }
+
+    /**
+     * Close a list of ledgers <i>lhs</i>. All closes are issued immediately.
+     *
+     * @param lhs ledgers to close
+     * @return future that combines the individual close results (via
+     *         {@code Futures.collect}); it fails if any of the closes fails.
+     */
+    public static Future<Void> closeLedgers(LedgerHandle ... lhs) {
+        List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length);
+        for (LedgerHandle lh : lhs) {
+            closeResults.add(closeLedger(lh));
+        }
+        return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
new file mode 100644
index 0000000..d43f3d8
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.logsegment;
+
+import com.google.common.annotations.Beta;
+import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.io.AsyncCloseable;
+import com.twitter.util.Future;
+
+import java.util.List;
+
+/**
+ * An interface to read enveloped entries (serialized bytes of
+ * {@link com.twitter.distributedlog.Entry}) from a log segment.
+ */
+@Beta
+public interface LogSegmentEntryReader extends AsyncCloseable {
+
+    /**
+     * Start the reader. This signals the implementation to start preparing
+     * data for consumption via {@link #readNext(int)}.
+     */
+    void start();
+
+    /**
+     * Update the log segment each time its metadata has changed.
+     *
+     * @param segment new metadata of the log segment.
+     */
+    void onLogSegmentMetadataUpdated(LogSegmentMetadata segment);
+
+    /**
+     * Read the next <i>numEntries</i> entries from the current log segment.
+     * <p>
+     * <i>numEntries</i> is best-effort.
+     *
+     * @param numEntries num entries to read from current log segment
+     * @return A future that, when satisfied, contains a non-empty list of entries
+     *         with their content. The future fails with
+     *         {@link com.twitter.distributedlog.exceptions.EndOfLogSegmentException}
+     *         when reading entries beyond the end of a <i>closed</i> log segment.
+     */
+    Future<List<Entry.Reader>> readNext(int numEntries);
+
+    /**
+     * Return the last add confirmed entry id (LAC).
+     *
+     * @return the last add confirmed entry id.
+     */
+    long getLastAddConfirmed();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
new file mode 100644
index 0000000..ff47691
--- /dev/null
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.logsegment;
+
+import com.google.common.annotations.Beta;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.util.Future;
+
+/**
+ * Log segment entry store, used to open writers and readers for the entries
+ * of log segments.
+ */
+@Beta
+public interface LogSegmentEntryStore {
+
+    /**
+     * Open the writer for writing data to the log <i>segment</i>.
+     *
+     * @param segment the log <i>segment</i> to write data to
+     * @return future representing the opened writer
+     */
+    Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment);
+
+    /**
+     * Open the reader for reading data from the log <i>segment</i>.
+     *
+     * @param segment the log <i>segment</i> to read data from
+     * @return future representing the opened reader
+     */
+    Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
index 1485ae6..588c366 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/DLMTestUtil.java
@@ -17,7 +17,7 @@
  */
 package com.twitter.distributedlog;
 
-import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
index 7e497c4..b350255 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKLogSegmentWriter.java
@@ -21,7 +21,7 @@ import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
 import com.twitter.distributedlog.exceptions.WriteCancelledException;
 import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.io.Abortables;
 import com.twitter.distributedlog.lock.SessionLockFactory;
 import com.twitter.distributedlog.lock.ZKDistributedLock;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
index 8a734b5..a388b68 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestDistributedLogBase.java
@@ -19,7 +19,7 @@ package com.twitter.distributedlog;
 
 import static org.junit.Assert.assertTrue;
 
-import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
+import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
new file mode 100644
index 0000000..9a194c4
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.AsyncLogWriter;
+import com.twitter.distributedlog.BookKeeperClient;
+import com.twitter.distributedlog.BookKeeperClientBuilder;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.DLSN;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.Entry;
+import com.twitter.distributedlog.LogRecord;
+import com.twitter.distributedlog.LogRecordWithDLSN;
+import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.TestDistributedLogBase;
+import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
+import com.twitter.distributedlog.exceptions.ReadCancelledException;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.*;
+
+/**
+ * Test Case for {@link BKLogSegmentEntryReader}
+ */
+public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {
+
+    // exposes the running test method's name; each test passes it to createNewDLM
+    @Rule
+    public TestName runtime = new TestName();
+    // ordered scheduler (core pool size 1) handed to every entry reader
+    private OrderedScheduler scheduler;
+    // BookKeeper client used to open the ledgers backing the entry readers
+    private BookKeeperClient bkc;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        bkc = BookKeeperClientBuilder.newBuilder()
+                .name("test-bk")
+                .dlConfig(conf)
+                .ledgersPath("/ledgers")
+                .zkServers(bkutil.getZkServers())
+                .build();
+        scheduler = OrderedScheduler.newBuilder()
+                .name("test-bk-logsegment-entry-reader")
+                .corePoolSize(1)
+                .build();
+    }
+
+    /**
+     * Releases the BookKeeper client and the scheduler, then runs the base
+     * class teardown. Each step runs even if an earlier one throws.
+     */
+    @After
+    public void teardown() throws Exception {
+        try {
+            if (null != bkc) {
+                bkc.close();
+            }
+        } finally {
+            try {
+                if (null != scheduler) {
+                    scheduler.shutdown();
+                }
+            } finally {
+                super.teardown();
+            }
+        }
+    }
+
+    /**
+     * Opens the ledger backing <i>segment</i> and wraps it in a
+     * {@link BKLogSegmentEntryReader} starting at <i>startEntryId</i>.
+     * In-progress segments are opened with {@code openLedgerNoRecovery},
+     * completed ones with {@code openLedger}.
+     */
+    BKLogSegmentEntryReader createEntryReader(LogSegmentMetadata segment,
+                                              long startEntryId,
+                                              DistributedLogConfiguration conf)
+            throws Exception {
+        BookKeeper bk = bkc.get();
+        long ledgerId = segment.getLedgerId();
+        byte[] digestPassword = conf.getBKDigestPW().getBytes(UTF_8);
+        LedgerHandle lh = segment.isInProgress()
+                ? bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, digestPassword)
+                : bk.openLedger(ledgerId, BookKeeper.DigestType.CRC32, digestPassword);
+        return new BKLogSegmentEntryReader(segment, lh, startEntryId, bk, scheduler, conf);
+    }
+
+    /**
+     * Writes <i>numCompletedSegments</i> completed log segments, each holding
+     * <i>segmentSize</i> user records. Every user record is followed by a
+     * control record. User txids increase across segments starting at 1.
+     */
+    void generateCompletedLogSegments(DistributedLogManager dlm,
+                                      DistributedLogConfiguration conf,
+                                      long numCompletedSegments,
+                                      long segmentSize) throws Exception {
+        long nextTxId = 1L;
+        for (long segment = 0; segment < numCompletedSegments; segment++) {
+            AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+            for (long n = 0; n < segmentSize; n++) {
+                long userTxId = nextTxId;
+                nextTxId++;
+                FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(userTxId)));
+                // the control record carries the txid of the next user record
+                LogRecord control = DLMTestUtil.getLogRecordInstance(nextTxId);
+                control.setControl();
+                FutureUtils.result(writer.write(control));
+            }
+            Utils.close(writer);
+        }
+    }
+
+    /**
+     * Writes <i>segmentSize</i> user records (txids 1..segmentSize), each
+     * followed by a control record with the same txid. The writer is returned
+     * without being closed.
+     */
+    AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
+                                              DistributedLogConfiguration conf,
+                                              long segmentSize) throws Exception {
+        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        long txId = 1L;
+        while (txId <= segmentSize) {
+            LogRecord userRecord = DLMTestUtil.getLogRecordInstance(txId);
+            FutureUtils.result(writer.write(userRecord));
+            LogRecord control = DLMTestUtil.getLogRecordInstance(txId);
+            control.setControl();
+            FutureUtils.result(writer.write(control));
+            txId++;
+        }
+        return writer;
+    }
+
+    /**
+     * Reads a completed 20-record log segment one entry at a time until
+     * {@link EndOfLogSegmentException}, checking the txid and DLSN of every record.
+     */
+    @Test(timeout = 60000)
+    public void testReadEntriesFromCompleteLogSegment() throws Exception {
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setNumPrefetchEntriesPerLogSegment(10);
+        confLocal.setMaxPrefetchEntriesPerLogSegment(10);
+        DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
+        try {
+            generateCompletedLogSegments(dlm, confLocal, 1, 20);
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            assertEquals(segments.size() + " log segments found, expected to be only one",
+                    1, segments.size());
+
+            BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
+            try {
+                reader.start();
+                boolean done = false;
+                long txId = 1L;
+                long entryId = 0L;
+                while (!done) {
+                    Entry.Reader entryReader;
+                    try {
+                        entryReader = FutureUtils.result(reader.readNext(1)).get(0);
+                    } catch (EndOfLogSegmentException eol) {
+                        done = true;
+                        continue;
+                    }
+                    LogRecordWithDLSN record = entryReader.nextRecord();
+                    while (null != record) {
+                        if (!record.isControl()) {
+                            DLMTestUtil.verifyLogRecord(record);
+                            assertEquals(txId, record.getTransactionId());
+                            ++txId;
+                        }
+                        DLSN dlsn = record.getDlsn();
+                        assertEquals(1L, dlsn.getLogSegmentSequenceNo());
+                        assertEquals(entryId, dlsn.getEntryId());
+                        record = entryReader.nextRecord();
+                    }
+                    ++entryId;
+                }
+                // all 20 user records (txids 1..20) were read
+                assertEquals(21, txId);
+            } finally {
+                Utils.close(reader);
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Closing the reader fails every outstanding {@code readNext} future with a
+     * {@link ReadCancelledException}.
+     */
+    @Test(timeout = 60000)
+    public void testCloseReaderToCancelPendingReads() throws Exception {
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setNumPrefetchEntriesPerLogSegment(10);
+        confLocal.setMaxPrefetchEntriesPerLogSegment(10);
+        DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
+        try {
+            DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 1, 20);
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            assertEquals(segments.size() + " log segments found, expected to be only one",
+                    1, segments.size());
+
+            // the reader is intentionally not started before issuing the reads
+            BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
+            List<Future<List<Entry.Reader>>> futures = Lists.newArrayList();
+            for (int i = 0; i < 5; i++) {
+                futures.add(reader.readNext(1));
+            }
+            assertFalse("Reader should not be closed yet", reader.isClosed());
+            Utils.close(reader);
+            for (Future<List<Entry.Reader>> future : futures) {
+                try {
+                    FutureUtils.result(future);
+                    fail("The read request should be cancelled");
+                } catch (ReadCancelledException rce) {
+                    // expected
+                }
+            }
+            assertTrue("Reader should be closed", reader.isClosed());
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * With a prefetch batch (2) smaller than the cap (10), the read-ahead cache
+     * fills up to 10 entries and refills to 10 after one entry is consumed.
+     */
+    @Test(timeout = 60000)
+    public void testMaxPrefetchEntriesSmallBatch() throws Exception {
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setNumPrefetchEntriesPerLogSegment(2);
+        confLocal.setMaxPrefetchEntriesPerLogSegment(10);
+        DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
+        try {
+            generateCompletedLogSegments(dlm, confLocal, 1, 20);
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            assertEquals(segments.size() + " log segments found, expected to be only one",
+                    1, segments.size());
+
+            BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
+            try {
+                reader.start();
+
+                // wait for the read ahead entries to become available
+                while (reader.readAheadEntries.size() < 10) {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                }
+
+                long txId = 1L;
+                long entryId = 0L;
+
+                assertEquals(10, reader.readAheadEntries.size());
+                assertEquals(10, reader.getNextEntryId());
+                // read first entry
+                Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0);
+                LogRecordWithDLSN record = entryReader.nextRecord();
+                while (null != record) {
+                    if (!record.isControl()) {
+                        DLMTestUtil.verifyLogRecord(record);
+                        assertEquals(txId, record.getTransactionId());
+                        ++txId;
+                    }
+                    DLSN dlsn = record.getDlsn();
+                    assertEquals(1L, dlsn.getLogSegmentSequenceNo());
+                    assertEquals(entryId, dlsn.getEntryId());
+                    record = entryReader.nextRecord();
+                }
+                assertEquals(2L, txId);
+                // wait for the read ahead entries to become 10 again
+                while (reader.readAheadEntries.size() < 10) {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                }
+
+                assertEquals(10, reader.readAheadEntries.size());
+                assertEquals(11, reader.getNextEntryId());
+            } finally {
+                Utils.close(reader);
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * With a prefetch batch (10) larger than the cap (5), the read-ahead cache
+     * holds at most 5 entries and refills to 5 after one entry is consumed.
+     */
+    @Test(timeout = 60000)
+    public void testMaxPrefetchEntriesLargeBatch() throws Exception {
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setNumPrefetchEntriesPerLogSegment(10);
+        confLocal.setMaxPrefetchEntriesPerLogSegment(5);
+        DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
+        try {
+            generateCompletedLogSegments(dlm, confLocal, 1, 20);
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            assertEquals(segments.size() + " log segments found, expected to be only one",
+                    1, segments.size());
+
+            BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
+            try {
+                reader.start();
+
+                // wait for the read ahead entries to become available
+                while (reader.readAheadEntries.size() < 5) {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                }
+
+                long txId = 1L;
+                long entryId = 0L;
+
+                assertEquals(5, reader.readAheadEntries.size());
+                assertEquals(5, reader.getNextEntryId());
+                // read first entry
+                Entry.Reader entryReader = FutureUtils.result(reader.readNext(1)).get(0);
+                LogRecordWithDLSN record = entryReader.nextRecord();
+                while (null != record) {
+                    if (!record.isControl()) {
+                        DLMTestUtil.verifyLogRecord(record);
+                        assertEquals(txId, record.getTransactionId());
+                        ++txId;
+                    }
+                    DLSN dlsn = record.getDlsn();
+                    assertEquals(1L, dlsn.getLogSegmentSequenceNo());
+                    assertEquals(entryId, dlsn.getEntryId());
+                    record = entryReader.nextRecord();
+                }
+                assertEquals(2L, txId);
+                // wait for the read ahead entries to become 5 again
+                while (reader.readAheadEntries.size() < 5) {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                }
+
+                assertEquals(5, reader.readAheadEntries.size());
+                assertEquals(6, reader.getNextEntryId());
+            } finally {
+                Utils.close(reader);
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Checks read-ahead on a small completed segment (5 records written) with a
+     * prefetch cap of 20. The reader is expected to queue the whole segment,
+     * i.e. lastAddConfirmed + 1 entries. After one entry is handed out, the
+     * queue should be one shorter, and the next entry id should not move past
+     * the end of the segment.
+     */
+    @Test(timeout = 60000)
+    public void testMaxPrefetchEntriesSmallSegment() throws Exception {
+        DistributedLogConfiguration localConf = new DistributedLogConfiguration();
+        localConf.addConfiguration(conf);
+        localConf.setOutputBufferSize(0);
+        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
+        localConf.setImmediateFlushEnabled(false);
+        localConf.setNumPrefetchEntriesPerLogSegment(10);
+        localConf.setMaxPrefetchEntriesPerLogSegment(20);
+        DistributedLogManager dlm = createNewDLM(localConf, runtime.getMethodName());
+        generateCompletedLogSegments(dlm, localConf, 1, 5);
+        List<LogSegmentMetadata> logSegments = dlm.getLogSegments();
+        assertEquals(logSegments.size() + " log segments found, expected to be only one",
+                1, logSegments.size());
+
+        BKLogSegmentEntryReader segmentReader = createEntryReader(logSegments.get(0), 0, localConf);
+        segmentReader.start();
+
+        // block until every entry up to and including lastAddConfirmed is queued
+        while (segmentReader.readAheadEntries.size() <= segmentReader.getLastAddConfirmed()) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+
+        assertEquals(segmentReader.getLastAddConfirmed() + 1, segmentReader.readAheadEntries.size());
+        assertEquals(segmentReader.getLastAddConfirmed() + 1, segmentReader.getNextEntryId());
+
+        // consume the first entry; all of its records must belong to entry 0
+        List<Entry.Reader> firstBatch = FutureUtils.result(segmentReader.readNext(1));
+        Entry.Reader firstEntry = firstBatch.get(0);
+        long expectedTxId = 1L;
+        for (LogRecordWithDLSN rec = firstEntry.nextRecord(); null != rec; rec = firstEntry.nextRecord()) {
+            if (!rec.isControl()) {
+                DLMTestUtil.verifyLogRecord(rec);
+                assertEquals(expectedTxId, rec.getTransactionId());
+                expectedTxId++;
+            }
+            DLSN position = rec.getDlsn();
+            assertEquals(1L, position.getLogSegmentSequenceNo());
+            assertEquals(0L, position.getEntryId());
+        }
+        assertEquals(2L, expectedTxId);
+
+        // one entry was handed out and the segment has nothing left to fetch:
+        // the queue shrinks by one while the next entry id stays at the end
+        assertEquals(segmentReader.getLastAddConfirmed(), segmentReader.readAheadEntries.size());
+        assertEquals(segmentReader.getLastAddConfirmed() + 1, segmentReader.getNextEntryId());
+
+        Utils.close(segmentReader);
+    }
+
+    /**
+     * Reads from an in-progress segment. The test:
+     * <ol>
+     *   <li>drains the entries prefetched so far;</li>
+     *   <li>issues a long-poll read and satisfies it by writing one more record;</li>
+     *   <li>checks that the reader resumes prefetching once that read completes.</li>
+     * </ol>
+     */
+    @Test(timeout = 60000)
+    public void testReadEntriesFromInprogressSegment() throws Exception {
+        DistributedLogConfiguration localConf = new DistributedLogConfiguration();
+        localConf.addConfiguration(conf);
+        localConf.setOutputBufferSize(0);
+        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
+        localConf.setImmediateFlushEnabled(false);
+        localConf.setNumPrefetchEntriesPerLogSegment(20);
+        localConf.setMaxPrefetchEntriesPerLogSegment(20);
+        DistributedLogManager dlm = createNewDLM(localConf, runtime.getMethodName());
+        AsyncLogWriter logWriter = createInprogressLogSegment(dlm, localConf, 5);
+        List<LogSegmentMetadata> logSegments = dlm.getLogSegments();
+        assertEquals(logSegments.size() + " log segments found, expected to be only one",
+                1, logSegments.size());
+
+        BKLogSegmentEntryReader segmentReader = createEntryReader(logSegments.get(0), 0, localConf);
+        segmentReader.start();
+
+        final long lac = 8L;
+        // block until every prefetch request has been sent out
+        while (segmentReader.readAheadEntries.size() < lac + 2) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals(lac + 2, segmentReader.getNextEntryId());
+
+        // drain entries 0..lac: data records carry consecutive txids starting
+        // at 1, and every record must report the entry it was read from
+        long expectedTxId = 1L;
+        long expectedEntryId;
+        for (expectedEntryId = 0L; expectedEntryId <= lac; ++expectedEntryId) {
+            Entry.Reader entry = FutureUtils.result(segmentReader.readNext(1)).get(0);
+            for (LogRecordWithDLSN rec = entry.nextRecord(); null != rec; rec = entry.nextRecord()) {
+                if (!rec.isControl()) {
+                    DLMTestUtil.verifyLogRecord(rec);
+                    assertEquals(expectedTxId, rec.getTransactionId());
+                    ++expectedTxId;
+                }
+                DLSN position = rec.getDlsn();
+                assertEquals(1L, position.getLogSegmentSequenceNo());
+                assertEquals(expectedEntryId, position.getEntryId());
+            }
+        }
+        assertEquals(6L, expectedTxId);
+
+        // issue the next read before writing: it is a long poll that the write below satisfies
+        Future<List<Entry.Reader>> pendingRead = segmentReader.readNext(1);
+        // writing another record commits the previous writes
+        FutureUtils.result(logWriter.write(DLMTestUtil.getLogRecordInstance(expectedTxId)));
+        List<Entry.Reader> polled = FutureUtils.result(pendingRead);
+        assertEquals(1, polled.size());
+        Entry.Reader controlEntry = polled.get(0);
+        LogRecordWithDLSN controlRecord = controlEntry.nextRecord();
+        assertNotNull(controlRecord);
+        assertTrue(controlRecord.isControl());
+        assertNull(controlEntry.nextRecord());
+        // with the read advanced, the reader goes on to prefetch the next entry
+        while (segmentReader.getNextEntryId() <= expectedEntryId) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals(expectedEntryId + 2, segmentReader.getNextEntryId());
+        assertEquals(1, segmentReader.readAheadEntries.size());
+
+        Utils.close(segmentReader);
+        Utils.close(logWriter);
+    }
+
+    /**
+     * Follows a log segment through its in-progress to completed transition.
+     * The test:
+     * <ol>
+     *   <li>drains the entries readable while the segment is in progress;</li>
+     *   <li>satisfies a pending long-poll read by writing one more record;</li>
+     *   <li>closes the writer and reads the record that the close commits;</li>
+     *   <li>hands the completed segment metadata to
+     *       {@code onLogSegmentMetadataUpdated} and expects
+     *       {@link EndOfLogSegmentException} once the remaining entries are read.</li>
+     * </ol>
+     */
+    @Test(timeout = 60000)
+    public void testReadEntriesOnStateChange() throws Exception {
+        DistributedLogConfiguration confLocal = new 
DistributedLogConfiguration();
+        confLocal.addConfiguration(conf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setNumPrefetchEntriesPerLogSegment(20);
+        confLocal.setMaxPrefetchEntriesPerLogSegment(20);
+        DistributedLogManager dlm = createNewDLM(confLocal, 
runtime.getMethodName());
+        // the writer stays open so the segment remains in progress until it is closed below
+        AsyncLogWriter writer = createInprogressLogSegment(dlm, confLocal, 5);
+        List<LogSegmentMetadata> segments = dlm.getLogSegments();
+        assertEquals(segments.size() + " log segments found, expected to be 
only one",
+                1, segments.size());
+
+        BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, 
confLocal);
+        reader.start();
+
+        // NOTE(review): 8 presumably matches the entries produced by
+        // createInprogressLogSegment(dlm, confLocal, 5), i.e. 5 data records plus
+        // control records (see the txId == 6 check below) -- confirm against the helper.
+        long expectedLastAddConfirmed = 8L;
+        // wait until sending out all prefetch requests
+        // (LAC + 2 queued slots: entries 0..LAC plus, presumably, an outstanding
+        // read for entry LAC + 1 that is not yet committed -- confirm)
+        while (reader.readAheadEntries.size() < expectedLastAddConfirmed + 2) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals(expectedLastAddConfirmed + 2, reader.getNextEntryId());
+
+        // drain entries 0..expectedLastAddConfirmed; data records must carry
+        // consecutive txids starting at 1 and every record must report its entry id
+        long txId = 1L;
+        long entryId = 0L;
+        while (true) {
+            Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
+            LogRecordWithDLSN record = entryReader.nextRecord();
+            while (null != record) {
+                if (!record.isControl()) {
+                    DLMTestUtil.verifyLogRecord(record);
+                    assertEquals(txId, record.getTransactionId());
+                    ++txId;
+                }
+                DLSN dlsn = record.getDlsn();
+                assertEquals(1L, dlsn.getLogSegmentSequenceNo());
+                assertEquals(entryId, dlsn.getEntryId());
+                record = entryReader.nextRecord();
+            }
+            ++entryId;
+            if (entryId == expectedLastAddConfirmed + 1) {
+                break;
+            }
+        }
+        assertEquals(6L, txId);
+
+        // issue the next read BEFORE writing, so it is outstanding when the write lands
+        Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
+        // write another record to commit previous writes
+        
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
+        // the long poll will be satisfied
+        List<Entry.Reader> nextReadEntries = 
FutureUtils.result(nextReadFuture);
+        assertEquals(1, nextReadEntries.size());
+        Entry.Reader entryReader = nextReadEntries.get(0);
+        LogRecordWithDLSN record = entryReader.nextRecord();
+        // the entry returned by the long poll holds exactly one control record
+        assertNotNull(record);
+        assertTrue(record.isControl());
+        assertNull(entryReader.nextRecord());
+        // once the read is advanced, we will prefetch next record
+        while (reader.getNextEntryId() <= entryId) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals(entryId + 2, reader.getNextEntryId());
+        assertEquals(1, reader.readAheadEntries.size());
+
+        // advance the entry id
+        ++entryId;
+        // close the writer, the write will be committed
+        Utils.close(writer);
+        // the committed entry holds exactly one (non-control) data record
+        entryReader = FutureUtils.result(reader.readNext(1)).get(0);
+        record = entryReader.nextRecord();
+        assertNotNull(record);
+        assertFalse(record.isControl());
+        assertNull(entryReader.nextRecord());
+        // wait until the reader has moved its next entry id past entryId + 1
+        while (reader.getNextEntryId() <= entryId + 1) {
+            TimeUnit.MILLISECONDS.sleep(10);
+        }
+        assertEquals(entryId + 2, reader.getNextEntryId());
+        assertEquals(1, reader.readAheadEntries.size());
+
+        // get the new log segment
+        List<LogSegmentMetadata> newSegments = dlm.getLogSegments();
+        assertEquals(1, newSegments.size());
+        assertFalse(newSegments.get(0).isInProgress());
+        reader.onLogSegmentMetadataUpdated(newSegments.get(0));
+        // when reader received the new log segments. the outstanding long poll
+        // should be cancelled and end of log segment should be signaled 
correctly
+        try {
+            // when we closed the log segment, another control record will be
+            // written, so we loop over the reader until we reach end of log 
segment.
+            FutureUtils.result(reader.readNext(1));
+            FutureUtils.result(reader.readNext(1));
+            // fail() throws AssertionError, which the catch below does not intercept
+            fail("Should reach end of log segment");
+        } catch (EndOfLogSegmentException eol) {
+            // expected
+        }
+        Utils.close(reader);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java
 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java
new file mode 100644
index 0000000..ae4aa45
--- /dev/null
+++ 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/exceptions/EndOfLogSegmentException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.exceptions;
+
+import com.twitter.distributedlog.thrift.service.StatusCode;
+
+/**
+ * Thrown when a read reaches the end of a log segment.
+ *
+ * <p>The exception carries {@link StatusCode#END_OF_LOG_SEGMENT}. Its message
+ * names the segment whose end was reached.
+ */
+public class EndOfLogSegmentException extends DLException {
+
+    private static final long serialVersionUID = 6060419315910178451L;
+
+    /**
+     * Creates the exception for the given segment.
+     *
+     * @param logSegmentName name of the log segment whose end was reached
+     */
+    public EndOfLogSegmentException(String logSegmentName) {
+        super(StatusCode.END_OF_LOG_SEGMENT, describe(logSegmentName));
+    }
+
+    /** Builds the human-readable message for {@code logSegmentName}. */
+    private static String describe(String logSegmentName) {
+        return "end of log segment " + logSegmentName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/27c04f37/distributedlog-protocol/src/main/thrift/service.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/thrift/service.thrift 
b/distributedlog-protocol/src/main/thrift/service.thrift
index a25af63..a67664c 100644
--- a/distributedlog-protocol/src/main/thrift/service.thrift
+++ b/distributedlog-protocol/src/main/thrift/service.thrift
@@ -96,6 +96,8 @@ enum StatusCode {
     TOO_MANY_STREAMS = 524,
     /* Log Segment Not Found */
     LOG_SEGMENT_NOT_FOUND = 525,
+    // End of Log Segment
+    END_OF_LOG_SEGMENT = 526,
 
     /* 6xx: unexpected */
 

Reply via email to