http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
new file mode 100644
index 0000000..eedfbd6
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -0,0 +1,751 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.IdleReaderException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ReadCancelledException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Throw;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * BookKeeper based {@link AsyncLogReader} implementation.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics are exposed under `async_reader`.
+ * <ul>
+ * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
+ * if it is high, it means that the caller takes time on processing the result of read requests.
+ * The side effect is blocking subsequent reads.
+ * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
+ * <li> `async_reader`/background_read: opstats. time spent on background reads.
+ * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
+ * <li> `async_reader`/time_between_read_next: opstats. time between two consecutive {@link #readNext()} calls.
+ * if it is high, it means that the caller is slowing down on calling {@link #readNext()}.
+ * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
+ * <li> `async_reader`/idle_reader_error: counter. the number of idle reader errors.
+ * </ul>
+ */
+class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
+    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
+
+    private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
+            new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() {
+                @Override
+                public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
+                    return records.get(0);
+                }
+            };
+
+    private final String streamName;
+    protected final BKDistributedLogManager bkDistributedLogManager;
+    protected final BKLogReadHandler readHandler;
+    private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
+    private final OrderedScheduler scheduler;
+    private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
+    private final Object scheduleLock = new Object();
+    private final AtomicLong scheduleCount = new AtomicLong(0);
+    final private Stopwatch scheduleDelayStopwatch;
+    final private Stopwatch readNextDelayStopwatch;
+    private DLSN startDLSN;
+    private ReadAheadEntryReader readAheadReader = null;
+    private int lastPosition = 0;
+    private final boolean positionGapDetectionEnabled;
+    private final int idleErrorThresholdMillis;
+    final ScheduledFuture<?> idleReaderTimeoutTask;
+    private ScheduledFuture<?> backgroundScheduleTask = null;
+    // last process time
+    private final Stopwatch lastProcessTime;
+
+    protected Promise<Void> closeFuture = null;
+
+    private boolean lockStream = false;
+
+    private final boolean returnEndOfStreamRecord;
+
+    private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
+        @Override
+        public void run() {
+            synchronized (scheduleLock) {
+                backgroundScheduleTask = null;
+            }
+            scheduleBackgroundRead();
+        }
+    };
+
+    // State
+    private Entry.Reader currentEntry = null;
+    private LogRecordWithDLSN nextRecord = null;
+
+    // Failure Injector
+    private boolean disableProcessingReadRequests = false;
+
+    // Stats
+    private final OpStatsLogger readNextExecTime;
+    private final OpStatsLogger delayUntilPromiseSatisfied;
+    private final OpStatsLogger timeBetweenReadNexts;
+    private final OpStatsLogger futureSetLatency;
+    private final OpStatsLogger scheduleLatency;
+    private final OpStatsLogger backgroundReaderRunTime;
+    private final Counter idleReaderCheckCount;
+    private final Counter idleReaderCheckIdleReadRequestCount;
+    private final Counter idleReaderCheckIdleReadAheadCount;
+    private final Counter idleReaderError;
+
+    private class PendingReadRequest {
+        private final Stopwatch enqueueTime;
+        private final int numEntries;
+        private final List<LogRecordWithDLSN> records;
+        private final Promise<List<LogRecordWithDLSN>> promise;
+        private final long deadlineTime;
+        private final TimeUnit deadlineTimeUnit;
+
+        PendingReadRequest(int numEntries,
+                           long deadlineTime,
+                           TimeUnit deadlineTimeUnit) {
+            this.numEntries = numEntries;
+            this.enqueueTime = Stopwatch.createStarted();
+            // optimize the space usage for single read.
+            if (numEntries == 1) {
+                this.records = new ArrayList<LogRecordWithDLSN>(1);
+            } else {
+                this.records = new ArrayList<LogRecordWithDLSN>();
+            }
+            this.promise = new Promise<List<LogRecordWithDLSN>>();
+            this.deadlineTime = deadlineTime;
+            this.deadlineTimeUnit = deadlineTimeUnit;
+        }
+
+        Promise<List<LogRecordWithDLSN>> getPromise() {
+            return promise;
+        }
+
+        long elapsedSinceEnqueue(TimeUnit timeUnit) {
+            return enqueueTime.elapsed(timeUnit);
+        }
+
+        void setException(Throwable throwable) {
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) {
+                futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+                delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
+            }
+        }
+
+        boolean hasReadRecords() {
+            return records.size() > 0;
+        }
+
+        boolean hasReadEnoughRecords() {
+            return records.size() >= numEntries;
+        }
+
+        long getRemainingWaitTime() {
+            if (deadlineTime <= 0L) {
+                return 0L;
+            }
+            return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit);
+        }
+
+        void addRecord(LogRecordWithDLSN record) {
+            records.add(record);
+        }
+
+        void complete() {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
+            }
+            delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            promise.setValue(records);
+            futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    BKAsyncLogReader(BKDistributedLogManager bkdlm,
+                     OrderedScheduler scheduler,
+                     DLSN startDLSN,
+                     Optional<String> subscriberId,
+                     boolean returnEndOfStreamRecord,
+                     StatsLogger statsLogger) {
+        this.streamName = bkdlm.getStreamName();
+        this.bkDistributedLogManager = bkdlm;
+        this.scheduler = scheduler;
+        this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
+                this, true);
+        LOG.debug("Starting async reader at {}", startDLSN);
+        this.startDLSN = startDLSN;
+        this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
+        this.readNextDelayStopwatch = Stopwatch.createStarted();
+        this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled();
+        this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis();
+        this.returnEndOfStreamRecord = returnEndOfStreamRecord;
+
+        // Stats
+        StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
+        futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set");
+        scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
+        backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read");
+        readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
+        timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
+        delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
+        idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error");
+        idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total");
+        idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
+        idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
+
+        // Lock the stream if requested. The lock will be released when the reader is closed.
+        this.lockStream = false;
+        this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
+        this.lastProcessTime = Stopwatch.createStarted();
+    }
+
+    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
+        if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
+            // Don't run the task more than once per second (for sanity)
+            long period = Math.max(idleErrorThresholdMillis / 10, 1000);
+            // Except when idle reader threshold is less than a second (tests?)
+            period = Math.min(period, idleErrorThresholdMillis / 5);
+
+            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
+                @Override
+                public void run() {
+                    PendingReadRequest nextRequest = pendingRequests.peek();
+
+                    idleReaderCheckCount.inc();
+                    if (null == nextRequest) {
+                        return;
+                    }
+
+                    idleReaderCheckIdleReadRequestCount.inc();
+                    if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) {
+                        return;
+                    }
+
+                    ReadAheadEntryReader readAheadReader = getReadAheadReader();
+
+                    // read request has been idle
+                    //   - cache has records but read request are idle,
+                    //     that means notification was missed between readahead and reader.
+                    //   - cache is empty and readahead is idle (no records added for a long time)
+                    idleReaderCheckIdleReadAheadCount.inc();
+                    try {
+                        if (null == readAheadReader || (!hasMoreRecords() &&
+                                readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) {
+                            markReaderAsIdle();
+                            return;
+                        } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
+                            markReaderAsIdle();
+                        }
+                    } catch (IOException e) {
+                        setLastException(e);
+                        return;
+                    }
+                }
+            }, period, period, TimeUnit.MILLISECONDS);
+        }
+        return null;
+    }
+
+    synchronized ReadAheadEntryReader getReadAheadReader() {
+        return readAheadReader;
+    }
+
+    void cancelIdleReaderTask() {
+        // Do this after we have checked that the reader was not previously closed
+        try {
+            if (null != idleReaderTimeoutTask) {
+                idleReaderTimeoutTask.cancel(true);
+            }
+        } catch (Exception exc) {
+            LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName());
+        }
+    }
+
+    private void markReaderAsIdle() {
+        idleReaderError.inc();
+        IdleReaderException ire = new IdleReaderException("Reader on stream "
+                + readHandler.getFullyQualifiedName()
+                + " is idle for " + idleErrorThresholdMillis +" ms");
+        setLastException(ire);
+        // cancel all pending reads directly rather than notifying on error
+        // because idle reader could happen on idle read requests that usually means something wrong
+        // in scheduling reads
+        cancelAllPendingReads(ire);
+    }
+
+    protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException {
+        if (null != readAheadReader) {
+            throw new UnexpectedException("Couldn't reset from dlsn after reader already starts reading.");
+        }
+        startDLSN = fromDLSN;
+    }
+
+    @VisibleForTesting
+    public synchronized DLSN getStartDLSN() {
+        return startDLSN;
+    }
+
+    public Future<Void> lockStream() {
+        this.lockStream = true;
+        return readHandler.lockStream();
+    }
+
+    private boolean checkClosedOrInError(String operation) {
+        if (null == lastException.get()) {
+            try {
+                if (null != readHandler && null != getReadAheadReader()) {
+                    getReadAheadReader().checkLastException();
+                }
+
+                bkDistributedLogManager.checkClosedOrInError(operation);
+            } catch (IOException exc) {
+                setLastException(exc);
+            }
+        }
+
+        if (lockStream) {
+            try {
+                readHandler.checkReadLock();
+            } catch (IOException ex) {
+                setLastException(ex);
+            }
+        }
+
+        if (null != lastException.get()) {
+            LOG.trace("Cancelling pending reads");
+            cancelAllPendingReads(lastException.get());
+            return true;
+        }
+
+        return false;
+    }
+
+    private void setLastException(IOException exc) {
+        lastException.compareAndSet(null, exc);
+    }
+
+    @Override
+    public String getStreamName() {
+        return streamName;
+    }
+
+    /**
+     * @return A promise that when satisfied will contain the Log Record with its DLSN.
+     */
+    @Override
+    public synchronized Future<LogRecordWithDLSN> readNext() {
+        return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION);
+    }
+
+    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
+        return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
+                                                                 long waitTime,
+                                                                 TimeUnit timeUnit) {
+        return readInternal(numEntries, waitTime, timeUnit);
+    }
+
+    /**
+     * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are
+     * ready (1 to <i>numEntries</i>).
+     *
+     * @param numEntries
+     *          num entries to read
+     * @return A promise that is satisfied with a non-empty list of log records with their DLSN.
+     */
+    private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
+                                                                      long deadlineTime,
+                                                                      TimeUnit deadlineTimeUnit) {
+        timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextDelayStopwatch.reset().start();
+        final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
+
+        if (null == readAheadReader) {
+            final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader(
+                    getStreamName(),
+                    getStartDLSN(),
+                    bkDistributedLogManager.getConf(),
+                    readHandler,
+                    bkDistributedLogManager.getReaderEntryStore(),
+                    bkDistributedLogManager.getScheduler(),
+                    Ticker.systemTicker(),
+                    bkDistributedLogManager.alertStatsLogger);
+            readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    try {
+                        readHandler.registerListener(readAheadEntryReader);
+                        readHandler.asyncStartFetchLogSegments()
+                                .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
+                                    @Override
+                                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
+                                        readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
+                                        readAheadEntryReader.start(logSegments.getValue());
+                                        return BoxedUnit.UNIT;
+                                    }
+                                });
+                    } catch (Exception exc) {
+                        notifyOnError(exc);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    notifyOnError(cause);
+                }
+            });
+        }
+
+        if (checkClosedOrInError("readNext")) {
+            readRequest.setException(lastException.get());
+        } else {
+            boolean queueEmpty = pendingRequests.isEmpty();
+            pendingRequests.add(readRequest);
+
+            if (queueEmpty) {
+                scheduleBackgroundRead();
+            }
+        }
+
+        readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+        readNextDelayStopwatch.reset().start();
+
+        return readRequest.getPromise();
+    }
+
+    public synchronized void scheduleBackgroundRead() {
+        // if the reader is already closed, we don't need to schedule background read again.
+        if (null != closeFuture) {
+            return;
+        }
+
+        long prevCount = scheduleCount.getAndIncrement();
+        if (0 == prevCount) {
+            scheduleDelayStopwatch.reset().start();
+            scheduler.submit(streamName, this);
+        }
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        // Cancel the idle reader timeout task, interrupting if necessary
+        ReadCancelledException exception;
+        Promise<Void> closePromise;
+        synchronized (this) {
+            if (null != closeFuture) {
+                return closeFuture;
+            }
+            closePromise = closeFuture = new Promise<Void>();
+            exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed");
+            setLastException(exception);
+        }
+
+        // Do this after we have checked that the reader was not previously closed
+        cancelIdleReaderTask();
+
+        synchronized (scheduleLock) {
+            if (null != backgroundScheduleTask) {
+                backgroundScheduleTask.cancel(true);
+            }
+        }
+
+        cancelAllPendingReads(exception);
+
+        ReadAheadEntryReader readAheadReader = getReadAheadReader();
+        if (null != readAheadReader) {
+            readHandler.unregisterListener(readAheadReader);
+            readAheadReader.removeStateChangeNotification(this);
+        }
+        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
+                readAheadReader,
+                readHandler
+        ).proxyTo(closePromise);
+        return closePromise;
+    }
+
+    private void cancelAllPendingReads(Throwable throwExc) {
+        for (PendingReadRequest promise : pendingRequests) {
+            promise.setException(throwExc);
+        }
+        pendingRequests.clear();
+    }
+
+    synchronized boolean hasMoreRecords() throws IOException {
+        if (null == readAheadReader) {
+            return false;
+        }
+        if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
+            return true;
+        } else if (null != currentEntry) {
+            nextRecord = currentEntry.nextRecord();
+            return null != nextRecord;
+        }
+        return false;
+    }
+
+    private synchronized LogRecordWithDLSN readNextRecord() throws IOException {
+        if (null == readAheadReader) {
+            return null;
+        }
+        if (null == currentEntry) {
+            currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
+            // no entry after reading from read ahead then return null
+            if (null == currentEntry) {
+                return null;
+            }
+        }
+
+        LogRecordWithDLSN recordToReturn;
+        if (null == nextRecord) {
+            nextRecord = currentEntry.nextRecord();
+            // no more records in current entry
+            if (null == nextRecord) {
+                currentEntry = null;
+                return readNextRecord();
+            }
+        }
+
+        // found a record to return and prefetch the next one
+        recordToReturn = nextRecord;
+        nextRecord = currentEntry.nextRecord();
+        return recordToReturn;
+    }
+
+    @Override
+    public void run() {
+        synchronized(scheduleLock) {
+            if (scheduleDelayStopwatch.isRunning()) {
+                scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+
+            Stopwatch runTime = Stopwatch.createStarted();
+            int iterations = 0;
+            long scheduleCountLocal = scheduleCount.get();
+            LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName());
+            while(true) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++);
+                }
+
+                PendingReadRequest nextRequest = null;
+                synchronized(this) {
+                    nextRequest = pendingRequests.peek();
+
+                    // Queue is empty, nothing to read, return
+                    if (null == nextRequest) {
+                        LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
+                        scheduleCount.set(0);
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        return;
+                    }
+
+                    if (disableProcessingReadRequests) {
+                        LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName());
+                        return;
+                    }
+                }
+                lastProcessTime.reset().start();
+
+                // If the oldest pending promise is interrupted then we must mark
+                // the reader in error and abort all pending reads since we don't
+                // know the last consumed read
+                if (null == lastException.get()) {
+                    if (nextRequest.getPromise().isInterrupted().isDefined()) {
+                        setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ",
+                                nextRequest.getPromise().isInterrupted().get()));
+                    }
+                }
+
+                if (checkClosedOrInError("readNext")) {
+                    if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
+                        LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    }
+                    backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                    return;
+                }
+
+                try {
+                    // Fail 10% of the requests when asked to simulate errors
+                    if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
+                        throw new IOException("Reader Simulated Exception");
+                    }
+                    LogRecordWithDLSN record;
+                    while (!nextRequest.hasReadEnoughRecords()) {
+                        // read single record
+                        do {
+                            record = readNextRecord();
+                        } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0)));
+                        if (null == record) {
+                            break;
+                        } else {
+                            if (record.isEndOfStream() && !returnEndOfStreamRecord) {
+                                setLastException(new EndOfStreamException("End of Stream Reached for "
+                                        + readHandler.getFullyQualifiedName()));
+                                break;
+                            }
+
+                            // gap detection
+                            if (recordPositionsContainsGap(record, lastPosition)) {
+                                bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record);
+                                if (positionGapDetectionEnabled) {
+                                    throw new DLIllegalStateException("Gap detected between records at record = " + record);
+                                }
+                            }
+                            lastPosition = record.getLastPositionWithinLogSegment();
+
+                            nextRequest.addRecord(record);
+                        }
+                    }
+                } catch (IOException exc) {
+                    setLastException(exc);
+                    if (!(exc instanceof LogNotFoundException)) {
+                        LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get());
+                    }
+                    continue;
+                }
+
+                if (nextRequest.hasReadRecords()) {
+                    long remainingWaitTime = nextRequest.getRemainingWaitTime();
+                    if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        scheduleDelayStopwatch.reset().start();
+                        scheduleCount.set(0);
+                        // the request could still wait for more records
+                        backgroundScheduleTask = scheduler.schedule(
+                                streamName,
+                                BACKGROUND_READ_SCHEDULER,
+                                remainingWaitTime,
+                                nextRequest.deadlineTimeUnit);
+                        return;
+                    }
+
+                    PendingReadRequest request = pendingRequests.poll();
+                    if (null != request && nextRequest == request) {
+                        request.complete();
+                        if (null != backgroundScheduleTask) {
+                            backgroundScheduleTask.cancel(true);
+                            backgroundScheduleTask = null;
+                        }
+                    } else {
+                        DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = "
+                                + nextRequest.records.get(0).getDlsn());
+                        nextRequest.setException(ise);
+                        if (null != request) {
+                            request.setException(ise);
+                        }
+                        // We should never get here as we should have exited the loop if
+                        // pendingRequests were empty
+                        bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}",
+                                nextRequest.records.get(0).getDlsn());
+                        setLastException(ise);
+                    }
+                } else {
+                    if (0 == scheduleCountLocal) {
+                        LOG.trace("Schedule count dropping to zero", lastException.get());
+                        backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+                        return;
+                    }
+                    scheduleCountLocal = scheduleCount.decrementAndGet();
+                }
+            }
+        }
+    }
+
+    private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) {
+        final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment());
+        final boolean endOfStreamRecord = record.isEndOfStream();
+        final boolean emptyLogSegment = (0 == lastPosition);
+        final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1));
+
+        return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment &&
+               !positionIncreasedByOne;
+    }
+
+    /**
+     * Triggered when the background activity encounters an exception
+     */
+    @Override
+    public void notifyOnError(Throwable cause) {
+        if (cause instanceof IOException) {
+            setLastException((IOException) cause);
+        } else {
+            setLastException(new IOException(cause));
+        }
+        scheduleBackgroundRead();
+    }
+
+    /**
+     * Triggered when the background activity completes an operation
+     */
+    @Override
+    public void notifyOnOperationComplete() {
+        scheduleBackgroundRead();
+    }
+
+    @VisibleForTesting
+    void simulateErrors() {
+        bkDistributedLogManager.getFailureInjector().injectErrors(true);
+    }
+
+    @VisibleForTesting
+    synchronized void disableReadAheadLogSegmentsNotification() {
+        readHandler.disableReadAheadLogSegmentsNotification();
+    }
+
+    @VisibleForTesting
+    synchronized void disableProcessingReadRequests() {
+        disableProcessingReadRequests = true;
+    }
+}
+

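For orientation, here is a minimal usage sketch of the async reader added above. It is not part of the commit: the namespace and manager calls (DistributedLogNamespaceBuilder, openLog, getAsyncLogReader) are assumed from the surrounding codebase at this point in its history, and the URI and stream name are placeholders.

import java.net.URI;

import org.apache.distributedlog.AsyncLogReader;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
import org.apache.distributedlog.util.FutureUtils;

public class AsyncReaderExample {
    public static void main(String[] args) throws Exception {
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        // URI and stream name are illustrative placeholders.
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(new URI("distributedlog://127.0.0.1:2181/messaging/my-namespace"))
                .build();
        DistributedLogManager dlm = namespace.openLog("my-stream");

        // getAsyncLogReader(...) hands back the BKAsyncLogReader added in this commit.
        AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
        try {
            for (int i = 0; i < 10; i++) {
                // readNext() returns a Future satisfied with a single record;
                // FutureUtils.result(...) blocks until it completes.
                LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
                System.out.println("read record at " + record.getDlsn());
            }
        } finally {
            FutureUtils.result(reader.asyncClose());
            dlm.close();
            namespace.close();
        }
    }
}
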
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
new file mode 100644
index 0000000..1102ff5
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.StreamNotReadyException;
+import org.apache.distributedlog.exceptions.WriteCancelledException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.feature.CoreFeatureKeys;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Try;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.Option;
+import scala.runtime.AbstractFunction1;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * BookKeeper based {@link AsyncLogWriter} implementation.
+ *
+ * <h3>Metrics</h3>
+ * All the metrics are exposed under `log_writer`.
+ * <ul>
+ * <li> `log_writer/write`: opstats. latency characteristics about the time that write operations spent.
+ * <li> `log_writer/bulk_write`: opstats. latency characteristics about the time that bulk_write
+ * operations spent. it can be high when requests are pending in the queue for a long time due to log segment rolling.
+ * <li> `log_writer/get_writer`: opstats. the time spent on getting the writer. it could spike when
+ * log segment rolling happens while getting the writer. it is a good stat to look into when the latency
+ * is caused by queuing time.
+ * <li> `log_writer/pending_request_dispatch`: counter. the number of queued operations that are dispatched
+ * after a log segment is rolled. it is a metric measuring how many operations have been queued because of
+ * log segment rolling.
+ * </ul>
+ * See {@link BKLogSegmentWriter} for segment writer stats.
+ */
+public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
+
+    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);
+
+    static Function1<List<LogSegmentMetadata>, Boolean> TruncationResultConverter =
+            new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() {
+                @Override
+                public Boolean apply(List<LogSegmentMetadata> segments) {
+                    return true;
+                }
+            };
+
+    // Records pending for roll log segment.
+    class PendingLogRecord implements FutureEventListener<DLSN> {
+
+        final LogRecord record;
+        final Promise<DLSN> promise;
+        final boolean flush;
+
+        PendingLogRecord(LogRecord record, boolean flush) {
+            this.record = record;
+            this.promise = new Promise<DLSN>();
+            this.flush = flush;
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            promise.setValue(value);
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            promise.setException(cause);
+            encounteredError = true;
+        }
+    }
+
+    /**
+     * Last pending record in current log segment. After it is satisfied, it would
+     * roll log segment.
+     *
+     * This implementation is based on the assumption that all futures are satisfied in the same order by the future pool.
+     */
+    class LastPendingLogRecord extends PendingLogRecord {
+
+        LastPendingLogRecord(LogRecord record, boolean flush) {
+            super(record, flush);
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            super.onSuccess(value);
+            // roll log segment and issue all pending requests.
+            rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            super.onFailure(cause);
+            // error out pending requests.
+            errorOutPendingRequestsAndWriter(cause);
+        }
+    }
+
+    private final boolean streamFailFast;
+    private final boolean disableRollOnSegmentError;
+    private LinkedList<PendingLogRecord> pendingRequests = null;
+    private volatile boolean encounteredError = false;
+    private Promise<BKLogSegmentWriter> rollingFuture = null;
+    private long lastTxId = DistributedLogConstants.INVALID_TXID;
+
+    private final StatsLogger statsLogger;
+    private final OpStatsLogger writeOpStatsLogger;
+    private final OpStatsLogger markEndOfStreamOpStatsLogger;
+    private final OpStatsLogger bulkWriteOpStatsLogger;
+    private final OpStatsLogger getWriterOpStatsLogger;
+    private final Counter pendingRequestDispatch;
+
+    private final Feature disableLogSegmentRollingFeature;
+
+    BKAsyncLogWriter(DistributedLogConfiguration conf,
+                     DynamicDistributedLogConfiguration dynConf,
+                     BKDistributedLogManager bkdlm,
+                     BKLogWriteHandler writeHandler, /** log writer owns the handler **/
+                     FeatureProvider featureProvider,
+                     StatsLogger dlmStatsLogger) {
+        super(conf, dynConf, bkdlm);
+        this.writeHandler = writeHandler;
+        this.streamFailFast = conf.getFailFastOnStreamNotReady();
+        this.disableRollOnSegmentError = conf.getDisableRollingOnLogSegmentError();
+
+        // features
+        disableLogSegmentRollingFeature = featureProvider.getFeature(CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase());
+        // stats
+        this.statsLogger = dlmStatsLogger.scope("log_writer");
+        this.writeOpStatsLogger = statsLogger.getOpStatsLogger("write");
+        this.markEndOfStreamOpStatsLogger = statsLogger.getOpStatsLogger("mark_end_of_stream");
+        this.bulkWriteOpStatsLogger = statsLogger.getOpStatsLogger("bulk_write");
+        this.getWriterOpStatsLogger = statsLogger.getOpStatsLogger("get_writer");
+        this.pendingRequestDispatch = statsLogger.getCounter("pending_request_dispatch");
+    }
+
+    @VisibleForTesting
+    synchronized void setLastTxId(long txId) {
+        lastTxId = Math.max(lastTxId, txId);
+    }
+
+    @Override
+    public synchronized long getLastTxId() {
+        return lastTxId;
+    }
+
+    /**
+     * Write a log record as control record. The method will be used by Monitor Service to enforce a new inprogress segment.
+     *
+     * @param record
+     *          log record
+     * @return future of the write
+     */
+    public Future<DLSN> writeControlRecord(final LogRecord record) {
+        record.setControl();
+        return write(record);
+    }
+
+    private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
+        if (encounteredError) {
+            throw new WriteException(bkDistributedLogManager.getStreamName(),
+                    "writer has been closed due to error.");
+        }
+        BKLogSegmentWriter segmentWriter = getCachedLogWriter();
+        if (null != segmentWriter
+                && segmentWriter.isLogSegmentInError()
+                && !disableRollOnSegmentError) {
+            return null;
+        } else {
+            return segmentWriter;
+        }
+    }
+
+    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
+                                                           boolean bestEffort,
+                                                           boolean rollLog,
+                                                           boolean allowMaxTxID) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        return FutureUtils.stats(
+                doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID),
+                getWriterOpStatsLogger,
+                stopwatch);
+    }
+
+    private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid,
+                                                             final boolean bestEffort,
+                                                             final boolean rollLog,
+                                                             final boolean allowMaxTxID) {
+        if (encounteredError) {
+            return Future.exception(new WriteException(bkDistributedLogManager.getStreamName(),
+                    "writer has been closed due to error."));
+        }
+        Future<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError);
+        if (null == writerFuture) {
+            return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, allowMaxTxID);
+        } else if (rollLog) {
+            return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
+                @Override
+                public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter writer) {
+                    return rollLogSegmentIfNecessary(writer, firstTxid, bestEffort, allowMaxTxID);
+                }
+            });
+        } else {
+            return writerFuture;
+        }
+    }
+
+    /**
+     * We write end of stream marker by writing a record with MAX_TXID, so we need to allow using
+     * max txid when rolling for this case only.
+     */
+    private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
+        return getLogSegmentWriter(DistributedLogConstants.MAX_TXID,
+                                     false /* bestEffort */,
+                                     false /* roll log */,
+                                     true /* allow max txid */);
+    }
+
+    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
+                                                           boolean bestEffort,
+                                                           boolean rollLog) {
+        return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* allow max txid */);
+    }
+
+    Future<DLSN> queueRequest(LogRecord record, boolean flush) {
+        PendingLogRecord pendingLogRecord = new PendingLogRecord(record, flush);
+        pendingRequests.add(pendingLogRecord);
+        return pendingLogRecord.promise;
+    }
+
+    boolean shouldRollLog(BKLogSegmentWriter w) {
+        try {
+            return null == w ||
+                    (!disableLogSegmentRollingFeature.isAvailable() &&
+                    shouldStartNewSegment(w));
+        } catch (IOException ioe) {
+            return false;
+        }
+    }
+
+    void startQueueingRequests() {
+        assert(null == pendingRequests && null == rollingFuture);
+        pendingRequests = new LinkedList<PendingLogRecord>();
+        rollingFuture = new Promise<BKLogSegmentWriter>();
+    }
+
+    // for ordering guarantee, we shouldn't send requests to next log segments until
+    // previous log segment is done.
+    private synchronized Future<DLSN> asyncWrite(final LogRecord record,
+                                                 boolean flush) {
+        // The passed in writer may be stale since we acquire the writer outside of sync
+        // lock. If we recently rolled and the new writer is cached, use that instead.
+        Future<DLSN> result = null;
+        BKLogSegmentWriter w;
+        try {
+            w = getCachedLogSegmentWriter();
+        } catch (WriteException we) {
+            return Future.exception(we);
+        }
+        if (null != rollingFuture) {
+            if (streamFailFast) {
+                result = Future.exception(new StreamNotReadyException("Rolling log segment"));
+            } else {
+                result = queueRequest(record, flush);
+            }
+        } else if (shouldRollLog(w)) {
+            // insert a last record, so when it called back, we will trigger a log segment rolling
+            startQueueingRequests();
+            if (null != w) {
+                LastPendingLogRecord lastLogRecordInCurrentSegment = new LastPendingLogRecord(record, flush);
+                w.asyncWrite(record, true).addEventListener(lastLogRecordInCurrentSegment);
+                result = lastLogRecordInCurrentSegment.promise;
+            } else { // no log segment yet. roll the log segment and issue pending requests.
+                result = queueRequest(record, flush);
+                rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
+            }
+        } else {
+            result = w.asyncWrite(record, flush);
+        }
+        // use map here rather than onSuccess because we want lastTxId to be updated before
+        // satisfying the future
+        return result.map(new AbstractFunction1<DLSN, DLSN>() {
+            @Override
+            public DLSN apply(DLSN dlsn) {
+                setLastTxId(record.getTransactionId());
+                return dlsn;
+            }
+        });
+    }
+
+    private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> records) {
+        final ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(records.size());
+        Iterator<LogRecord> iterator = records.iterator();
+        while (iterator.hasNext()) {
+            LogRecord record = iterator.next();
+            Future<DLSN> future = asyncWrite(record, !iterator.hasNext());
+            results.add(future);
+
+            // Abort early if an individual write has already failed.
+            Option<Try<DLSN>> result = future.poll();
+            if (result.isDefined() && result.get().isThrow()) {
+                break;
+            }
+        }
+        if (records.size() > results.size()) {
+            appendCancelledFutures(results, records.size() - results.size());
+        }
+        return results;
+    }
+
+    private void appendCancelledFutures(List<Future<DLSN>> futures, int numToAdd) {
+        final WriteCancelledException cre =
+            new WriteCancelledException(getStreamName());
+        for (int i = 0; i < numToAdd; i++) {
+            Future<DLSN> cancelledFuture = Future.exception(cre);
+            futures.add(cancelledFuture);
+        }
+    }
+
+    private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
+        getLogSegmentWriter(firstTxId, true, true)
+                .addEventListener(new FutureEventListener<BKLogSegmentWriter>() {
+            @Override
+            public void onSuccess(BKLogSegmentWriter writer) {
+                try {
+                    synchronized (BKAsyncLogWriter.this) {
+                        for (PendingLogRecord pendingLogRecord : pendingRequests) {
+                            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
+                            writer.asyncWrite(pendingLogRecord.record, pendingLogRecord.flush)
+                                    .addEventListener(pendingLogRecord);
+                        }
+                        // if there are no records in the pending queue, let's write a control record
+                        // so that when a new log segment is rolled, a control record will be added and
+                        // the corresponding bookies would be able to create its ledger.
+                        if (pendingRequests.isEmpty()) {
+                            LogRecord controlRecord = new LogRecord(firstTxId,
+                                    DistributedLogConstants.CONTROL_RECORD_CONTENT);
+                            controlRecord.setControl();
+                            PendingLogRecord controlReq = new PendingLogRecord(controlRecord, false);
+                            writer.asyncWrite(controlReq.record, controlReq.flush)
+                                    .addEventListener(controlReq);
+                        }
+                        if (null != rollingFuture) {
+                            FutureUtils.setValue(rollingFuture, writer);
+                        }
+                        rollingFuture = null;
+                        pendingRequestDispatch.add(pendingRequests.size());
+                        pendingRequests = null;
+                    }
+                } catch (IOException ioe) {
+                    errorOutPendingRequestsAndWriter(ioe);
+                }
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                errorOutPendingRequestsAndWriter(cause);
+            }
+        });
+    }
+
+    @VisibleForTesting
+    void errorOutPendingRequests(Throwable cause, boolean errorOutWriter) {
+        final List<PendingLogRecord> pendingRequestsSnapshot;
+        synchronized (this) {
+            pendingRequestsSnapshot = pendingRequests;
+            encounteredError = errorOutWriter;
+            pendingRequests = null;
+            if (null != rollingFuture) {
+                FutureUtils.setException(rollingFuture, cause);
+            }
+            rollingFuture = null;
+        }
+
+        pendingRequestDispatch.add(pendingRequestsSnapshot.size());
+
+        // After erroring out the writer above, no more requests
+        // will be enqueued to pendingRequests
+        for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) {
+            pendingLogRecord.promise.setException(cause);
+        }
+    }
+
+    void errorOutPendingRequestsAndWriter(Throwable cause) {
+        errorOutPendingRequests(cause, true /* error out writer */);
+    }
+
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     */
+    @Override
+    public Future<DLSN> write(final LogRecord record) {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        return FutureUtils.stats(
+                asyncWrite(record, true),
+                writeOpStatsLogger,
+                stopwatch);
+    }
+
+    /**
+     * Write many log records to the stream. The return type here is unfortunate but it's a direct result
+     * of having to combine FuturePool and the asyncWriteBulk method which returns a future as well. The
+     * problem is the List that asyncWriteBulk returns can't be materialized until getLogSegmentWriter
+     * completes, so it has to be wrapped in a future itself.
+     *
+     * @param records list of records
+     */
+    @Override
+    public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        return FutureUtils.stats(
+                Future.value(asyncWriteBulk(records)),
+                bulkWriteOpStatsLogger,
+                stopwatch);
+    }
+
+    @Override
+    public Future<Boolean> truncate(final DLSN dlsn) {
+        if (DLSN.InvalidDLSN == dlsn) {
+            return Future.value(false);
+        }
+        BKLogWriteHandler writeHandler;
+        try {
+            writeHandler = getWriteHandler();
+        } catch (IOException e) {
+            return Future.exception(e);
+        }
+        return writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter);
+    }
+
+    Future<Long> flushAndCommit() {
+        Future<BKLogSegmentWriter> writerFuture;
+        synchronized (this) {
+            if (null != this.rollingFuture) {
+                writerFuture = this.rollingFuture;
+            } else {
+                writerFuture = getCachedLogWriterFuture();
+            }
+        }
+        if (null == writerFuture) {
+            return Future.value(getLastTxId());
+        }
+        return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
+            @Override
+            public Future<Long> apply(BKLogSegmentWriter writer) {
+                return writer.flushAndCommit();
+            }
+        });
+    }
+
+    Future<Long> markEndOfStream() {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        Future<BKLogSegmentWriter> logSegmentWriterFuture;
+        synchronized (this) {
+            logSegmentWriterFuture = this.rollingFuture;
+        }
+        if (null == logSegmentWriterFuture) {
+            logSegmentWriterFuture = getLogSegmentWriterForEndOfStream();
+        }
+
+        return FutureUtils.stats(
+                logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
+                    @Override
+                    public Future<Long> apply(BKLogSegmentWriter w) {
+                        return w.markEndOfStream();
+                    }
+                }),
+                markEndOfStreamOpStatsLogger,
+                stopwatch);
+    }
+
+    @Override
+    protected Future<Void> asyncCloseAndComplete() {
+        Future<BKLogSegmentWriter> logSegmentWriterFuture;
+        synchronized (this) {
+            logSegmentWriterFuture = this.rollingFuture;
+        }
+
+        if (null == logSegmentWriterFuture) {
+            return super.asyncCloseAndComplete();
+        } else {
+            return logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Void>>() {
+                @Override
+                public Future<Void> apply(BKLogSegmentWriter segmentWriter) {
+                    return BKAsyncLogWriter.super.asyncCloseAndComplete();
+                }
+            });
+        }
+    }
+
+    @Override
+    void closeAndComplete() throws IOException {
+        FutureUtils.result(asyncCloseAndComplete());
+    }
+
+    /**
+     * *TEMP HACK*
+     * Get the name of the stream this writer writes data to
+     */
+    @Override
+    public String getStreamName() {
+        return bkDistributedLogManager.getStreamName();
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        Future<Void> result = super.asyncAbort();
+        synchronized (this) {
+            if (pendingRequests != null) {
+                for (PendingLogRecord pendingLogRecord : pendingRequests) {
+                    pendingLogRecord.promise.setException(new WriteException(bkDistributedLogManager.getStreamName(),
+                            "abort writing: writer has been closed due to error."));
+                }
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("AsyncLogWriter:%s", getStreamName());
+    }
+}

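A companion sketch for the async writer, under the same assumptions as the reader example (illustrative URI and stream name, namespace/manager APIs from the surrounding codebase). It exercises write() and writeBulk(), whose Future-of-futures return type is discussed in the Javadoc above.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.twitter.util.Future;

import org.apache.distributedlog.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
import org.apache.distributedlog.util.FutureUtils;

public class AsyncWriterExample {
    public static void main(String[] args) throws Exception {
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(new URI("distributedlog://127.0.0.1:2181/messaging/my-namespace"))
                .build();
        DistributedLogManager dlm = namespace.openLog("my-stream");

        // startAsyncLogSegmentNonPartitioned() returns the BKAsyncLogWriter added in this commit.
        AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
        try {
            long txid = 1L;

            // Single write: the future is satisfied with the DLSN assigned to the record.
            DLSN dlsn = FutureUtils.result(
                    writer.write(new LogRecord(txid++, "hello".getBytes(StandardCharsets.UTF_8))));
            System.out.println("wrote record at " + dlsn);

            // Bulk write: the outer future wraps one future per record, in order.
            List<LogRecord> batch = new ArrayList<LogRecord>();
            for (int i = 0; i < 5; i++) {
                batch.add(new LogRecord(txid++, ("msg-" + i).getBytes(StandardCharsets.UTF_8)));
            }
            List<Future<DLSN>> results = FutureUtils.result(writer.writeBulk(batch));
            for (Future<DLSN> result : results) {
                System.out.println("bulk record at " + FutureUtils.result(result));
            }
        } finally {
            FutureUtils.result(writer.asyncClose());
            dlm.close();
            namespace.close();
        }
    }
}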