http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
deleted file mode 100644
index 40e3930..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
+++ /dev/null
@@ -1,992 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.callback.LogSegmentListener;
-import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Futures;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * New ReadAhead Reader that uses {@link com.twitter.distributedlog.logsegment.LogSegmentEntryReader}.
- *
- * NOTE: all the state changes happen in the same thread. All *unsafe* methods should be submitted to the order
- * scheduler using stream name as the key.
- */
-public class ReadAheadEntryReader implements
-        AsyncCloseable,
-        LogSegmentListener,
-        LogSegmentEntryReader.StateChangeListener,
-        FutureEventListener<List<Entry.Reader>> {
-
-    private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class);
-
-    //
-    // Static Functions
-    //
-
-    private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
-        @Override
-        public BoxedUnit apply(LogSegmentEntryReader reader) {
-            reader.start();
-            return BoxedUnit.UNIT;
-        }
-    };
-
-    //
-    // Internal Classes
-    //
-
-    class SegmentReader implements FutureEventListener<LogSegmentEntryReader> {
-
-        private LogSegmentMetadata metadata;
-        private final long startEntryId;
-        private Future<LogSegmentEntryReader> openFuture = null;
-        private LogSegmentEntryReader reader = null;
-        private boolean isStarted = false;
-        private boolean isClosed = false;
-
-        SegmentReader(LogSegmentMetadata metadata,
-                      long startEntryId) {
-            this.metadata = metadata;
-            this.startEntryId = startEntryId;
-        }
-
-        synchronized LogSegmentEntryReader getEntryReader() {
-            return reader;
-        }
-
-        synchronized boolean isBeyondLastAddConfirmed() {
-            return null != reader && reader.isBeyondLastAddConfirmed();
-        }
-
-        synchronized LogSegmentMetadata getSegment() {
-            return metadata;
-        }
-
-        synchronized boolean isReaderOpen() {
-            return null != openFuture;
-        }
-
-        synchronized void openReader() {
-            if (null != openFuture) {
-                return;
-            }
-            openFuture = entryStore.openReader(metadata, startEntryId).addEventListener(this);
-        }
-
-        synchronized boolean isReaderStarted() {
-            return isStarted;
-        }
-
-        synchronized void startRead() {
-            if (isStarted) {
-                return;
-            }
-            isStarted = true;
-            if (null != reader) {
-                reader.start();
-            } else {
-                openFuture.onSuccess(START_READER_FUNC);
-            }
-        }
-
-        synchronized Future<List<Entry.Reader>> readNext() {
-            if (null != reader) {
-                checkCatchingUpStatus(reader);
-                return reader.readNext(numReadAheadEntries);
-            } else {
-                return openFuture.flatMap(readFunc);
-            }
-        }
-
-        synchronized void updateLogSegmentMetadata(final LogSegmentMetadata segment) {
-            if (null != reader) {
-                reader.onLogSegmentMetadataUpdated(segment);
-                this.metadata = segment;
-            } else {
-                openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(LogSegmentEntryReader reader) {
-                        reader.onLogSegmentMetadataUpdated(segment);
-                        synchronized (SegmentReader.this) {
-                            SegmentReader.this.metadata = segment;
-                        }
-                        return BoxedUnit.UNIT;
-                    }
-                });
-            }
-        }
-
-        @Override
-        synchronized public void onSuccess(LogSegmentEntryReader reader) {
-            this.reader = reader;
-            if (reader.getSegment().isInProgress()) {
-                reader.registerListener(ReadAheadEntryReader.this);
-            }
-        }
-
-        @Override
-        public void onFailure(Throwable cause) {
-            // no-op, the failure will be propagated on first read.
-        }
-
-        synchronized boolean isClosed() {
-            return isClosed;
-        }
-
-        synchronized Future<Void> close() {
-            if (null == openFuture) {
-                return Future.Void();
-            }
-            return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() {
-                @Override
-                public Future<Void> apply(LogSegmentEntryReader reader) {
-                    return reader.asyncClose();
-                }
-            }).ensure(new Function0<BoxedUnit>() {
-                @Override
-                public BoxedUnit apply() {
-                    synchronized (SegmentReader.this) {
-                        isClosed = true;
-                    }
-                    return null;
-                }
-            });
-        }
-    }
-
-    private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> {
-
-        private final int numEntries;
-
-        ReadEntriesFunc(int numEntries) {
-            this.numEntries = numEntries;
-        }
-
-        @Override
-        public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) {
-            checkCatchingUpStatus(reader);
-            return reader.readNext(numEntries);
-        }
-    }
-
-    private abstract class CloseableRunnable implements Runnable {
-
-        @Override
-        public void run() {
-            synchronized (ReadAheadEntryReader.this) {
-                if (null != closePromise) {
-                    return;
-                }
-            }
-            try {
-                safeRun();
-            } catch (Throwable cause) {
-                logger.error("Caught unexpected exception : ", cause);
-            }
-        }
-
-        abstract void safeRun();
-
-    }
-
-    //
-    // Functions
-    //
-    private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc;
-    private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() {
-        @Override
-        public BoxedUnit apply() {
-            removeClosedSegmentReaders();
-            return BoxedUnit.UNIT;
-        }
-    };
-
-    //
-    // Resources
-    //
-    private final DistributedLogConfiguration conf;
-    private final BKLogReadHandler readHandler;
-    private final LogSegmentEntryStore entryStore;
-    private final OrderedScheduler scheduler;
-
-    //
-    // Parameters
-    //
-    private final String streamName;
-    private final DLSN fromDLSN;
-    private final int maxCachedEntries;
-    private final int numReadAheadEntries;
-    private final int idleWarnThresholdMillis;
-
-    //
-    // Cache
-    //
-    private final LinkedBlockingQueue<Entry.Reader> entryQueue;
-
-    //
-    // State of the reader
-    //
-
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private boolean isInitialized = false;
-    private boolean readAheadPaused = false;
-    private Promise<Void> closePromise = null;
-    // segment readers
-    private long currentSegmentSequenceNumber;
-    private SegmentReader currentSegmentReader;
-    private SegmentReader nextSegmentReader;
-    private DLSN lastDLSN;
-    private final EntryPosition nextEntryPosition;
-    private volatile boolean isCatchingUp = true;
-    private final LinkedList<SegmentReader> segmentReaders;
-    private final LinkedList<SegmentReader> segmentReadersToClose;
-    // last exception that this reader encounters
-    private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(null);
-    // last entry added time
-    private final Stopwatch lastEntryAddedTime;
-    // state change notification
-    private final CopyOnWriteArraySet<AsyncNotification> stateChangeNotifications =
-            new CopyOnWriteArraySet<AsyncNotification>();
-    // idle reader check task
-    private final ScheduledFuture<?> idleReaderCheckTask;
-
-    //
-    // Stats
-    //
-    private final AlertStatsLogger alertStatsLogger;
-
-    public ReadAheadEntryReader(String streamName,
-                                DLSN fromDLSN,
-                                DistributedLogConfiguration conf,
-                                BKLogReadHandler readHandler,
-                                LogSegmentEntryStore entryStore,
-                                OrderedScheduler scheduler,
-                                Ticker ticker,
-                                AlertStatsLogger alertStatsLogger) {
-        this.streamName = streamName;
-        this.fromDLSN = lastDLSN = fromDLSN;
-        this.nextEntryPosition = new EntryPosition(
-                fromDLSN.getLogSegmentSequenceNo(),
-                fromDLSN.getEntryId());
-        this.conf = conf;
-        this.maxCachedEntries = conf.getReadAheadMaxRecords();
-        this.numReadAheadEntries = conf.getReadAheadBatchSize();
-        this.idleWarnThresholdMillis = conf.getReaderIdleWarnThresholdMillis();
-        this.readHandler = readHandler;
-        this.entryStore = entryStore;
-        this.scheduler = scheduler;
-        this.readFunc = new ReadEntriesFunc(numReadAheadEntries);
-        this.alertStatsLogger = alertStatsLogger;
-
-        // create the segment reader list
-        this.segmentReaders = new LinkedList<SegmentReader>();
-        this.segmentReadersToClose = new LinkedList<SegmentReader>();
-        // create the readahead entry queue
-        this.entryQueue = new LinkedBlockingQueue<Entry.Reader>();
-
-        // start the idle reader detection
-        lastEntryAddedTime = Stopwatch.createStarted(ticker);
-        // start the idle reader check task
-        idleReaderCheckTask = scheduleIdleReaderTaskIfNecessary();
-    }
-
-    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
-        if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) {
-            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
-                @Override
-                public void run() {
-                    if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) {
-                        return;
-                    }
-                    // the readahead has been idle
-                    unsafeCheckIfReadAheadIsIdle();
-                }
-            }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS);
-        }
-        return null;
-    }
-
-    private void unsafeCheckIfReadAheadIsIdle() {
-        boolean forceReadLogSegments =
-                (null == currentSegmentReader) || currentSegmentReader.isBeyondLastAddConfirmed();
-        if (forceReadLogSegments) {
-            readHandler.readLogSegmentsFromStore(
-                    LogSegmentMetadata.COMPARATOR,
-                    LogSegmentFilter.DEFAULT_FILTER,
-                    null
-            ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-                @Override
-                public void onFailure(Throwable cause) {
-                    // do nothing here since it would be retried on next idle reader check task
-                }
-
-                @Override
-                public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
-                    onSegmentsUpdated(segments.getValue());
-                }
-            });
-        }
-    }
-
-    private void cancelIdleReaderTask() {
-        if (null != idleReaderCheckTask) {
-            idleReaderCheckTask.cancel(true);
-        }
-    }
-
-    @VisibleForTesting
-    EntryPosition getNextEntryPosition() {
-        return nextEntryPosition;
-    }
-
-    @VisibleForTesting
-    SegmentReader getCurrentSegmentReader() {
-        return currentSegmentReader;
-    }
-
-    @VisibleForTesting
-    long getCurrentSegmentSequenceNumber() {
-        return currentSegmentSequenceNumber;
-    }
-
-    @VisibleForTesting
-    SegmentReader getNextSegmentReader() {
-        return nextSegmentReader;
-    }
-
-    @VisibleForTesting
-    LinkedList<SegmentReader> getSegmentReaders() {
-        return segmentReaders;
-    }
-
-    @VisibleForTesting
-    boolean isInitialized() {
-        return isInitialized;
-    }
-
-    private void orderedSubmit(Runnable runnable) {
-        synchronized (this) {
-            if (null != closePromise) {
-                return;
-            }
-        }
-        try {
-            scheduler.submit(streamName, runnable);
-        } catch (RejectedExecutionException ree) {
-            logger.debug("Failed to submit and execute an operation for readhead entry reader of {}",
-                    streamName, ree);
-        }
-    }
-
-    public void start(final List<LogSegmentMetadata> segmentList) {
-        logger.info("Starting the readahead entry reader for {} : segments = {}",
-                readHandler.getFullyQualifiedName(), segmentList);
-        started.set(true);
-        processLogSegments(segmentList);
-    }
-
-    private void removeClosedSegmentReaders() {
-        orderedSubmit(new CloseableRunnable() {
-            @Override
-            void safeRun() {
-                unsafeRemoveClosedSegmentReaders();
-            }
-        });
-    }
-
-    private void unsafeRemoveClosedSegmentReaders() {
-        SegmentReader reader = segmentReadersToClose.peekFirst();
-        while (null != reader) {
-            if (reader.isClosed()) {
-                segmentReadersToClose.pollFirst();
-                reader = segmentReadersToClose.peekFirst();
-            } else {
-                break;
-            }
-        }
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closeFuture;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closePromise = closeFuture = new Promise<Void>();
-        }
-
-        // cancel the idle reader task
-        cancelIdleReaderTask();
-
-        // use runnable here instead of CloseableRunnable,
-        // because we need this to be executed
-        try {
-            scheduler.submit(streamName, new Runnable() {
-                @Override
-                public void run() {
-                    unsafeAsyncClose(closeFuture);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}",
-                    streamName, ree);
-            unsafeAsyncClose(closeFuture);
-        }
-
-        return closeFuture;
-    }
-
-    private void unsafeAsyncClose(Promise<Void> closePromise) {
-        List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize(
-                segmentReaders.size() + segmentReadersToClose.size() + 1);
-        if (null != currentSegmentReader) {
-            segmentReadersToClose.add(currentSegmentReader);
-        }
-        if (null != nextSegmentReader) {
-            segmentReadersToClose.add(nextSegmentReader);
-        }
-        for (SegmentReader reader : segmentReaders) {
-            segmentReadersToClose.add(reader);
-        }
-        segmentReaders.clear();
-        for (SegmentReader reader : segmentReadersToClose) {
-            closeFutures.add(reader.close());
-        }
-        Futures.collect(closeFutures).proxyTo(closePromise);
-    }
-
-    //
-    // Reader State Changes
-    //
-
-    ReadAheadEntryReader addStateChangeNotification(AsyncNotification notification) {
-        this.stateChangeNotifications.add(notification);
-        return this;
-    }
-
-    ReadAheadEntryReader removeStateChangeNotification(AsyncNotification notification) {
-        this.stateChangeNotifications.remove(notification);
-        return this;
-    }
-
-    private void notifyStateChangeOnSuccess() {
-        for (AsyncNotification notification : stateChangeNotifications) {
-            notification.notifyOnOperationComplete();
-        }
-    }
-
-    private void notifyStateChangeOnFailure(Throwable cause) {
-        for (AsyncNotification notification : stateChangeNotifications) {
-            notification.notifyOnError(cause);
-        }
-    }
-
-    void setLastException(IOException cause) {
-        if (!lastException.compareAndSet(null, cause)) {
-            logger.debug("last exception has already been set to ", lastException.get());
-        }
-        // the exception is set and notify the state change
-        notifyStateChangeOnFailure(cause);
-    }
-
-    void checkLastException() throws IOException {
-        if (null != lastException.get()) {
-            throw lastException.get();
-        }
-    }
-
-    void checkCatchingUpStatus(LogSegmentEntryReader reader) {
-        if (reader.getSegment().isInProgress()
-                && isCatchingUp
-                && reader.hasCaughtUpOnInprogress()) {
-            logger.info("ReadAhead for {} is caught up at entry {} @ log segment {}.",
-                    new Object[] { readHandler.getFullyQualifiedName(),
-                            reader.getLastAddConfirmed(), reader.getSegment() });
-            isCatchingUp = false;
-        }
-    }
-
-    void markCaughtup() {
-        if (isCatchingUp) {
-            isCatchingUp = false;
-            logger.info("ReadAhead for {} is caught up", readHandler.getFullyQualifiedName());
-        }
-    }
-
-    public boolean isReadAheadCaughtUp() {
-        return !isCatchingUp;
-    }
-
-    @Override
-    public void onCaughtupOnInprogress() {
-        markCaughtup();
-    }
-
-    //
-    // ReadAhead State Machine
-    //
-
-    @Override
-    public void onSuccess(List<Entry.Reader> entries) {
-        lastEntryAddedTime.reset().start();
-        for (Entry.Reader entry : entries) {
-            entryQueue.add(entry);
-        }
-        if (!entries.isEmpty()) {
-            Entry.Reader lastEntry = entries.get(entries.size() - 1);
-            nextEntryPosition.advance(lastEntry.getLSSN(), lastEntry.getEntryId() + 1);
-        }
-        // notify on data available
-        notifyStateChangeOnSuccess();
-        if (entryQueue.size() >= maxCachedEntries) {
-            pauseReadAheadOnCacheFull();
-        } else {
-            scheduleReadNext();
-        }
-    }
-
-    @Override
-    public void onFailure(Throwable cause) {
-        if (cause instanceof EndOfLogSegmentException) {
-            // we reach end of the log segment
-            moveToNextLogSegment();
-            return;
-        }
-        if (cause instanceof IOException) {
-            setLastException((IOException) cause);
-        } else {
-            setLastException(new UnexpectedException("Unexpected non I/O exception", cause));
-        }
-    }
-
-    private synchronized void invokeReadAhead() {
-        if (readAheadPaused) {
-            scheduleReadNext();
-            readAheadPaused = false;
-        }
-    }
-
-    private synchronized void pauseReadAheadOnCacheFull() {
-        this.readAheadPaused = true;
-        if (!isCacheFull()) {
-            invokeReadAhead();
-        }
-    }
-
-    private synchronized void pauseReadAheadOnNoMoreLogSegments() {
-        this.readAheadPaused = true;
-    }
-
-    //
-    // Cache Related Methods
-    //
-
-    public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException {
-        if (null != lastException.get()) {
-            throw lastException.get();
-        }
-        Entry.Reader entry;
-        try {
-            entry = entryQueue.poll(waitTime, waitTimeUnit);
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Interrupted on waiting next readahead entry : ", e);
-        }
-        try {
-            return entry;
-        } finally {
-            // resume readahead if the cache becomes empty
-            if (null != entry && !isCacheFull()) {
-                invokeReadAhead();
-            }
-        }
-    }
-
-    /**
-     * Return number cached entries.
-     *
-     * @return number cached entries.
-     */
-    public int getNumCachedEntries() {
-        return entryQueue.size();
-    }
-
-    /**
-     * Return if the cache is full.
-     *
-     * @return true if the cache is full, otherwise false.
-     */
-    public boolean isCacheFull() {
-        return getNumCachedEntries() >= maxCachedEntries;
-    }
-
-    @VisibleForTesting
-    public boolean isCacheEmpty() {
-        return entryQueue.isEmpty();
-    }
-
-    /**
-     * Check whether the readahead becomes stall.
-     *
-     * @param idleReaderErrorThreshold idle reader error threshold
-     * @param timeUnit time unit of the idle reader error threshold
-     * @return true if the readahead becomes stall, otherwise false.
-     */
-    public boolean isReaderIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
-        return (lastEntryAddedTime.elapsed(timeUnit) > idleReaderErrorThreshold);
-    }
-
-    //
-    // LogSegment Management
-    //
-
-    void processLogSegments(final List<LogSegmentMetadata> segments) {
-        orderedSubmit(new CloseableRunnable() {
-            @Override
-            void safeRun() {
-                unsafeProcessLogSegments(segments);
-            }
-        });
-    }
-
-    private void unsafeProcessLogSegments(List<LogSegmentMetadata> segments) {
-        if (isInitialized) {
-            unsafeReinitializeLogSegments(segments);
-        } else {
-            unsafeInitializeLogSegments(segments);
-        }
-    }
-
-    /**
-     * Update the log segment metadata.
-     *
-     * @param reader the reader to update the metadata
-     * @param newMetadata the new metadata received
-     * @return true if successfully, false on encountering errors
-     */
-    private boolean updateLogSegmentMetadata(SegmentReader reader,
-                                             LogSegmentMetadata newMetadata) {
-        if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
-            setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
-                    + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
-            return false;
-        }
-        if (!reader.getSegment().isInProgress() && newMetadata.isInProgress()) {
-            setLastException(new DLIllegalStateException("An inprogress log segment " + newMetadata
-                    + " received after a closed log segment " + reader.getSegment() + " on reading segment "
-                    + newMetadata.getLogSegmentSequenceNumber() + " @ stream " + streamName));
-            return false;
-        }
-        if (reader.getSegment().isInProgress() && !newMetadata.isInProgress()) {
-            reader.updateLogSegmentMetadata(newMetadata);
-        }
-        return true;
-    }
-
-    /**
-     * Reinitialize the log segments
-     */
-    private void unsafeReinitializeLogSegments(List<LogSegmentMetadata> segments) {
-        logger.info("Reinitialize log segments with {}", segments);
-        int segmentIdx = 0;
-        for (; segmentIdx < segments.size(); segmentIdx++) {
-            LogSegmentMetadata segment = segments.get(segmentIdx);
-            if (segment.getLogSegmentSequenceNumber() < currentSegmentSequenceNumber) {
-                continue;
-            }
-            break;
-        }
-        if (segmentIdx >= segments.size()) {
-            return;
-        }
-        LogSegmentMetadata segment = segments.get(segmentIdx);
-        if (null != currentSegmentReader) {
-            if (!updateLogSegmentMetadata(currentSegmentReader, segment)) {
-                return;
-            }
-        } else {
-            if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
-                setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
-                        + streamName + " : current segment sn = " + currentSegmentSequenceNumber
-                        + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
-                return;
-            }
-        }
-        segmentIdx++;
-        if (segmentIdx >= segments.size()) {
-            return;
-        }
-        // check next segment
-        segment = segments.get(segmentIdx);
-        if (null != nextSegmentReader) {
-            if (!updateLogSegmentMetadata(nextSegmentReader, segment)) {
-                return;
-            }
-            segmentIdx++;
-        }
-        // check the segment readers in the queue
-        for (int readerIdx = 0;
-             readerIdx < segmentReaders.size() && segmentIdx < segments.size();
-             readerIdx++, segmentIdx++) {
-            SegmentReader reader = segmentReaders.get(readerIdx);
-            segment = segments.get(segmentIdx);
-            if (!updateLogSegmentMetadata(reader, segment)) {
-                return;
-            }
-        }
-        // add the remaining segments to the reader queue
-        for (; segmentIdx < segments.size(); segmentIdx++) {
-            segment = segments.get(segmentIdx);
-            SegmentReader reader = new SegmentReader(segment, 0L);
-            reader.openReader();
-            segmentReaders.add(reader);
-        }
-        if (null == currentSegmentReader) {
-            unsafeMoveToNextLogSegment();
-        }
-        // resume readahead if necessary
-        invokeReadAhead();
-    }
-
-    /**
-     * Initialize the reader with the log <i>segments</i>.
-     *
-     * @param segments list of log segments
-     */
-    private void unsafeInitializeLogSegments(List<LogSegmentMetadata> segments) {
-        if (segments.isEmpty()) {
-            // not initialize the background reader, until the first log segment is notified
-            return;
-        }
-        boolean skipTruncatedLogSegments = true;
-        DLSN dlsnToStart = fromDLSN;
-        // positioning the reader
-        for (int i = 0; i < segments.size(); i++) {
-            LogSegmentMetadata segment = segments.get(i);
-            // skip any log segments that have smaller log segment sequence numbers
-            if (segment.getLogSegmentSequenceNumber() < fromDLSN.getLogSegmentSequenceNo()) {
-                continue;
-            }
-            // if the log segment is truncated, skip it.
-            if (skipTruncatedLogSegments &&
-                    !conf.getIgnoreTruncationStatus() &&
-                    segment.isTruncated()) {
-                continue;
-            }
-            // if the log segment is partially truncated, move the start dlsn to the min active dlsn
-            if (skipTruncatedLogSegments &&
-                    !conf.getIgnoreTruncationStatus() &&
-                    segment.isPartiallyTruncated()) {
-                if (segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
-                    dlsnToStart = segment.getMinActiveDLSN();
-                }
-            }
-            skipTruncatedLogSegments = false;
-            if (!isAllowedToPosition(segment, dlsnToStart)) {
-                logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart);
-                return;
-            }
-
-            SegmentReader reader = new SegmentReader(segment,
-                    segment.getLogSegmentSequenceNumber() == dlsnToStart.getLogSegmentSequenceNo()
-                            ? dlsnToStart.getEntryId() : 0L);
-            segmentReaders.add(reader);
-        }
-        if (segmentReaders.isEmpty()) {
-            // not initialize the background reader, until the first log segment is available to read
-            return;
-        }
-        currentSegmentReader = segmentReaders.pollFirst();
-        currentSegmentReader.openReader();
-        currentSegmentReader.startRead();
-        currentSegmentSequenceNumber = currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
-        unsafeReadNext(currentSegmentReader);
-        if (!segmentReaders.isEmpty()) {
-            for (SegmentReader reader : segmentReaders) {
-                reader.openReader();
-            }
-            unsafePrefetchNextSegment(true);
-        }
-        // mark the reader initialized
-        isInitialized = true;
-    }
-
-    private void unsafePrefetchNextSegment(boolean onlyInprogressLogSegment) {
-        SegmentReader nextReader = segmentReaders.peekFirst();
-        // open the next log segment if it is inprogress
-        if (null != nextReader) {
-            if (onlyInprogressLogSegment && 
!nextReader.getSegment().isInProgress()) {
-                return;
-            }
-            nextReader.startRead();
-            nextSegmentReader = nextReader;
-            segmentReaders.pollFirst();
-        }
-    }
-
-    /**
-     * Check if we are allowed to position the reader at <i>fromDLSN</i>.
-     *
-     * @return true if it is allowed, otherwise false.
-     */
-    private boolean isAllowedToPosition(LogSegmentMetadata segment, DLSN 
fromDLSN) {
-        if (segment.isTruncated()
-                && segment.getLastDLSN().compareTo(fromDLSN) >= 0
-                && !conf.getIgnoreTruncationStatus()) {
-            setLastException(new 
AlreadyTruncatedTransactionException(streamName
-                    + " : trying to position read ahead at " + fromDLSN
-                    + " on a segment " + segment + " that is already marked as 
truncated"));
-            return false;
-        }
-        if (segment.isPartiallyTruncated() &&
-                segment.getMinActiveDLSN().compareTo(fromDLSN) > 0) {
-            if (conf.getAlertWhenPositioningOnTruncated()) {
-                alertStatsLogger.raise("Trying to position reader on {} when 
{} is marked partially truncated",
-                    fromDLSN, segment);
-            }
-            if (!conf.getIgnoreTruncationStatus()) {
-                logger.error("{}: Trying to position reader on {} when {} is 
marked partially truncated",
-                        new Object[]{ streamName, fromDLSN, segment });
-
-                setLastException(new 
AlreadyTruncatedTransactionException(streamName
-                        + " : trying to position read ahead at " + fromDLSN
-                        + " on a segment " + segment + " that is already 
marked as truncated"));
-                return false;
-            }
-        }
-        return true;
-    }
-
-    void moveToNextLogSegment() {
-        orderedSubmit(new CloseableRunnable() {
-            @Override
-            void safeRun() {
-                unsafeMoveToNextLogSegment();
-            }
-        });
-    }
-
-    private void unsafeMoveToNextLogSegment() {
-        if (null != currentSegmentReader) {
-            segmentReadersToClose.add(currentSegmentReader);
-            
currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc);
-            logger.debug("close current segment reader {}", 
currentSegmentReader.getSegment());
-            currentSegmentReader = null;
-        }
-        boolean hasSegmentToRead = false;
-        if (null != nextSegmentReader) {
-            currentSegmentReader = nextSegmentReader;
-            logger.debug("move to read segment {}", 
currentSegmentReader.getSegment());
-            currentSegmentSequenceNumber = 
currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
-            nextSegmentReader = null;
-            // start reading
-            unsafeReadNext(currentSegmentReader);
-            unsafePrefetchNextSegment(true);
-            hasSegmentToRead = true;
-        } else {
-            unsafePrefetchNextSegment(false);
-            if (null != nextSegmentReader) {
-                currentSegmentReader = nextSegmentReader;
-                logger.debug("move to read segment {}", 
currentSegmentReader.getSegment());
-                currentSegmentSequenceNumber = 
currentSegmentReader.getSegment().getLogSegmentSequenceNumber();
-                nextSegmentReader = null;
-                unsafeReadNext(currentSegmentReader);
-                unsafePrefetchNextSegment(true);
-                hasSegmentToRead = true;
-            }
-        }
-        if (!hasSegmentToRead) { // no more segment to read, wait until new 
log segment arrive
-            if (isCatchingUp) {
-                logger.info("ReadAhead for {} is caught up and no log segments 
to read now",
-                        readHandler.getFullyQualifiedName());
-                isCatchingUp = false;
-            }
-            pauseReadAheadOnNoMoreLogSegments();
-        }
-    }
-
-    void scheduleReadNext() {
-        orderedSubmit(new CloseableRunnable() {
-            @Override
-            void safeRun() {
-                if (null == currentSegmentReader) {
-                    pauseReadAheadOnNoMoreLogSegments();
-                    return;
-                }
-                unsafeReadNext(currentSegmentReader);
-            }
-        });
-    }
-
-    private void unsafeReadNext(SegmentReader reader) {
-        reader.readNext().addEventListener(this);
-    }
-
-    @Override
-    public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
-        if (!started.get()) {
-            return;
-        }
-        logger.info("segments is updated with {}", segments);
-        processLogSegments(segments);
-    }
-
-    @Override
-    public void onLogStreamDeleted() {
-        setLastException(new LogNotFoundException("Log stream "
-                + streamName + " is deleted"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
deleted file mode 100644
index f481561..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadUtils.java
+++ /dev/null
@@ -1,782 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.selector.FirstDLSNNotLessThanSelector;
-import com.twitter.distributedlog.selector.FirstTxIdNotLessThanSelector;
-import com.twitter.distributedlog.selector.LastRecordSelector;
-import com.twitter.distributedlog.selector.LogRecordSelector;
-import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
-/**
- * Utility functions for readers.
- */
-public class ReadUtils {
-
-    static final Logger LOG = LoggerFactory.getLogger(ReadUtils.class);
-
-    private static final int MIN_SEARCH_BATCH_SIZE = 2;
-
-    //
-    // Read First & Last Record Functions
-    //
-
-    /**
-     * Read last record from a log segment.
-     *
-     * @param streamName
-     *          fully qualified stream name (used for logging)
-     * @param l
-     *          log segment metadata.
-     * @param fence
-     *          whether to fence the log segment.
-     * @param includeControl
-     *          whether to include control record.
-     * @param includeEndOfStream
-     *          whether to include end of stream.
-     * @param scanStartBatchSize
-     *          first num entries used for read last record scan
-     * @param scanMaxBatchSize
-     *          max num entries used for read last record scan
-     * @param numRecordsScanned
-     *          num of records scanned to get last record
-     * @param executorService
-     *          executor service used for processing entries
-     * @param entryStore
-     *          log segment entry store
-     * @return a future with last record.
-     */
-    public static Future<LogRecordWithDLSN> asyncReadLastRecord(
-            final String streamName,
-            final LogSegmentMetadata l,
-            final boolean fence,
-            final boolean includeControl,
-            final boolean includeEndOfStream,
-            final int scanStartBatchSize,
-            final int scanMaxBatchSize,
-            final AtomicInteger numRecordsScanned,
-            final ExecutorService executorService,
-            final LogSegmentEntryStore entryStore) {
-        final LogRecordSelector selector = new LastRecordSelector();
-        return asyncReadRecord(streamName, l, fence, includeControl, 
includeEndOfStream, scanStartBatchSize,
-                               scanMaxBatchSize, numRecordsScanned, 
executorService, entryStore,
-                               selector, true /* backward */, 0L);
-    }
-
-    /**
-     * Read the first user record from a log segment whose DLSN is not less than the given one.
-     *
-     * @param streamName
-     *          fully qualified stream name (used for logging)
-     * @param l
-     *          log segment metadata.
-     * @param scanStartBatchSize
-     *          first num entries used for read first record scan
-     * @param scanMaxBatchSize
-     *          max num entries used for read first record scan
-     * @param numRecordsScanned
-     *          num of records scanned to get first record
-     * @param executorService
-     *          executor service used for processing entries
-     * @param entryStore
-     *          log segment entry store
-     * @param dlsn
-     *          threshold dlsn
-     * @return a future with the first record.
-     */
-    public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord(
-            final String streamName,
-            final LogSegmentMetadata l,
-            final int scanStartBatchSize,
-            final int scanMaxBatchSize,
-            final AtomicInteger numRecordsScanned,
-            final ExecutorService executorService,
-            final LogSegmentEntryStore entryStore,
-            final DLSN dlsn) {
-        long startEntryId = 0L;
-        if (l.getLogSegmentSequenceNumber() == dlsn.getLogSegmentSequenceNo()) 
{
-            startEntryId = dlsn.getEntryId();
-        }
-        final LogRecordSelector selector = new 
FirstDLSNNotLessThanSelector(dlsn);
-        return asyncReadRecord(streamName, l, false, false, false, 
scanStartBatchSize,
-                               scanMaxBatchSize, numRecordsScanned, 
executorService, entryStore,
-                               selector, false /* backward */, startEntryId);
-    }
-
-    //
-    // Private methods for scanning log segments
-    //
-
-    private static class ScanContext {
-        // variables tracking the current scan state
-        final AtomicInteger numEntriesToScan;
-        final AtomicLong curStartEntryId;
-        final AtomicLong curEndEntryId;
-
-        // scan settings
-        final long startEntryId;
-        final long endEntryId;
-        final int scanStartBatchSize;
-        final int scanMaxBatchSize;
-        final boolean includeControl;
-        final boolean includeEndOfStream;
-        final boolean backward;
-
-        // number of records scanned
-        final AtomicInteger numRecordsScanned;
-
-        ScanContext(long startEntryId, long endEntryId,
-                    int scanStartBatchSize,
-                    int scanMaxBatchSize,
-                    boolean includeControl,
-                    boolean includeEndOfStream,
-                    boolean backward,
-                    AtomicInteger numRecordsScanned) {
-            this.startEntryId = startEntryId;
-            this.endEntryId = endEntryId;
-            this.scanStartBatchSize = scanStartBatchSize;
-            this.scanMaxBatchSize = scanMaxBatchSize;
-            this.includeControl = includeControl;
-            this.includeEndOfStream = includeEndOfStream;
-            this.backward = backward;
-            // Scan state
-            this.numEntriesToScan = new AtomicInteger(scanStartBatchSize);
-            if (backward) {
-                this.curStartEntryId = new AtomicLong(
-                        Math.max(startEntryId, (endEntryId - 
scanStartBatchSize + 1)));
-                this.curEndEntryId = new AtomicLong(endEntryId);
-            } else {
-                this.curStartEntryId = new AtomicLong(startEntryId);
-                this.curEndEntryId = new AtomicLong(
-                        Math.min(endEntryId, (startEntryId + 
scanStartBatchSize - 1)));
-            }
-            this.numRecordsScanned = numRecordsScanned;
-        }
-
-        boolean moveToNextRange() {
-            if (backward) {
-                return moveBackward();
-            } else {
-                return moveForward();
-            }
-        }
-
-        boolean moveBackward() {
-            long nextEndEntryId = curStartEntryId.get() - 1;
-            if (nextEndEntryId < startEntryId) {
-                // no more entries to read
-                return false;
-            }
-            curEndEntryId.set(nextEndEntryId);
-            // update num entries to scan
-            numEntriesToScan.set(
-                    Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize));
-            // update start entry id
-            curStartEntryId.set(Math.max(startEntryId, nextEndEntryId - 
numEntriesToScan.get() + 1));
-            return true;
-        }
-
-        boolean moveForward() {
-            long nextStartEntryId = curEndEntryId.get() + 1;
-            if (nextStartEntryId > endEntryId) {
-                // no more entries to read
-                return false;
-            }
-            curStartEntryId.set(nextStartEntryId);
-            // update num entries to scan
-            numEntriesToScan.set(
-                    Math.min(numEntriesToScan.get() * 2, scanMaxBatchSize));
-            // update end entry id
-            curEndEntryId.set(Math.min(endEntryId, nextStartEntryId + 
numEntriesToScan.get() - 1));
-            return true;
-        }
-    }
-
-    private static class SingleEntryScanContext extends ScanContext {
-        SingleEntryScanContext(long entryId) {
-            super(entryId, entryId, 1, 1, true, true, false, new 
AtomicInteger(0));
-        }
-    }
-
-    /**
-     * Read record from a given range of log segment entries.
-     *
-     * @param streamName
-     *          fully qualified stream name (used for logging)
-     * @param reader
-     *          log segment random access reader
-     * @param executorService
-     *          executor service used for processing entries
-     * @param context
-     *          scan context
-     * @return a future with the log record.
-     */
-    private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries(
-            final String streamName,
-            final LogSegmentRandomAccessEntryReader reader,
-            final LogSegmentMetadata metadata,
-            final ExecutorService executorService,
-            final ScanContext context,
-            final LogRecordSelector selector) {
-        final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
-        final long startEntryId = context.curStartEntryId.get();
-        final long endEntryId = context.curEndEntryId.get();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("{} reading entries [{} - {}] from {}.",
-                    new Object[] { streamName, startEntryId, endEntryId, 
metadata});
-        }
-        FutureEventListener<List<Entry.Reader>> readEntriesListener =
-            new FutureEventListener<List<Entry.Reader>>() {
-                @Override
-                public void onSuccess(final List<Entry.Reader> entries) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("{} finished reading entries [{} - {}] from 
{}",
-                                new Object[]{ streamName, startEntryId, 
endEntryId, metadata});
-                    }
-                    for (Entry.Reader entry : entries) {
-                        try {
-                            visitEntryRecords(entry, context, selector);
-                        } catch (IOException ioe) {
-                            // exception is only thrown due to bad ledger 
entry, so it might be corrupted
-                            // we shouldn't do anything beyond this point. 
throw the exception to application
-                            promise.setException(ioe);
-                            return;
-                        }
-                    }
-
-                    LogRecordWithDLSN record = selector.result();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("{} got record from entries [{} - {}] of {} 
: {}",
-                                new Object[]{streamName, startEntryId, 
endEntryId,
-                                        metadata, record});
-                    }
-                    promise.setValue(record);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    promise.setException(cause);
-                }
-            };
-        reader.readEntries(startEntryId, endEntryId)
-                
.addEventListener(FutureEventListenerRunnable.of(readEntriesListener, 
executorService));
-        return promise;
-    }
-
-    /**
-     * Process each record using LogRecordSelector.
-     *
-     * @param entry
-     *          ledger entry
-     * @param context
-     *          scan context
-     * @param selector
-     *          selector that processes each record that is not filtered out
-     * @throws IOException
-     */
-    private static void visitEntryRecords(
-            Entry.Reader entry,
-            ScanContext context,
-            LogRecordSelector selector) throws IOException {
-        LogRecordWithDLSN nextRecord = entry.nextRecord();
-        while (nextRecord != null) {
-            LogRecordWithDLSN record = nextRecord;
-            nextRecord = entry.nextRecord();
-            context.numRecordsScanned.incrementAndGet();
-            if (!context.includeControl && record.isControl()) {
-                continue;
-            }
-            if (!context.includeEndOfStream && record.isEndOfStream()) {
-                continue;
-            }
-            selector.process(record);
-        }
-    }
-
-    /**
-     * Scan entries for the given record.
-     *
-     * @param streamName
-     *          fully qualified stream name (used for logging)
-     * @param reader
-     *          log segment random access reader
-     * @param executorService
-     *          executor service used for processing entries
-     * @param promise
-     *          promise to return desired record.
-     * @param context
-     *          scan context
-     */
-    private static void asyncReadRecordFromEntries(
-            final String streamName,
-            final LogSegmentRandomAccessEntryReader reader,
-            final LogSegmentMetadata metadata,
-            final ExecutorService executorService,
-            final Promise<LogRecordWithDLSN> promise,
-            final ScanContext context,
-            final LogRecordSelector selector) {
-        FutureEventListener<LogRecordWithDLSN> readEntriesListener =
-            new FutureEventListener<LogRecordWithDLSN>() {
-                @Override
-                public void onSuccess(LogRecordWithDLSN value) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("{} read record from [{} - {}] of {} : {}",
-                                new Object[]{streamName, 
context.curStartEntryId.get(), context.curEndEntryId.get(),
-                                        metadata, value});
-                    }
-                    if (null != value) {
-                        promise.setValue(value);
-                        return;
-                    }
-                    if (!context.moveToNextRange()) {
-                        // no more entries to read
-                        promise.setValue(null);
-                        return;
-                    }
-                    // scan next range
-                    asyncReadRecordFromEntries(streamName,
-                            reader,
-                            metadata,
-                            executorService,
-                            promise,
-                            context,
-                            selector);
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    promise.setException(cause);
-                }
-            };
-        asyncReadRecordFromEntries(streamName, reader, metadata, 
executorService, context, selector)
-                
.addEventListener(FutureEventListenerRunnable.of(readEntriesListener, 
executorService));
-    }
-
-    private static void asyncReadRecordFromLogSegment(
-            final String streamName,
-            final LogSegmentRandomAccessEntryReader reader,
-            final LogSegmentMetadata metadata,
-            final ExecutorService executorService,
-            final int scanStartBatchSize,
-            final int scanMaxBatchSize,
-            final boolean includeControl,
-            final boolean includeEndOfStream,
-            final Promise<LogRecordWithDLSN> promise,
-            final AtomicInteger numRecordsScanned,
-            final LogRecordSelector selector,
-            final boolean backward,
-            final long startEntryId) {
-        final long lastAddConfirmed = reader.getLastAddConfirmed();
-        if (lastAddConfirmed < 0) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Log segment {} is empty for {}.", new Object[] { 
metadata, streamName });
-            }
-            promise.setValue(null);
-            return;
-        }
-        final ScanContext context = new ScanContext(
-                startEntryId, lastAddConfirmed,
-                scanStartBatchSize, scanMaxBatchSize,
-                includeControl, includeEndOfStream, backward, 
numRecordsScanned);
-        asyncReadRecordFromEntries(streamName, reader, metadata, 
executorService,
-                                   promise, context, selector);
-    }
-
-    private static Future<LogRecordWithDLSN> asyncReadRecord(
-            final String streamName,
-            final LogSegmentMetadata l,
-            final boolean fence,
-            final boolean includeControl,
-            final boolean includeEndOfStream,
-            final int scanStartBatchSize,
-            final int scanMaxBatchSize,
-            final AtomicInteger numRecordsScanned,
-            final ExecutorService executorService,
-            final LogSegmentEntryStore entryStore,
-            final LogRecordSelector selector,
-            final boolean backward,
-            final long startEntryId) {
-
-        final Promise<LogRecordWithDLSN> promise = new 
Promise<LogRecordWithDLSN>();
-
-        FutureEventListener<LogSegmentRandomAccessEntryReader> 
openReaderListener =
-            new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
-                @Override
-                public void onSuccess(final LogSegmentRandomAccessEntryReader 
reader) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("{} Opened log segment {} for reading 
record",
-                                streamName, l);
-                    }
-                    promise.ensure(new AbstractFunction0<BoxedUnit>() {
-                        @Override
-                        public BoxedUnit apply() {
-                            reader.asyncClose();
-                            return BoxedUnit.UNIT;
-                        }
-                    });
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("{} {} scanning {}.", new Object[]{
-                                (backward ? "backward" : "forward"), 
streamName, l});
-                    }
-                    asyncReadRecordFromLogSegment(
-                            streamName, reader, l, executorService,
-                            scanStartBatchSize, scanMaxBatchSize,
-                            includeControl, includeEndOfStream,
-                            promise, numRecordsScanned, selector, backward, 
startEntryId);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    promise.setException(cause);
-                }
-            };
-        entryStore.openRandomAccessReader(l, fence)
-                
.addEventListener(FutureEventListenerRunnable.of(openReaderListener, 
executorService));
-        return promise;
-    }
-
-    //
-    // Search Functions
-    //
-
-    /**
-     * Get the log record whose transaction id is not less than provided 
<code>transactionId</code>.
-     *
-     * <p>
-     * It uses a binary-search like algorithm to find the log record whose 
transaction id is not less than
-     * provided <code>transactionId</code> within a log <code>segment</code>. 
You could think of a log segment
-     * in terms of a sequence of records whose transaction ids are 
non-decreasing.
-     *
-     * - The sequence of records within a log segment is divided into N pieces.
-     * - Find the piece of records that contains a record whose transaction id 
is not less than provided
-     *   <code>transactionId</code>.
-     *
-     * N could be chosen based on trading off concurrency and latency.
-     * </p>
-     *
-     * @param logName
-     *          name of the log
-     * @param segment
-     *          metadata of the log segment
-     * @param transactionId
-     *          transaction id
-     * @param executorService
-     *          executor service used for processing entries
-     * @param entryStore
-     *          log segment entry store
-     * @param nWays
-     *          number of entries to search in parallel
-     * @return found log record. none if all transaction ids are less than 
provided <code>transactionId</code>.
-     */
-    public static Future<Optional<LogRecordWithDLSN>> 
getLogRecordNotLessThanTxId(
-            final String logName,
-            final LogSegmentMetadata segment,
-            final long transactionId,
-            final ExecutorService executorService,
-            final LogSegmentEntryStore entryStore,
-            final int nWays) {
-        if (!segment.isInProgress()) {
-            if (segment.getLastTxId() < transactionId) {
-                // all log records have transaction ids less than the provided transactionId,
-                // so return none
-                Optional<LogRecordWithDLSN> noneRecord = Optional.absent();
-                return Future.value(noneRecord);
-            }
-        }
-
-        final Promise<Optional<LogRecordWithDLSN>> promise =
-                new Promise<Optional<LogRecordWithDLSN>>();
-        final FutureEventListener<LogSegmentRandomAccessEntryReader> 
openReaderListener =
-            new FutureEventListener<LogSegmentRandomAccessEntryReader>() {
-                @Override
-                public void onSuccess(final LogSegmentRandomAccessEntryReader 
reader) {
-                    promise.ensure(new AbstractFunction0<BoxedUnit>() {
-                        @Override
-                        public BoxedUnit apply() {
-                            reader.asyncClose();
-                            return BoxedUnit.UNIT;
-                        }
-
-                    });
-                    long lastEntryId = reader.getLastAddConfirmed();
-                    if (lastEntryId < 0) {
-                        // it means that the log segment is created but not 
written yet or an empty log segment.
-                        // it is equivalent to 'all log records whose 
transaction id is less than provided transactionId'
-                        Optional<LogRecordWithDLSN> nonRecord = 
Optional.absent();
-                        promise.setValue(nonRecord);
-                        return;
-                    }
-                    // all log records whose transaction id is not less than 
provided transactionId
-                    if (segment.getFirstTxId() >= transactionId) {
-                        final FirstTxIdNotLessThanSelector selector =
-                                new 
FirstTxIdNotLessThanSelector(transactionId);
-                        asyncReadRecordFromEntries(
-                                logName,
-                                reader,
-                                segment,
-                                executorService,
-                                new SingleEntryScanContext(0L),
-                                selector
-                        ).addEventListener(new 
FutureEventListener<LogRecordWithDLSN>() {
-                            @Override
-                            public void onSuccess(LogRecordWithDLSN value) {
-                                
promise.setValue(Optional.of(selector.result()));
-                            }
-
-                            @Override
-                            public void onFailure(Throwable cause) {
-                                promise.setException(cause);
-                            }
-                        });
-
-                        return;
-                    }
-                    getLogRecordNotLessThanTxIdFromEntries(
-                            logName,
-                            segment,
-                            transactionId,
-                            executorService,
-                            reader,
-                            Lists.newArrayList(0L, lastEntryId),
-                            nWays,
-                            Optional.<LogRecordWithDLSN>absent(),
-                            promise);
-                }
-
-                @Override
-                public void onFailure(final Throwable cause) {
-                    promise.setException(cause);
-                }
-            };
-
-        entryStore.openRandomAccessReader(segment, false)
-                
.addEventListener(FutureEventListenerRunnable.of(openReaderListener, 
executorService));
-        return promise;
-    }
-
-    /**
-     * Find the log record whose transaction id is not less than provided <code>transactionId</code> from
-     * the entries listed in <code>entriesToSearch</code>.
-     *
-     * @param logName
-     *          name of the log
-     * @param segment
-     *          log segment
-     * @param transactionId
-     *          provided transaction id to search
-     * @param executorService
-     *          executor service
-     * @param reader
-     *          log segment random access reader
-     * @param entriesToSearch
-     *          list of entries to search
-     * @param nWays
-     *          how many entries to search in parallel
-     * @param prevFoundRecord
-     *          the log record found in previous search
-     * @param promise
-     *          promise to satisfy the result
-     */
-    private static void getLogRecordNotLessThanTxIdFromEntries(
-            final String logName,
-            final LogSegmentMetadata segment,
-            final long transactionId,
-            final ExecutorService executorService,
-            final LogSegmentRandomAccessEntryReader reader,
-            final List<Long> entriesToSearch,
-            final int nWays,
-            final Optional<LogRecordWithDLSN> prevFoundRecord,
-            final Promise<Optional<LogRecordWithDLSN>> promise) {
-        final List<Future<LogRecordWithDLSN>> searchResults =
-                Lists.newArrayListWithExpectedSize(entriesToSearch.size());
-        for (Long entryId : entriesToSearch) {
-            LogRecordSelector selector = new 
FirstTxIdNotLessThanSelector(transactionId);
-            Future<LogRecordWithDLSN> searchResult = 
asyncReadRecordFromEntries(
-                    logName,
-                    reader,
-                    segment,
-                    executorService,
-                    new SingleEntryScanContext(entryId),
-                    selector);
-            searchResults.add(searchResult);
-        }
-        FutureEventListener<List<LogRecordWithDLSN>> 
processSearchResultsListener =
-                new FutureEventListener<List<LogRecordWithDLSN>>() {
-                    @Override
-                    public void onSuccess(List<LogRecordWithDLSN> resultList) {
-                        processSearchResults(
-                                logName,
-                                segment,
-                                transactionId,
-                                executorService,
-                                reader,
-                                resultList,
-                                nWays,
-                                prevFoundRecord,
-                                promise);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        promise.setException(cause);
-                    }
-                };
-        Future.collect(searchResults).addEventListener(
-                FutureEventListenerRunnable.of(processSearchResultsListener, 
executorService));
-    }
-
-    /**
-     * Process the search results
-     */
-    static void processSearchResults(
-            final String logName,
-            final LogSegmentMetadata segment,
-            final long transactionId,
-            final ExecutorService executorService,
-            final LogSegmentRandomAccessEntryReader reader,
-            final List<LogRecordWithDLSN> searchResults,
-            final int nWays,
-            final Optional<LogRecordWithDLSN> prevFoundRecord,
-            final Promise<Optional<LogRecordWithDLSN>> promise) {
-        int found = -1;
-        for (int i = 0; i < searchResults.size(); i++) {
-            LogRecordWithDLSN record = searchResults.get(i);
-            if (record.getTransactionId() >= transactionId) {
-                found = i;
-                break;
-            }
-        }
-        if (found == -1) { // all log records' transaction id is less than 
provided transaction id
-            promise.setValue(prevFoundRecord);
-            return;
-        }
-        // we found a log record
-        LogRecordWithDLSN foundRecord = searchResults.get(found);
-
-        // we found it
-        //   - it is not the first record
-        //   - it is the first record in first search entry
-        //   - its entry is adjacent to previous search entry
-        if (foundRecord.getDlsn().getSlotId() != 0L
-                || found == 0
-                || foundRecord.getDlsn().getEntryId() == 
(searchResults.get(found - 1).getDlsn().getEntryId() + 1)) {
-            promise.setValue(Optional.of(foundRecord));
-            return;
-        }
-
-        // otherwise, we need to search
-        List<Long> nextSearchBatch = getEntriesToSearch(
-                transactionId,
-                searchResults.get(found - 1),
-                searchResults.get(found),
-                nWays);
-        if (nextSearchBatch.isEmpty()) {
-            promise.setValue(prevFoundRecord);
-            return;
-        }
-        getLogRecordNotLessThanTxIdFromEntries(
-                logName,
-                segment,
-                transactionId,
-                executorService,
-                reader,
-                nextSearchBatch,
-                nWays,
-                Optional.of(foundRecord),
-                promise);
-    }
-
-    /**
-     * Get the entries to search provided <code>transactionId</code> between
-     * <code>firstRecord</code> and <code>lastRecord</code>. 
<code>firstRecord</code>
-     * and <code>lastRecord</code> are already searched, which the transaction 
id
-     * of <code>firstRecord</code> is less than <code>transactionId</code> and 
the
-     * transaction id of <code>lastRecord</code> is not less than 
<code>transactionId</code>.
-     *
-     * @param transactionId
-     *          transaction id to search
-     * @param firstRecord
-     *          log record that already searched whose transaction id is leass 
than <code>transactionId</code>.
-     * @param lastRecord
-     *          log record that already searched whose transaction id is not 
less than <code>transactionId</code>.
-     * @param nWays
-     *          N-ways to search
-     * @return the list of entries to search
-     */
-    static List<Long> getEntriesToSearch(
-            long transactionId,
-            LogRecordWithDLSN firstRecord,
-            LogRecordWithDLSN lastRecord,
-            int nWays) {
-        long txnDiff = lastRecord.getTransactionId() - 
firstRecord.getTransactionId();
-        if (txnDiff > 0) {
-            if (lastRecord.getTransactionId() == transactionId) {
-                List<Long> entries = getEntriesToSearch(
-                        firstRecord.getDlsn().getEntryId() + 1,
-                        lastRecord.getDlsn().getEntryId() - 2,
-                        Math.max(MIN_SEARCH_BATCH_SIZE, nWays - 1));
-                entries.add(lastRecord.getDlsn().getEntryId() - 1);
-                return entries;
-            } else {
-                // TODO: improve it by estimating transaction ids.
-                return getEntriesToSearch(
-                        firstRecord.getDlsn().getEntryId() + 1,
-                        lastRecord.getDlsn().getEntryId() - 1,
-                        nWays);
-            }
-        } else {
-            // unexpected condition
-            return Lists.newArrayList();
-        }
-    }
-
-    /**
-     * Pick the entry ids to probe in one search batch over the inclusive range
-     * [<code>startEntryId</code>, <code>endEntryId</code>].
-     *
-     * <p>Entries are taken from <code>startEntryId</code> upwards in steps of
-     * <code>max(1, numEntries / ways)</code>, at most <code>ways - 1</code> of them, and
-     * <code>endEntryId</code> is appended when the last picked entry lies before it. Here
-     * <code>ways</code> is <code>nWays</code> raised to at least 2: with fewer ways no entry
-     * would be picked from a non-empty range, and the lookup of the last picked entry
-     * would fail (and <code>nWays == 0</code> would divide by zero).
-     *
-     * @param startEntryId
-     *          first entry id of the range (inclusive)
-     * @param endEntryId
-     *          last entry id of the range (inclusive)
-     * @param nWays
-     *          N-ways to search; values below 2 are treated as 2
-     * @return the list of entries to search, in ascending order; an empty list when
-     *         <code>startEntryId</code> is greater than <code>endEntryId</code>
-     */
-    static List<Long> getEntriesToSearch(
-            long startEntryId,
-            long endEntryId,
-            int nWays) {
-        if (startEntryId > endEntryId) {
-            return Lists.newArrayList();
-        }
-        // Guarantee at least one loop iteration below, so entryList is never empty when
-        // its last element is read, and the division never sees a zero divisor.
-        int ways = Math.max(2, nWays);
-        long numEntries = endEntryId - startEntryId + 1;
-        long step = Math.max(1L, numEntries / ways);
-        List<Long> entryList = Lists.newArrayListWithExpectedSize(ways);
-        for (long i = startEntryId, j = ways - 1; i <= endEntryId && j > 0; i += step, j--) {
-            entryList.add(i);
-        }
-        // Always probe the end of the range.
-        if (entryList.get(entryList.size() - 1) < endEntryId) {
-            entryList.add(endEntryId);
-        }
-        return entryList;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
deleted file mode 100644
index 0b24c1a..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.exceptions.OverCapacityException;
-import com.twitter.distributedlog.util.PermitLimiter;
-
-/**
- * Write limiter that pairs a per-stream {@link PermitLimiter} with a global one.
- *
- * <p>A permit is acquired from the stream limiter first and from the global limiter
- * second; releasing and closing are applied to both limiters.
- */
-public class WriteLimiter {
-
-    String streamName;
-    final PermitLimiter streamLimiter;
-    final PermitLimiter globalLimiter;
-
-    /**
-     * @param streamName
-     *          name of the stream, used in the over-capacity error message
-     * @param streamLimiter
-     *          limiter consulted first on every acquire
-     * @param globalLimiter
-     *          limiter consulted after the stream limiter has granted a permit
-     */
-    public WriteLimiter(String streamName, PermitLimiter streamLimiter, PermitLimiter globalLimiter) {
-        this.streamName = streamName;
-        this.streamLimiter = streamLimiter;
-        this.globalLimiter = globalLimiter;
-    }
-
-    /**
-     * Acquire one permit from the stream limiter and then one from the global limiter.
-     *
-     * <p>If the global permit cannot be obtained, the stream permit that was just
-     * acquired is released again before the exception is rethrown.
-     *
-     * @throws OverCapacityException
-     *          if either limiter refuses the permit
-     */
-    public void acquire() throws OverCapacityException {
-        acquireStreamPermit();
-        try {
-            acquireGlobalPermit();
-        } catch (OverCapacityException overCapacity) {
-            // give back the stream permit taken above
-            streamLimiter.release(1);
-            throw overCapacity;
-        }
-    }
-
-    /** Take one permit from the stream limiter or fail with a message naming the stream. */
-    private void acquireStreamPermit() throws OverCapacityException {
-        if (streamLimiter.acquire()) {
-            return;
-        }
-        throw new OverCapacityException(
-                String.format("Stream write capacity exceeded for stream %s", streamName));
-    }
-
-    /** Take one permit from the global limiter or fail. */
-    private void acquireGlobalPermit() throws OverCapacityException {
-        if (globalLimiter.acquire()) {
-            return;
-        }
-        throw new OverCapacityException("Global write capacity exceeded");
-    }
-
-    /** Release a single permit on both limiters. */
-    public void release() {
-        release(1);
-    }
-
-    /**
-     * Release <code>permits</code> permits on the stream limiter and then on the
-     * global limiter.
-     *
-     * @param permits
-     *          number of permits to release on each limiter
-     */
-    public void release(int permits) {
-        streamLimiter.release(permits);
-        globalLimiter.release(permits);
-    }
-
-    /** Close the stream limiter and then the global limiter. */
-    public void close() {
-        streamLimiter.close();
-        globalLimiter.close();
-    }
-}

Reply via email to