http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
deleted file mode 100644
index aee4103..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
+++ /dev/null
@@ -1,751 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.exceptions.ReadCancelledException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Throw;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * BookKeeper based {@link AsyncLogReader} implementation.
- *
- * <h3>Metrics</h3>
- * All the metrics are exposed under `async_reader`.
- * <ul>
- * <li> `async_reader`/future_set: opstats. time spent on satisfying futures 
of read requests.
- * if it is high, it means that the caller takes time on processing the result 
of read requests.
- * The side effect is blocking consequent reads.
- * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
- * <li> `async_reader`/background_read: opstats. time spent on background 
reads.
- * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link 
#readNext()}.
- * <li> `async_reader`/time_between_read_next: opstats. time spent on between 
two consequent {@link #readNext()}.
- * if it is high, it means that the caller is slowing down on calling {@link 
#readNext()}.
- * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency 
for the read requests.
- * <li> `async_reader`/idle_reader_error: counter. the number idle reader 
errors.
- * </ul>
- */
-class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
-    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
-
-    private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> 
READ_NEXT_MAP_FUNCTION =
-            new AbstractFunction1<List<LogRecordWithDLSN>, 
LogRecordWithDLSN>() {
-                @Override
-                public LogRecordWithDLSN apply(List<LogRecordWithDLSN> 
records) {
-                    return records.get(0);
-                }
-            };
-
-    private final String streamName;
-    protected final BKDistributedLogManager bkDistributedLogManager;
-    protected final BKLogReadHandler readHandler;
-    private final AtomicReference<Throwable> lastException = new 
AtomicReference<Throwable>();
-    private final OrderedScheduler scheduler;
-    private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = 
new ConcurrentLinkedQueue<PendingReadRequest>();
-    private final Object scheduleLock = new Object();
-    private final AtomicLong scheduleCount = new AtomicLong(0);
-    final private Stopwatch scheduleDelayStopwatch;
-    final private Stopwatch readNextDelayStopwatch;
-    private DLSN startDLSN;
-    private ReadAheadEntryReader readAheadReader = null;
-    private int lastPosition = 0;
-    private final boolean positionGapDetectionEnabled;
-    private final int idleErrorThresholdMillis;
-    final ScheduledFuture<?> idleReaderTimeoutTask;
-    private ScheduledFuture<?> backgroundScheduleTask = null;
-    // last process time
-    private final Stopwatch lastProcessTime;
-
-    protected Promise<Void> closeFuture = null;
-
-    private boolean lockStream = false;
-
-    private final boolean returnEndOfStreamRecord;
-
-    private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
-        @Override
-        public void run() {
-            synchronized (scheduleLock) {
-                backgroundScheduleTask = null;
-            }
-            scheduleBackgroundRead();
-        }
-    };
-
-    // State
-    private Entry.Reader currentEntry = null;
-    private LogRecordWithDLSN nextRecord = null;
-
-    // Failure Injector
-    private boolean disableProcessingReadRequests = false;
-
-    // Stats
-    private final OpStatsLogger readNextExecTime;
-    private final OpStatsLogger delayUntilPromiseSatisfied;
-    private final OpStatsLogger timeBetweenReadNexts;
-    private final OpStatsLogger futureSetLatency;
-    private final OpStatsLogger scheduleLatency;
-    private final OpStatsLogger backgroundReaderRunTime;
-    private final Counter idleReaderCheckCount;
-    private final Counter idleReaderCheckIdleReadRequestCount;
-    private final Counter idleReaderCheckIdleReadAheadCount;
-    private final Counter idleReaderError;
-
-    private class PendingReadRequest {
-        private final Stopwatch enqueueTime;
-        private final int numEntries;
-        private final List<LogRecordWithDLSN> records;
-        private final Promise<List<LogRecordWithDLSN>> promise;
-        private final long deadlineTime;
-        private final TimeUnit deadlineTimeUnit;
-
-        PendingReadRequest(int numEntries,
-                           long deadlineTime,
-                           TimeUnit deadlineTimeUnit) {
-            this.numEntries = numEntries;
-            this.enqueueTime = Stopwatch.createStarted();
-            // optimize the space usage for single read.
-            if (numEntries == 1) {
-                this.records = new ArrayList<LogRecordWithDLSN>(1);
-            } else {
-                this.records = new ArrayList<LogRecordWithDLSN>();
-            }
-            this.promise = new Promise<List<LogRecordWithDLSN>>();
-            this.deadlineTime = deadlineTime;
-            this.deadlineTimeUnit = deadlineTimeUnit;
-        }
-
-        Promise<List<LogRecordWithDLSN>> getPromise() {
-            return promise;
-        }
-
-        long elapsedSinceEnqueue(TimeUnit timeUnit) {
-            return enqueueTime.elapsed(timeUnit);
-        }
-
-        void setException(Throwable throwable) {
-            Stopwatch stopwatch = Stopwatch.createStarted();
-            if (promise.updateIfEmpty(new 
Throw<List<LogRecordWithDLSN>>(throwable))) {
-                
futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-                
delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
-            }
-        }
-
-        boolean hasReadRecords() {
-            return records.size() > 0;
-        }
-
-        boolean hasReadEnoughRecords() {
-            return records.size() >= numEntries;
-        }
-
-        long getRemainingWaitTime() {
-            if (deadlineTime <= 0L) {
-                return 0L;
-            }
-            return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit);
-        }
-
-        void addRecord(LogRecordWithDLSN record) {
-            records.add(record);
-        }
-
-        void complete() {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("{} : Satisfied promise with {} records", 
readHandler.getFullyQualifiedName(), records.size());
-            }
-            
delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
-            Stopwatch stopwatch = Stopwatch.createStarted();
-            promise.setValue(records);
-            
futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-        }
-    }
-
-    BKAsyncLogReader(BKDistributedLogManager bkdlm,
-                     OrderedScheduler scheduler,
-                     DLSN startDLSN,
-                     Optional<String> subscriberId,
-                     boolean returnEndOfStreamRecord,
-                     StatsLogger statsLogger) {
-        this.streamName = bkdlm.getStreamName();
-        this.bkDistributedLogManager = bkdlm;
-        this.scheduler = scheduler;
-        this.readHandler = 
bkDistributedLogManager.createReadHandler(subscriberId,
-                this, true);
-        LOG.debug("Starting async reader at {}", startDLSN);
-        this.startDLSN = startDLSN;
-        this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
-        this.readNextDelayStopwatch = Stopwatch.createStarted();
-        this.positionGapDetectionEnabled = 
bkdlm.getConf().getPositionGapDetectionEnabled();
-        this.idleErrorThresholdMillis = 
bkdlm.getConf().getReaderIdleErrorThresholdMillis();
-        this.returnEndOfStreamRecord = returnEndOfStreamRecord;
-
-        // Stats
-        StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader");
-        futureSetLatency = 
asyncReaderStatsLogger.getOpStatsLogger("future_set");
-        scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule");
-        backgroundReaderRunTime = 
asyncReaderStatsLogger.getOpStatsLogger("background_read");
-        readNextExecTime = 
asyncReaderStatsLogger.getOpStatsLogger("read_next_exec");
-        timeBetweenReadNexts = 
asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next");
-        delayUntilPromiseSatisfied = 
asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied");
-        idleReaderError = 
asyncReaderStatsLogger.getCounter("idle_reader_error");
-        idleReaderCheckCount = 
asyncReaderStatsLogger.getCounter("idle_reader_check_total");
-        idleReaderCheckIdleReadRequestCount = 
asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests");
-        idleReaderCheckIdleReadAheadCount = 
asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead");
-
-        // Lock the stream if requested. The lock will be released when the 
reader is closed.
-        this.lockStream = false;
-        this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
-        this.lastProcessTime = Stopwatch.createStarted();
-    }
-
-    private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
-        if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
-            // Dont run the task more than once every seconds (for sanity)
-            long period = Math.max(idleErrorThresholdMillis / 10, 1000);
-            // Except when idle reader threshold is less than a second (tests?)
-            period = Math.min(period, idleErrorThresholdMillis / 5);
-
-            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
-                @Override
-                public void run() {
-                    PendingReadRequest nextRequest = pendingRequests.peek();
-
-                    idleReaderCheckCount.inc();
-                    if (null == nextRequest) {
-                        return;
-                    }
-
-                    idleReaderCheckIdleReadRequestCount.inc();
-                    if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) 
< idleErrorThresholdMillis) {
-                        return;
-                    }
-
-                    ReadAheadEntryReader readAheadReader = 
getReadAheadReader();
-
-                    // read request has been idle
-                    //   - cache has records but read request are idle,
-                    //     that means notification was missed between 
readahead and reader.
-                    //   - cache is empty and readahead is idle (no records 
added for a long time)
-                    idleReaderCheckIdleReadAheadCount.inc();
-                    try {
-                        if (null == readAheadReader || (!hasMoreRecords() &&
-                                
readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) 
{
-                            markReaderAsIdle();
-                            return;
-                        } else if 
(lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) {
-                            markReaderAsIdle();;
-                        }
-                    } catch (IOException e) {
-                        setLastException(e);
-                        return;
-                    }
-                }
-            }, period, period, TimeUnit.MILLISECONDS);
-        }
-        return null;
-    }
-
-    synchronized ReadAheadEntryReader getReadAheadReader() {
-        return readAheadReader;
-    }
-
-    void cancelIdleReaderTask() {
-        // Do this after we have checked that the reader was not previously 
closed
-        try {
-            if (null != idleReaderTimeoutTask) {
-                idleReaderTimeoutTask.cancel(true);
-            }
-        } catch (Exception exc) {
-            LOG.info("{}: Failed to cancel the background idle reader timeout 
task", readHandler.getFullyQualifiedName());
-        }
-    }
-
-    private void markReaderAsIdle() {
-        idleReaderError.inc();
-        IdleReaderException ire = new IdleReaderException("Reader on stream "
-                + readHandler.getFullyQualifiedName()
-                + " is idle for " + idleErrorThresholdMillis +" ms");
-        setLastException(ire);
-        // cancel all pending reads directly rather than notifying on error
-        // because idle reader could happen on idle read requests that usually 
means something wrong
-        // in scheduling reads
-        cancelAllPendingReads(ire);
-    }
-
-    protected synchronized void setStartDLSN(DLSN fromDLSN) throws 
UnexpectedException {
-        if (null != readAheadReader) {
-            throw new UnexpectedException("Could't reset from dlsn after 
reader already starts reading.");
-        }
-        startDLSN = fromDLSN;
-    }
-
-    @VisibleForTesting
-    public synchronized DLSN getStartDLSN() {
-        return startDLSN;
-    }
-
-    public Future<Void> lockStream() {
-        this.lockStream = true;
-        return readHandler.lockStream();
-    }
-
-    private boolean checkClosedOrInError(String operation) {
-        if (null == lastException.get()) {
-            try {
-                if (null != readHandler && null != getReadAheadReader()) {
-                    getReadAheadReader().checkLastException();
-                }
-
-                bkDistributedLogManager.checkClosedOrInError(operation);
-            } catch (IOException exc) {
-                setLastException(exc);
-            }
-        }
-
-        if (lockStream) {
-            try {
-                readHandler.checkReadLock();
-            } catch (IOException ex) {
-                setLastException(ex);
-            }
-        }
-
-        if (null != lastException.get()) {
-            LOG.trace("Cancelling pending reads");
-            cancelAllPendingReads(lastException.get());
-            return true;
-        }
-
-        return false;
-    }
-
-    private void setLastException(IOException exc) {
-        lastException.compareAndSet(null, exc);
-    }
-
-    @Override
-    public String getStreamName() {
-        return streamName;
-    }
-
-    /**
-     * @return A promise that when satisfied will contain the Log Record with 
its DLSN.
-     */
-    @Override
-    public synchronized Future<LogRecordWithDLSN> readNext() {
-        return readInternal(1, 0, 
TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION);
-    }
-
-    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int 
numEntries) {
-        return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int 
numEntries,
-                                                                 long waitTime,
-                                                                 TimeUnit 
timeUnit) {
-        return readInternal(numEntries, waitTime, timeUnit);
-    }
-
-    /**
-     * Read up to <i>numEntries</i> entries. The future will be satisfied when 
any number of entries are
-     * ready (1 to <i>numEntries</i>).
-     *
-     * @param numEntries
-     *          num entries to read
-     * @return A promise that satisfied with a non-empty list of log records 
with their DLSN.
-     */
-    private synchronized Future<List<LogRecordWithDLSN>> readInternal(int 
numEntries,
-                                                                      long 
deadlineTime,
-                                                                      TimeUnit 
deadlineTimeUnit) {
-        
timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
-        readNextDelayStopwatch.reset().start();
-        final PendingReadRequest readRequest = new 
PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
-
-        if (null == readAheadReader) {
-            final ReadAheadEntryReader readAheadEntryReader = 
this.readAheadReader = new ReadAheadEntryReader(
-                    getStreamName(),
-                    getStartDLSN(),
-                    bkDistributedLogManager.getConf(),
-                    readHandler,
-                    bkDistributedLogManager.getReaderEntryStore(),
-                    bkDistributedLogManager.getScheduler(),
-                    Ticker.systemTicker(),
-                    bkDistributedLogManager.alertStatsLogger);
-            readHandler.checkLogStreamExists().addEventListener(new 
FutureEventListener<Void>() {
-                @Override
-                public void onSuccess(Void value) {
-                    try {
-                        readHandler.registerListener(readAheadEntryReader);
-                        readHandler.asyncStartFetchLogSegments()
-                                .map(new 
AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                                    @Override
-                                    public BoxedUnit 
apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                                        
readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
-                                        
readAheadEntryReader.start(logSegments.getValue());
-                                        return BoxedUnit.UNIT;
-                                    }
-                                });
-                    } catch (Exception exc) {
-                        notifyOnError(exc);
-                    }
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    notifyOnError(cause);
-                }
-            });
-        }
-
-        if (checkClosedOrInError("readNext")) {
-            readRequest.setException(lastException.get());
-        } else {
-            boolean queueEmpty = pendingRequests.isEmpty();
-            pendingRequests.add(readRequest);
-
-            if (queueEmpty) {
-                scheduleBackgroundRead();
-            }
-        }
-
-        
readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
-        readNextDelayStopwatch.reset().start();
-
-        return readRequest.getPromise();
-    }
-
-    public synchronized void scheduleBackgroundRead() {
-        // if the reader is already closed, we don't need to schedule 
background read again.
-        if (null != closeFuture) {
-            return;
-        }
-
-        long prevCount = scheduleCount.getAndIncrement();
-        if (0 == prevCount) {
-            scheduleDelayStopwatch.reset().start();
-            scheduler.submit(streamName, this);
-        }
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        // Cancel the idle reader timeout task, interrupting if necessary
-        ReadCancelledException exception;
-        Promise<Void> closePromise;
-        synchronized (this) {
-            if (null != closeFuture) {
-                return closeFuture;
-            }
-            closePromise = closeFuture = new Promise<Void>();
-            exception = new 
ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was 
closed");
-            setLastException(exception);
-        }
-
-        // Do this after we have checked that the reader was not previously 
closed
-        cancelIdleReaderTask();
-
-        synchronized (scheduleLock) {
-            if (null != backgroundScheduleTask) {
-                backgroundScheduleTask.cancel(true);
-            }
-        }
-
-        cancelAllPendingReads(exception);
-
-        ReadAheadEntryReader readAheadReader = getReadAheadReader();
-        if (null != readAheadReader) {
-            readHandler.unregisterListener(readAheadReader);
-            readAheadReader.removeStateChangeNotification(this);
-        }
-        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
-                readAheadReader,
-                readHandler
-        ).proxyTo(closePromise);
-        return closePromise;
-    }
-
-    private void cancelAllPendingReads(Throwable throwExc) {
-        for (PendingReadRequest promise : pendingRequests) {
-            promise.setException(throwExc);
-        }
-        pendingRequests.clear();
-    }
-
-    synchronized boolean hasMoreRecords() throws IOException {
-        if (null == readAheadReader) {
-            return false;
-        }
-        if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) {
-            return true;
-        } else if (null != currentEntry) {
-            nextRecord = currentEntry.nextRecord();
-            return null != nextRecord;
-        }
-        return false;
-    }
-
-    private synchronized LogRecordWithDLSN readNextRecord() throws IOException 
{
-        if (null == readAheadReader) {
-            return null;
-        }
-        if (null == currentEntry) {
-            currentEntry = readAheadReader.getNextReadAheadEntry(0L, 
TimeUnit.MILLISECONDS);
-            // no entry after reading from read ahead then return null
-            if (null == currentEntry) {
-                return null;
-            }
-        }
-
-        LogRecordWithDLSN recordToReturn;
-        if (null == nextRecord) {
-            nextRecord = currentEntry.nextRecord();
-            // no more records in current entry
-            if (null == nextRecord) {
-                currentEntry = null;
-                return readNextRecord();
-            }
-        }
-
-        // found a record to return and prefetch the next one
-        recordToReturn = nextRecord;
-        nextRecord = currentEntry.nextRecord();
-        return recordToReturn;
-    }
-
-    @Override
-    public void run() {
-        synchronized(scheduleLock) {
-            if (scheduleDelayStopwatch.isRunning()) {
-                
scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            }
-
-            Stopwatch runTime = Stopwatch.createStarted();
-            int iterations = 0;
-            long scheduleCountLocal = scheduleCount.get();
-            LOG.debug("{}: Scheduled Background Reader", 
readHandler.getFullyQualifiedName());
-            while(true) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("{}: Executing Iteration: {}", 
readHandler.getFullyQualifiedName(), iterations++);
-                }
-
-                PendingReadRequest nextRequest = null;
-                synchronized(this) {
-                    nextRequest = pendingRequests.peek();
-
-                    // Queue is empty, nothing to read, return
-                    if (null == nextRequest) {
-                        LOG.trace("{}: Queue Empty waiting for Input", 
readHandler.getFullyQualifiedName());
-                        scheduleCount.set(0);
-                        
backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                        return;
-                    }
-
-                    if (disableProcessingReadRequests) {
-                        LOG.info("Reader of {} is forced to stop processing 
read requests", readHandler.getFullyQualifiedName());
-                        return;
-                    }
-                }
-                lastProcessTime.reset().start();
-
-                // If the oldest pending promise is interrupted then we must 
mark
-                // the reader in error and abort all pending reads since we 
dont
-                // know the last consumed read
-                if (null == lastException.get()) {
-                    if (nextRequest.getPromise().isInterrupted().isDefined()) {
-                        setLastException(new 
DLInterruptedException("Interrupted on reading " + 
readHandler.getFullyQualifiedName() + " : ",
-                                
nextRequest.getPromise().isInterrupted().get()));
-                    }
-                }
-
-                if (checkClosedOrInError("readNext")) {
-                    if (!(lastException.get().getCause() instanceof 
LogNotFoundException)) {
-                        LOG.warn("{}: Exception", 
readHandler.getFullyQualifiedName(), lastException.get());
-                    }
-                    
backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                    return;
-                }
-
-                try {
-                    // Fail 10% of the requests when asked to simulate errors
-                    if 
(bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) {
-                        throw new IOException("Reader Simulated Exception");
-                    }
-                    LogRecordWithDLSN record;
-                    while (!nextRequest.hasReadEnoughRecords()) {
-                        // read single record
-                        do {
-                            record = readNextRecord();
-                        } while (null != record && (record.isControl() || 
(record.getDlsn().compareTo(getStartDLSN()) < 0)));
-                        if (null == record) {
-                            break;
-                        } else {
-                            if (record.isEndOfStream() && 
!returnEndOfStreamRecord) {
-                                setLastException(new EndOfStreamException("End 
of Stream Reached for "
-                                        + 
readHandler.getFullyQualifiedName()));
-                                break;
-                            }
-
-                            // gap detection
-                            if (recordPositionsContainsGap(record, 
lastPosition)) {
-                                bkDistributedLogManager.raiseAlert("Gap 
detected between records at record = {}", record);
-                                if (positionGapDetectionEnabled) {
-                                    throw new DLIllegalStateException("Gap 
detected between records at record = " + record);
-                                }
-                            }
-                            lastPosition = 
record.getLastPositionWithinLogSegment();
-
-                            nextRequest.addRecord(record);
-                        }
-                    };
-                } catch (IOException exc) {
-                    setLastException(exc);
-                    if (!(exc instanceof LogNotFoundException)) {
-                        LOG.warn("{} : read with skip Exception", 
readHandler.getFullyQualifiedName(), lastException.get());
-                    }
-                    continue;
-                }
-
-                if (nextRequest.hasReadRecords()) {
-                    long remainingWaitTime = 
nextRequest.getRemainingWaitTime();
-                    if (remainingWaitTime > 0 && 
!nextRequest.hasReadEnoughRecords()) {
-                        
backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                        scheduleDelayStopwatch.reset().start();
-                        scheduleCount.set(0);
-                        // the request could still wait for more records
-                        backgroundScheduleTask = scheduler.schedule(
-                                streamName,
-                                BACKGROUND_READ_SCHEDULER,
-                                remainingWaitTime,
-                                nextRequest.deadlineTimeUnit);
-                        return;
-                    }
-
-                    PendingReadRequest request = pendingRequests.poll();
-                    if (null != request && nextRequest == request) {
-                        request.complete();
-                        if (null != backgroundScheduleTask) {
-                            backgroundScheduleTask.cancel(true);
-                            backgroundScheduleTask = null;
-                        }
-                    } else {
-                        DLIllegalStateException ise = new 
DLIllegalStateException("Unexpected condition at dlsn = "
-                                + nextRequest.records.get(0).getDlsn());
-                        nextRequest.setException(ise);
-                        if (null != request) {
-                            request.setException(ise);
-                        }
-                        // We should never get here as we should have exited 
the loop if
-                        // pendingRequests were empty
-                        bkDistributedLogManager.raiseAlert("Unexpected 
condition at dlsn = {}",
-                                nextRequest.records.get(0).getDlsn());
-                        setLastException(ise);
-                    }
-                } else {
-                    if (0 == scheduleCountLocal) {
-                        LOG.trace("Schedule count dropping to zero", 
lastException.get());
-                        
backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
-                        return;
-                    }
-                    scheduleCountLocal = scheduleCount.decrementAndGet();
-                }
-            }
-        }
-    }
-
-    private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long 
lastPosition) {
-        final boolean firstLogRecord = (1 == 
record.getPositionWithinLogSegment());
-        final boolean endOfStreamRecord = record.isEndOfStream();
-        final boolean emptyLogSegment = (0 == lastPosition);
-        final boolean positionIncreasedByOne = 
(record.getPositionWithinLogSegment() == (lastPosition + 1));
-
-        return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment &&
-               !positionIncreasedByOne;
-    }
-
-    /**
-     * Triggered when the background activity encounters an exception
-     */
-    @Override
-    public void notifyOnError(Throwable cause) {
-        if (cause instanceof IOException) {
-            setLastException((IOException) cause);
-        } else {
-            setLastException(new IOException(cause));
-        }
-        scheduleBackgroundRead();
-    }
-
-    /**
-     * Triggered when the background activity completes an operation
-     */
-    @Override
-    public void notifyOnOperationComplete() {
-        scheduleBackgroundRead();
-    }
-
-    @VisibleForTesting
-    void simulateErrors() {
-        bkDistributedLogManager.getFailureInjector().injectErrors(true);
-    }
-
-    @VisibleForTesting
-    synchronized void disableReadAheadLogSegmentsNotification() {
-        readHandler.disableReadAheadLogSegmentsNotification();
-    }
-
-    @VisibleForTesting
-    synchronized void disableProcessingReadRequests() {
-        disableProcessingReadRequests = true;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
deleted file mode 100644
index 9432e8a..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.StreamNotReadyException;
-import com.twitter.distributedlog.exceptions.WriteCancelledException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.feature.CoreFeatureKeys;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.Option;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * BookKeeper based {@link AsyncLogWriter} implementation.
- *
- * <h3>Metrics</h3>
- * All the metrics are exposed under `log_writer`.
- * <ul>
- * <li> `log_writer/write`: opstats. latency characteristics about the time that write operations spent.
- * <li> `log_writer/bulk_write`: opstats. latency characteristics about the time that bulk_write
- * operations spent.
- * are pending in the queue for long time due to log segment rolling.
- * <li> `log_writer/get_writer`: opstats. the time spent on getting the writer. it could spike when there
- * is log segment rolling happened during getting the writer. it is a good stat to look into when the latency
- * is caused by queuing time.
- * <li> `log_writer/pending_request_dispatch`: counter. the number of queued operations that are dispatched
- * after log segment is rolled. it is a metric measuring how many operations have been queued because of
- * log segment rolling.
- * </ul>
- * See {@link BKLogSegmentWriter} for segment writer stats.
- */
-public class BKAsyncLogWriter extends BKAbstractLogWriter implements 
AsyncLogWriter {
-
-    static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);
-
-    static Function1<List<LogSegmentMetadata>, Boolean> 
TruncationResultConverter =
-            new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() {
-                @Override
-                public Boolean apply(List<LogSegmentMetadata> segments) {
-                    return true;
-                }
-            };
-
-    // Records pending for roll log segment.
-    class PendingLogRecord implements FutureEventListener<DLSN> {
-
-        final LogRecord record;
-        final Promise<DLSN> promise;
-        final boolean flush;
-
-        PendingLogRecord(LogRecord record, boolean flush) {
-            this.record = record;
-            this.promise = new Promise<DLSN>();
-            this.flush = flush;
-        }
-
-        @Override
-        public void onSuccess(DLSN value) {
-            promise.setValue(value);
-        }
-
-        @Override
-        public void onFailure(Throwable cause) {
-            promise.setException(cause);
-            encounteredError = true;
-        }
-    }
-
-    /**
-     * Last pending record in current log segment. After it is satisfied, it would
-     * roll log segment.
-     *
-     * This implementation is based on the assumption that all future satisfied in same
-     * order future pool.
-     */
-    class LastPendingLogRecord extends PendingLogRecord {
-
-        LastPendingLogRecord(LogRecord record, boolean flush) {
-            super(record, flush);
-        }
-
-        @Override
-        public void onSuccess(DLSN value) {
-            super.onSuccess(value);
-            // roll log segment and issue all pending requests.
-            rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
-        }
-
-        @Override
-        public void onFailure(Throwable cause) {
-            super.onFailure(cause);
-            // error out pending requests.
-            errorOutPendingRequestsAndWriter(cause);
-        }
-    }
-
-    private final boolean streamFailFast;
-    private final boolean disableRollOnSegmentError;
-    private LinkedList<PendingLogRecord> pendingRequests = null;
-    private volatile boolean encounteredError = false;
-    private Promise<BKLogSegmentWriter> rollingFuture = null;
-    private long lastTxId = DistributedLogConstants.INVALID_TXID;
-
-    private final StatsLogger statsLogger;
-    private final OpStatsLogger writeOpStatsLogger;
-    private final OpStatsLogger markEndOfStreamOpStatsLogger;
-    private final OpStatsLogger bulkWriteOpStatsLogger;
-    private final OpStatsLogger getWriterOpStatsLogger;
-    private final Counter pendingRequestDispatch;
-
-    private final Feature disableLogSegmentRollingFeature;
-
-    BKAsyncLogWriter(DistributedLogConfiguration conf,
-                     DynamicDistributedLogConfiguration dynConf,
-                     BKDistributedLogManager bkdlm,
-                     BKLogWriteHandler writeHandler, /** log writer owns the 
handler **/
-                     FeatureProvider featureProvider,
-                     StatsLogger dlmStatsLogger) {
-        super(conf, dynConf, bkdlm);
-        this.writeHandler = writeHandler;
-        this.streamFailFast = conf.getFailFastOnStreamNotReady();
-        this.disableRollOnSegmentError = 
conf.getDisableRollingOnLogSegmentError();
-
-        // features
-        disableLogSegmentRollingFeature = 
featureProvider.getFeature(CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase());
-        // stats
-        this.statsLogger = dlmStatsLogger.scope("log_writer");
-        this.writeOpStatsLogger = statsLogger.getOpStatsLogger("write");
-        this.markEndOfStreamOpStatsLogger = 
statsLogger.getOpStatsLogger("mark_end_of_stream");
-        this.bulkWriteOpStatsLogger = 
statsLogger.getOpStatsLogger("bulk_write");
-        this.getWriterOpStatsLogger = 
statsLogger.getOpStatsLogger("get_writer");
-        this.pendingRequestDispatch = 
statsLogger.getCounter("pending_request_dispatch");
-    }
-
-    @VisibleForTesting
-    synchronized void setLastTxId(long txId) {
-        lastTxId = Math.max(lastTxId, txId);
-    }
-
-    @Override
-    public synchronized long getLastTxId() {
-        return lastTxId;
-    }
-
-    /**
-     * Write a log record as control record. The method will be used by Monitor Service to enforce a new inprogress segment.
-     *
-     * @param record
-     *          log record
-     * @return future of the write
-     */
-    public Future<DLSN> writeControlRecord(final LogRecord record) {
-        record.setControl();
-        return write(record);
-    }
-
-    private BKLogSegmentWriter getCachedLogSegmentWriter() throws 
WriteException {
-        if (encounteredError) {
-            throw new WriteException(bkDistributedLogManager.getStreamName(),
-                    "writer has been closed due to error.");
-        }
-        BKLogSegmentWriter segmentWriter = getCachedLogWriter();
-        if (null != segmentWriter
-                && segmentWriter.isLogSegmentInError()
-                && !disableRollOnSegmentError) {
-            return null;
-        } else {
-            return segmentWriter;
-        }
-    }
-
-    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
-                                                           boolean bestEffort,
-                                                           boolean rollLog,
-                                                           boolean 
allowMaxTxID) {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        return FutureUtils.stats(
-                doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, 
allowMaxTxID),
-                getWriterOpStatsLogger,
-                stopwatch);
-    }
-
-    private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long 
firstTxid,
-                                                             final boolean 
bestEffort,
-                                                             final boolean 
rollLog,
-                                                             final boolean 
allowMaxTxID) {
-        if (encounteredError) {
-            return Future.exception(new 
WriteException(bkDistributedLogManager.getStreamName(),
-                    "writer has been closed due to error."));
-        }
-        Future<BKLogSegmentWriter> writerFuture = 
asyncGetLedgerWriter(!disableRollOnSegmentError);
-        if (null == writerFuture) {
-            return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, 
allowMaxTxID);
-        } else if (rollLog) {
-            return writerFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
-                @Override
-                public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter 
writer) {
-                    return rollLogSegmentIfNecessary(writer, firstTxid, 
bestEffort, allowMaxTxID);
-                }
-            });
-        } else {
-            return writerFuture;
-        }
-    }
-
-    /**
-     * We write end of stream marker by writing a record with MAX_TXID, so we need to allow using
-     * max txid when rolling for this case only.
-     */
-    private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
-        return getLogSegmentWriter(DistributedLogConstants.MAX_TXID,
-                                     false /* bestEffort */,
-                                     false /* roll log */,
-                                     true /* allow max txid */);
-    }
-
-    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
-                                                           boolean bestEffort,
-                                                           boolean rollLog) {
-        return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* 
allow max txid */);
-    }
-
-    Future<DLSN> queueRequest(LogRecord record, boolean flush) {
-        PendingLogRecord pendingLogRecord = new PendingLogRecord(record, 
flush);
-        pendingRequests.add(pendingLogRecord);
-        return pendingLogRecord.promise;
-    }
-
-    boolean shouldRollLog(BKLogSegmentWriter w) {
-        try {
-            return null == w ||
-                    (!disableLogSegmentRollingFeature.isAvailable() &&
-                    shouldStartNewSegment(w));
-        } catch (IOException ioe) {
-            return false;
-        }
-    }
-
-    void startQueueingRequests() {
-        assert(null == pendingRequests && null == rollingFuture);
-        pendingRequests = new LinkedList<PendingLogRecord>();
-        rollingFuture = new Promise<BKLogSegmentWriter>();
-    }
-
-    // for ordering guarantee, we shouldn't send requests to next log segments until
-    // previous log segment is done.
-    private synchronized Future<DLSN> asyncWrite(final LogRecord record,
-                                                 boolean flush) {
-        // The passed in writer may be stale since we acquire the writer outside of sync
-        // lock. If we recently rolled and the new writer is cached, use that instead.
-        Future<DLSN> result = null;
-        BKLogSegmentWriter w;
-        try {
-            w = getCachedLogSegmentWriter();
-        } catch (WriteException we) {
-            return Future.exception(we);
-        }
-        if (null != rollingFuture) {
-            if (streamFailFast) {
-                result = Future.exception(new StreamNotReadyException("Rolling 
log segment"));
-            } else {
-                result = queueRequest(record, flush);
-            }
-        } else if (shouldRollLog(w)) {
-            // insert a last record, so when it called back, we will trigger a log segment rolling
-            startQueueingRequests();
-            if (null != w) {
-                LastPendingLogRecord lastLogRecordInCurrentSegment = new 
LastPendingLogRecord(record, flush);
-                w.asyncWrite(record, 
true).addEventListener(lastLogRecordInCurrentSegment);
-                result = lastLogRecordInCurrentSegment.promise;
-            } else { // no log segment yet. roll the log segment and issue pending requests.
-                result = queueRequest(record, flush);
-                
rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
-            }
-        } else {
-            result = w.asyncWrite(record, flush);
-        }
-        // use map here rather than onSuccess because we want lastTxId to be updated before
-        // satisfying the future
-        return result.map(new AbstractFunction1<DLSN, DLSN>() {
-            @Override
-            public DLSN apply(DLSN dlsn) {
-                setLastTxId(record.getTransactionId());
-                return dlsn;
-            }
-        });
-    }
-
-    private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> records) {
-        final ArrayList<Future<DLSN>> results = new 
ArrayList<Future<DLSN>>(records.size());
-        Iterator<LogRecord> iterator = records.iterator();
-        while (iterator.hasNext()) {
-            LogRecord record = iterator.next();
-            Future<DLSN> future = asyncWrite(record, !iterator.hasNext());
-            results.add(future);
-
-            // Abort early if an individual write has already failed.
-            Option<Try<DLSN>> result = future.poll();
-            if (result.isDefined() && result.get().isThrow()) {
-                break;
-            }
-        }
-        if (records.size() > results.size()) {
-            appendCancelledFutures(results, records.size() - results.size());
-        }
-        return results;
-    }
-
-    private void appendCancelledFutures(List<Future<DLSN>> futures, int 
numToAdd) {
-        final WriteCancelledException cre =
-            new WriteCancelledException(getStreamName());
-        for (int i = 0; i < numToAdd; i++) {
-            Future<DLSN> cancelledFuture = Future.exception(cre);
-            futures.add(cancelledFuture);
-        }
-    }
-
-    private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
-        getLogSegmentWriter(firstTxId, true, true)
-                .addEventListener(new 
FutureEventListener<BKLogSegmentWriter>() {
-            @Override
-            public void onSuccess(BKLogSegmentWriter writer) {
-                try {
-                    synchronized (BKAsyncLogWriter.this) {
-                        for (PendingLogRecord pendingLogRecord : 
pendingRequests) {
-                            
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
-                            writer.asyncWrite(pendingLogRecord.record, 
pendingLogRecord.flush)
-                                    .addEventListener(pendingLogRecord);
-                        }
-                        // if there are no records in the pending queue, let's write a control record
-                        // so that when a new log segment is rolled, a control record will be added and
-                        // the corresponding bookies would be able to create its ledger.
-                        if (pendingRequests.isEmpty()) {
-                            LogRecord controlRecord = new LogRecord(firstTxId,
-                                    
DistributedLogConstants.CONTROL_RECORD_CONTENT);
-                            controlRecord.setControl();
-                            PendingLogRecord controlReq = new 
PendingLogRecord(controlRecord, false);
-                            writer.asyncWrite(controlReq.record, 
controlReq.flush)
-                                    .addEventListener(controlReq);
-                        }
-                        if (null != rollingFuture) {
-                            FutureUtils.setValue(rollingFuture, writer);
-                        }
-                        rollingFuture = null;
-                        pendingRequestDispatch.add(pendingRequests.size());
-                        pendingRequests = null;
-                    }
-                } catch (IOException ioe) {
-                    errorOutPendingRequestsAndWriter(ioe);
-                }
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                errorOutPendingRequestsAndWriter(cause);
-            }
-        });
-    }
-
-    @VisibleForTesting
-    void errorOutPendingRequests(Throwable cause, boolean errorOutWriter) {
-        final List<PendingLogRecord> pendingRequestsSnapshot;
-        synchronized (this) {
-            pendingRequestsSnapshot = pendingRequests;
-            encounteredError = errorOutWriter;
-            pendingRequests = null;
-            if (null != rollingFuture) {
-                FutureUtils.setException(rollingFuture, cause);
-            }
-            rollingFuture = null;
-        }
-
-        pendingRequestDispatch.add(pendingRequestsSnapshot.size());
-
-        // After erroring out the writer above, no more requests
-        // will be enqueued to pendingRequests
-        for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) {
-            pendingLogRecord.promise.setException(cause);
-        }
-    }
-
-    void errorOutPendingRequestsAndWriter(Throwable cause) {
-        errorOutPendingRequests(cause, true /* error out writer */);
-    }
-
-    /**
-     * Write a log record to the stream.
-     *
-     * @param record single log record
-     */
-    @Override
-    public Future<DLSN> write(final LogRecord record) {
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        return FutureUtils.stats(
-                asyncWrite(record, true),
-                writeOpStatsLogger,
-                stopwatch);
-    }
-
-    /**
-     * Write many log records to the stream. The return type here is unfortunate but it's a direct result
-     * of having to combine FuturePool and the asyncWriteBulk method which returns a future as well. The
-     * problem is the List that asyncWriteBulk returns can't be materialized until getLogSegmentWriter
-     * completes, so it has to be wrapped in a future itself.
-     *
-     * @param records list of records
-     */
-    @Override
-    public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) 
{
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        return FutureUtils.stats(
-                Future.value(asyncWriteBulk(records)),
-                bulkWriteOpStatsLogger,
-                stopwatch);
-    }
-
-    @Override
-    public Future<Boolean> truncate(final DLSN dlsn) {
-        if (DLSN.InvalidDLSN == dlsn) {
-            return Future.value(false);
-        }
-        BKLogWriteHandler writeHandler;
-        try {
-            writeHandler = getWriteHandler();
-        } catch (IOException e) {
-            return Future.exception(e);
-        }
-        return 
writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter);
-    }
-
-    Future<Long> flushAndCommit() {
-        Future<BKLogSegmentWriter> writerFuture;
-        synchronized (this) {
-            if (null != this.rollingFuture) {
-                writerFuture = this.rollingFuture;
-            } else {
-                writerFuture = getCachedLogWriterFuture();
-            }
-        }
-        if (null == writerFuture) {
-            return Future.value(getLastTxId());
-        }
-        return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, 
Future<Long>>() {
-            @Override
-            public Future<Long> apply(BKLogSegmentWriter writer) {
-                return writer.flushAndCommit();
-            }
-        });
-    }
-
-    Future<Long> markEndOfStream() {
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        Future<BKLogSegmentWriter> logSegmentWriterFuture;
-        synchronized (this) {
-            logSegmentWriterFuture = this.rollingFuture;
-        }
-        if (null == logSegmentWriterFuture) {
-            logSegmentWriterFuture = getLogSegmentWriterForEndOfStream();
-        }
-
-        return FutureUtils.stats(
-                logSegmentWriterFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
-                    @Override
-                    public Future<Long> apply(BKLogSegmentWriter w) {
-                        return w.markEndOfStream();
-                    }
-                }),
-                markEndOfStreamOpStatsLogger,
-                stopwatch);
-    }
-
-    @Override
-    protected Future<Void> asyncCloseAndComplete() {
-        Future<BKLogSegmentWriter> logSegmentWriterFuture;
-        synchronized (this) {
-            logSegmentWriterFuture = this.rollingFuture;
-        }
-
-        if (null == logSegmentWriterFuture) {
-            return super.asyncCloseAndComplete();
-        } else {
-            return logSegmentWriterFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<Void>>() {
-                @Override
-                public Future<Void> apply(BKLogSegmentWriter segmentWriter) {
-                    return BKAsyncLogWriter.super.asyncCloseAndComplete();
-                }
-            });
-        }
-    }
-
-    @Override
-    void closeAndComplete() throws IOException {
-        FutureUtils.result(asyncCloseAndComplete());
-    }
-
-    /**
-     * *TEMP HACK*
-     * Get the name of the stream this writer writes data to
-     */
-    @Override
-    public String getStreamName() {
-        return bkDistributedLogManager.getStreamName();
-    }
-
-    @Override
-    public Future<Void> asyncAbort() {
-        Future<Void> result = super.asyncAbort();
-        synchronized (this) {
-            if (pendingRequests != null) {
-                for (PendingLogRecord pendingLogRecord : pendingRequests) {
-                    pendingLogRecord.promise.setException(new WriteException(bkDistributedLogManager.getStreamName(),
-                            "abort wring: writer has been closed due to error."));
-                }
-            }
-        }
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("AsyncLogWriter:%s", getStreamName());
-    }
-}

Reply via email to