http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index eedfbd6..26a4a76 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -21,39 +21,33 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Ticker;
-import org.apache.distributedlog.exceptions.DLIllegalStateException;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.exceptions.EndOfStreamException;
-import org.apache.distributedlog.exceptions.IdleReaderException;
-import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.exceptions.ReadCancelledException;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Throw;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-
+import java.util.function.Function;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.apache.distributedlog.exceptions.IdleReaderException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ReadCancelledException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 /**
  * BookKeeper based {@link AsyncLogReader} implementation.
@@ -76,13 +70,8 @@ import scala.runtime.BoxedUnit;
 class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
     static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class);
 
-    private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> 
READ_NEXT_MAP_FUNCTION =
-            new AbstractFunction1<List<LogRecordWithDLSN>, 
LogRecordWithDLSN>() {
-                @Override
-                public LogRecordWithDLSN apply(List<LogRecordWithDLSN> 
records) {
-                    return records.get(0);
-                }
-            };
+    private static final Function<List<LogRecordWithDLSN>, LogRecordWithDLSN> 
READ_NEXT_MAP_FUNCTION =
+        records -> records.get(0);
 
     private final String streamName;
     protected final BKDistributedLogManager bkDistributedLogManager;
@@ -104,7 +93,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
     // last process time
     private final Stopwatch lastProcessTime;
 
-    protected Promise<Void> closeFuture = null;
+    protected CompletableFuture<Void> closeFuture = null;
 
     private boolean lockStream = false;
 
@@ -143,7 +132,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
         private final Stopwatch enqueueTime;
         private final int numEntries;
         private final List<LogRecordWithDLSN> records;
-        private final Promise<List<LogRecordWithDLSN>> promise;
+        private final CompletableFuture<List<LogRecordWithDLSN>> promise;
         private final long deadlineTime;
         private final TimeUnit deadlineTimeUnit;
 
@@ -158,12 +147,12 @@ class BKAsyncLogReader implements AsyncLogReader, 
Runnable, AsyncNotification {
             } else {
                 this.records = new ArrayList<LogRecordWithDLSN>();
             }
-            this.promise = new Promise<List<LogRecordWithDLSN>>();
+            this.promise = new CompletableFuture<List<LogRecordWithDLSN>>();
             this.deadlineTime = deadlineTime;
             this.deadlineTimeUnit = deadlineTimeUnit;
         }
 
-        Promise<List<LogRecordWithDLSN>> getPromise() {
+        CompletableFuture<List<LogRecordWithDLSN>> getPromise() {
             return promise;
         }
 
@@ -171,9 +160,9 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
             return enqueueTime.elapsed(timeUnit);
         }
 
-        void setException(Throwable throwable) {
+        void completeExceptionally(Throwable throwable) {
             Stopwatch stopwatch = Stopwatch.createStarted();
-            if (promise.updateIfEmpty(new 
Throw<List<LogRecordWithDLSN>>(throwable))) {
+            if (promise.completeExceptionally(throwable)) {
                 
futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
                 
delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
             }
@@ -204,7 +193,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
             }
             
delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
             Stopwatch stopwatch = Stopwatch.createStarted();
-            promise.setValue(records);
+            promise.complete(records);
             
futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
         }
     }
@@ -333,7 +322,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
         return startDLSN;
     }
 
-    public Future<Void> lockStream() {
+    public CompletableFuture<Void> lockStream() {
         this.lockStream = true;
         return readHandler.lockStream();
     }
@@ -381,16 +370,16 @@ class BKAsyncLogReader implements AsyncLogReader, 
Runnable, AsyncNotification {
      * @return A promise that when satisfied will contain the Log Record with 
its DLSN.
      */
     @Override
-    public synchronized Future<LogRecordWithDLSN> readNext() {
-        return readInternal(1, 0, 
TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION);
+    public synchronized CompletableFuture<LogRecordWithDLSN> readNext() {
+        return readInternal(1, 0, 
TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION);
     }
 
-    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int 
numEntries) {
+    public synchronized CompletableFuture<List<LogRecordWithDLSN>> 
readBulk(int numEntries) {
         return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public synchronized Future<List<LogRecordWithDLSN>> readBulk(int 
numEntries,
+    public synchronized CompletableFuture<List<LogRecordWithDLSN>> 
readBulk(int numEntries,
                                                                  long waitTime,
                                                                  TimeUnit 
timeUnit) {
         return readInternal(numEntries, waitTime, timeUnit);
@@ -404,7 +393,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
      *          num entries to read
      * @return A promise that satisfied with a non-empty list of log records 
with their DLSN.
      */
-    private synchronized Future<List<LogRecordWithDLSN>> readInternal(int 
numEntries,
+    private synchronized CompletableFuture<List<LogRecordWithDLSN>> 
readInternal(int numEntries,
                                                                       long 
deadlineTime,
                                                                       TimeUnit 
deadlineTimeUnit) {
         
timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
@@ -421,19 +410,15 @@ class BKAsyncLogReader implements AsyncLogReader, 
Runnable, AsyncNotification {
                     bkDistributedLogManager.getScheduler(),
                     Ticker.systemTicker(),
                     bkDistributedLogManager.alertStatsLogger);
-            readHandler.checkLogStreamExists().addEventListener(new 
FutureEventListener<Void>() {
+            readHandler.checkLogStreamExists().whenComplete(new 
FutureEventListener<Void>() {
                 @Override
                 public void onSuccess(Void value) {
                     try {
                         readHandler.registerListener(readAheadEntryReader);
                         readHandler.asyncStartFetchLogSegments()
-                                .map(new 
AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                                    @Override
-                                    public BoxedUnit 
apply(Versioned<List<LogSegmentMetadata>> logSegments) {
-                                        
readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
-                                        
readAheadEntryReader.start(logSegments.getValue());
-                                        return BoxedUnit.UNIT;
-                                    }
+                                .thenAccept(logSegments -> {
+                                    
readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this);
+                                    
readAheadEntryReader.start(logSegments.getValue());
                                 });
                     } catch (Exception exc) {
                         notifyOnError(exc);
@@ -448,7 +433,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
         }
 
         if (checkClosedOrInError("readNext")) {
-            readRequest.setException(lastException.get());
+            readRequest.completeExceptionally(lastException.get());
         } else {
             boolean queueEmpty = pendingRequests.isEmpty();
             pendingRequests.add(readRequest);
@@ -478,15 +463,15 @@ class BKAsyncLogReader implements AsyncLogReader, 
Runnable, AsyncNotification {
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         // Cancel the idle reader timeout task, interrupting if necessary
         ReadCancelledException exception;
-        Promise<Void> closePromise;
+        CompletableFuture<Void> closePromise;
         synchronized (this) {
             if (null != closeFuture) {
                 return closeFuture;
             }
-            closePromise = closeFuture = new Promise<Void>();
+            closePromise = closeFuture = new CompletableFuture<Void>();
             exception = new 
ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was 
closed");
             setLastException(exception);
         }
@@ -507,16 +492,18 @@ class BKAsyncLogReader implements AsyncLogReader, 
Runnable, AsyncNotification {
             readHandler.unregisterListener(readAheadReader);
             readAheadReader.removeStateChangeNotification(this);
         }
-        Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
-                readAheadReader,
-                readHandler
-        ).proxyTo(closePromise);
+        FutureUtils.proxyTo(
+            Utils.closeSequence(bkDistributedLogManager.getScheduler(), true,
+                    readAheadReader,
+                    readHandler
+            ),
+            closePromise);
         return closePromise;
     }
 
     private void cancelAllPendingReads(Throwable throwExc) {
         for (PendingReadRequest promise : pendingRequests) {
-            promise.setException(throwExc);
+            promise.completeExceptionally(throwExc);
         }
         pendingRequests.clear();
     }
@@ -591,7 +578,8 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
                     }
 
                     if (disableProcessingReadRequests) {
-                        LOG.info("Reader of {} is forced to stop processing 
read requests", readHandler.getFullyQualifiedName());
+                        LOG.info("Reader of {} is forced to stop processing 
read requests",
+                            readHandler.getFullyQualifiedName());
                         return;
                     }
                 }
@@ -601,9 +589,9 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
                 // the reader in error and abort all pending reads since we 
dont
                 // know the last consumed read
                 if (null == lastException.get()) {
-                    if (nextRequest.getPromise().isInterrupted().isDefined()) {
-                        setLastException(new 
DLInterruptedException("Interrupted on reading " + 
readHandler.getFullyQualifiedName() + " : ",
-                                
nextRequest.getPromise().isInterrupted().get()));
+                    if (nextRequest.getPromise().isCancelled()) {
+                        setLastException(new 
DLInterruptedException("Interrupted on reading "
+                            + readHandler.getFullyQualifiedName()));
                     }
                 }
 
@@ -680,9 +668,9 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
                     } else {
                         DLIllegalStateException ise = new 
DLIllegalStateException("Unexpected condition at dlsn = "
                                 + nextRequest.records.get(0).getDlsn());
-                        nextRequest.setException(ise);
+                        nextRequest.completeExceptionally(ise);
                         if (null != request) {
-                            request.setException(ise);
+                            request.completeExceptionally(ise);
                         }
                         // We should never get here as we should have exited 
the loop if
                         // pendingRequests were empty

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
index a1b1d5c..62b32f2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -19,33 +19,30 @@ package org.apache.distributedlog;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.StreamNotReadyException;
 import org.apache.distributedlog.exceptions.WriteCancelledException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.feature.CoreFeatureKeys;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.Option;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 
 /**
  * BookKeeper based {@link AsyncLogWriter} implementation.
@@ -70,35 +67,30 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
 
     static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);
 
-    static Function1<List<LogSegmentMetadata>, Boolean> 
TruncationResultConverter =
-            new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() {
-                @Override
-                public Boolean apply(List<LogSegmentMetadata> segments) {
-                    return true;
-                }
-            };
+    static Function<List<LogSegmentMetadata>, Boolean> 
TruncationResultConverter =
+        segments -> true;
 
     // Records pending for roll log segment.
     class PendingLogRecord implements FutureEventListener<DLSN> {
 
         final LogRecord record;
-        final Promise<DLSN> promise;
+        final CompletableFuture<DLSN> promise;
         final boolean flush;
 
         PendingLogRecord(LogRecord record, boolean flush) {
             this.record = record;
-            this.promise = new Promise<DLSN>();
+            this.promise = new CompletableFuture<DLSN>();
             this.flush = flush;
         }
 
         @Override
         public void onSuccess(DLSN value) {
-            promise.setValue(value);
+            promise.complete(value);
         }
 
         @Override
         public void onFailure(Throwable cause) {
-            promise.setException(cause);
+            promise.completeExceptionally(cause);
             encounteredError = true;
         }
     }
@@ -135,7 +127,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
     private final boolean disableRollOnSegmentError;
     private LinkedList<PendingLogRecord> pendingRequests = null;
     private volatile boolean encounteredError = false;
-    private Promise<BKLogSegmentWriter> rollingFuture = null;
+    private CompletableFuture<BKLogSegmentWriter> rollingFuture = null;
     private long lastTxId = DistributedLogConstants.INVALID_TXID;
 
     private final StatsLogger statsLogger;
@@ -186,7 +178,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
      *          log record
      * @return future of the write
      */
-    public Future<DLSN> writeControlRecord(final LogRecord record) {
+    public CompletableFuture<DLSN> writeControlRecord(final LogRecord record) {
         record.setControl();
         return write(record);
     }
@@ -206,7 +198,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
         }
     }
 
-    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
+    private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long 
firstTxid,
                                                            boolean bestEffort,
                                                            boolean rollLog,
                                                            boolean 
allowMaxTxID) {
@@ -217,24 +209,20 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
                 stopwatch);
     }
 
-    private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long 
firstTxid,
+    private CompletableFuture<BKLogSegmentWriter> doGetLogSegmentWriter(final 
long firstTxid,
                                                              final boolean 
bestEffort,
                                                              final boolean 
rollLog,
                                                              final boolean 
allowMaxTxID) {
         if (encounteredError) {
-            return Future.exception(new 
WriteException(bkDistributedLogManager.getStreamName(),
+            return FutureUtils.exception(new 
WriteException(bkDistributedLogManager.getStreamName(),
                     "writer has been closed due to error."));
         }
-        Future<BKLogSegmentWriter> writerFuture = 
asyncGetLedgerWriter(!disableRollOnSegmentError);
+        CompletableFuture<BKLogSegmentWriter> writerFuture = 
asyncGetLedgerWriter(!disableRollOnSegmentError);
         if (null == writerFuture) {
             return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, 
allowMaxTxID);
         } else if (rollLog) {
-            return writerFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
-                @Override
-                public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter 
writer) {
-                    return rollLogSegmentIfNecessary(writer, firstTxid, 
bestEffort, allowMaxTxID);
-                }
-            });
+            return writerFuture.thenCompose(
+                writer -> rollLogSegmentIfNecessary(writer, firstTxid, 
bestEffort, allowMaxTxID));
         } else {
             return writerFuture;
         }
@@ -244,20 +232,20 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
      * We write end of stream marker by writing a record with MAX_TXID, so we 
need to allow using
      * max txid when rolling for this case only.
      */
-    private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
+    private CompletableFuture<BKLogSegmentWriter> 
getLogSegmentWriterForEndOfStream() {
         return getLogSegmentWriter(DistributedLogConstants.MAX_TXID,
                                      false /* bestEffort */,
                                      false /* roll log */,
                                      true /* allow max txid */);
     }
 
-    private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
+    private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long 
firstTxid,
                                                            boolean bestEffort,
                                                            boolean rollLog) {
         return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* 
allow max txid */);
     }
 
-    Future<DLSN> queueRequest(LogRecord record, boolean flush) {
+    CompletableFuture<DLSN> queueRequest(LogRecord record, boolean flush) {
         PendingLogRecord pendingLogRecord = new PendingLogRecord(record, 
flush);
         pendingRequests.add(pendingLogRecord);
         return pendingLogRecord.promise;
@@ -276,25 +264,25 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
     void startQueueingRequests() {
         assert(null == pendingRequests && null == rollingFuture);
         pendingRequests = new LinkedList<PendingLogRecord>();
-        rollingFuture = new Promise<BKLogSegmentWriter>();
+        rollingFuture = new CompletableFuture<BKLogSegmentWriter>();
     }
 
     // for ordering guarantee, we shouldn't send requests to next log segments 
until
     // previous log segment is done.
-    private synchronized Future<DLSN> asyncWrite(final LogRecord record,
+    private synchronized CompletableFuture<DLSN> asyncWrite(final LogRecord 
record,
                                                  boolean flush) {
         // The passed in writer may be stale since we acquire the writer 
outside of sync
         // lock. If we recently rolled and the new writer is cached, use that 
instead.
-        Future<DLSN> result = null;
+        CompletableFuture<DLSN> result = null;
         BKLogSegmentWriter w;
         try {
             w = getCachedLogSegmentWriter();
         } catch (WriteException we) {
-            return Future.exception(we);
+            return FutureUtils.exception(we);
         }
         if (null != rollingFuture) {
             if (streamFailFast) {
-                result = Future.exception(new StreamNotReadyException("Rolling 
log segment"));
+                result = FutureUtils.exception(new 
StreamNotReadyException("Rolling log segment"));
             } else {
                 result = queueRequest(record, flush);
             }
@@ -303,7 +291,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
             startQueueingRequests();
             if (null != w) {
                 LastPendingLogRecord lastLogRecordInCurrentSegment = new 
LastPendingLogRecord(record, flush);
-                w.asyncWrite(record, 
true).addEventListener(lastLogRecordInCurrentSegment);
+                w.asyncWrite(record, 
true).whenComplete(lastLogRecordInCurrentSegment);
                 result = lastLogRecordInCurrentSegment.promise;
             } else { // no log segment yet. roll the log segment and issue 
pending requests.
                 result = queueRequest(record, flush);
@@ -314,26 +302,22 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
         }
         // use map here rather than onSuccess because we want lastTxId to be 
updated before
         // satisfying the future
-        return result.map(new AbstractFunction1<DLSN, DLSN>() {
-            @Override
-            public DLSN apply(DLSN dlsn) {
-                setLastTxId(record.getTransactionId());
-                return dlsn;
-            }
+        return result.thenApply(dlsn -> {
+            setLastTxId(record.getTransactionId());
+            return dlsn;
         });
     }
 
-    private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> records) {
-        final ArrayList<Future<DLSN>> results = new 
ArrayList<Future<DLSN>>(records.size());
+    private List<CompletableFuture<DLSN>> asyncWriteBulk(List<LogRecord> 
records) {
+        final ArrayList<CompletableFuture<DLSN>> results = new 
ArrayList<CompletableFuture<DLSN>>(records.size());
         Iterator<LogRecord> iterator = records.iterator();
         while (iterator.hasNext()) {
             LogRecord record = iterator.next();
-            Future<DLSN> future = asyncWrite(record, !iterator.hasNext());
+            CompletableFuture<DLSN> future = asyncWrite(record, 
!iterator.hasNext());
             results.add(future);
 
             // Abort early if an individual write has already failed.
-            Option<Try<DLSN>> result = future.poll();
-            if (result.isDefined() && result.get().isThrow()) {
+            if (future.isDone() && future.isCompletedExceptionally()) {
                 break;
             }
         }
@@ -343,18 +327,18 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
         return results;
     }
 
-    private void appendCancelledFutures(List<Future<DLSN>> futures, int 
numToAdd) {
+    private void appendCancelledFutures(List<CompletableFuture<DLSN>> futures, 
int numToAdd) {
         final WriteCancelledException cre =
             new WriteCancelledException(getStreamName());
         for (int i = 0; i < numToAdd; i++) {
-            Future<DLSN> cancelledFuture = Future.exception(cre);
+            CompletableFuture<DLSN> cancelledFuture = 
FutureUtils.exception(cre);
             futures.add(cancelledFuture);
         }
     }
 
     private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
         getLogSegmentWriter(firstTxId, true, true)
-                .addEventListener(new 
FutureEventListener<BKLogSegmentWriter>() {
+                .whenComplete(new FutureEventListener<BKLogSegmentWriter>() {
             @Override
             public void onSuccess(BKLogSegmentWriter writer) {
                 try {
@@ -362,7 +346,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
                         for (PendingLogRecord pendingLogRecord : 
pendingRequests) {
                             
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
                             writer.asyncWrite(pendingLogRecord.record, 
pendingLogRecord.flush)
-                                    .addEventListener(pendingLogRecord);
+                                    .whenComplete(pendingLogRecord);
                         }
                         // if there are no records in the pending queue, let's 
write a control record
                         // so that when a new log segment is rolled, a control 
record will be added and
@@ -373,10 +357,10 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
                             controlRecord.setControl();
                             PendingLogRecord controlReq = new 
PendingLogRecord(controlRecord, false);
                             writer.asyncWrite(controlReq.record, 
controlReq.flush)
-                                    .addEventListener(controlReq);
+                                    .whenComplete(controlReq);
                         }
                         if (null != rollingFuture) {
-                            FutureUtils.setValue(rollingFuture, writer);
+                            FutureUtils.complete(rollingFuture, writer);
                         }
                         rollingFuture = null;
                         pendingRequestDispatch.add(pendingRequests.size());
@@ -401,7 +385,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
             encounteredError = errorOutWriter;
             pendingRequests = null;
             if (null != rollingFuture) {
-                FutureUtils.setException(rollingFuture, cause);
+                FutureUtils.completeExceptionally(rollingFuture, cause);
             }
             rollingFuture = null;
         }
@@ -411,7 +395,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
         // After erroring out the writer above, no more requests
         // will be enqueued to pendingRequests
         for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) {
-            pendingLogRecord.promise.setException(cause);
+            pendingLogRecord.promise.completeExceptionally(cause);
         }
     }
 
@@ -425,7 +409,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter 
implements AsyncLogWriter {
      * @param record single log record
      */
     @Override
-    public Future<DLSN> write(final LogRecord record) {
+    public CompletableFuture<DLSN> write(final LogRecord record) {
         final Stopwatch stopwatch = Stopwatch.createStarted();
         return FutureUtils.stats(
                 asyncWrite(record, true),
@@ -442,30 +426,30 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
      * @param records list of records
      */
     @Override
-    public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) 
{
+    public CompletableFuture<List<CompletableFuture<DLSN>>> writeBulk(final 
List<LogRecord> records) {
         final Stopwatch stopwatch = Stopwatch.createStarted();
         return FutureUtils.stats(
-                Future.value(asyncWriteBulk(records)),
+                FutureUtils.value(asyncWriteBulk(records)),
                 bulkWriteOpStatsLogger,
                 stopwatch);
     }
 
     @Override
-    public Future<Boolean> truncate(final DLSN dlsn) {
+    public CompletableFuture<Boolean> truncate(final DLSN dlsn) {
         if (DLSN.InvalidDLSN == dlsn) {
-            return Future.value(false);
+            return FutureUtils.value(false);
         }
         BKLogWriteHandler writeHandler;
         try {
             writeHandler = getWriteHandler();
         } catch (IOException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
-        return 
writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter);
+        return 
writeHandler.setLogSegmentsOlderThanDLSNTruncated(dlsn).thenApply(TruncationResultConverter);
     }
 
-    Future<Long> flushAndCommit() {
-        Future<BKLogSegmentWriter> writerFuture;
+    CompletableFuture<Long> flushAndCommit() {
+        CompletableFuture<BKLogSegmentWriter> writerFuture;
         synchronized (this) {
             if (null != this.rollingFuture) {
                 writerFuture = this.rollingFuture;
@@ -474,19 +458,14 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
             }
         }
         if (null == writerFuture) {
-            return Future.value(getLastTxId());
+            return FutureUtils.value(getLastTxId());
         }
-        return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, 
Future<Long>>() {
-            @Override
-            public Future<Long> apply(BKLogSegmentWriter writer) {
-                return writer.flushAndCommit();
-            }
-        });
+        return writerFuture.thenCompose(writer -> writer.flushAndCommit());
     }
 
-    Future<Long> markEndOfStream() {
+    CompletableFuture<Long> markEndOfStream() {
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        Future<BKLogSegmentWriter> logSegmentWriterFuture;
+        CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture;
         synchronized (this) {
             logSegmentWriterFuture = this.rollingFuture;
         }
@@ -495,19 +474,14 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
         }
 
         return FutureUtils.stats(
-                logSegmentWriterFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
-                    @Override
-                    public Future<Long> apply(BKLogSegmentWriter w) {
-                        return w.markEndOfStream();
-                    }
-                }),
+                logSegmentWriterFuture.thenCompose(w -> w.markEndOfStream()),
                 markEndOfStreamOpStatsLogger,
                 stopwatch);
     }
 
     @Override
-    protected Future<Void> asyncCloseAndComplete() {
-        Future<BKLogSegmentWriter> logSegmentWriterFuture;
+    protected CompletableFuture<Void> asyncCloseAndComplete() {
+        CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture;
         synchronized (this) {
             logSegmentWriterFuture = this.rollingFuture;
         }
@@ -515,18 +489,13 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
         if (null == logSegmentWriterFuture) {
             return super.asyncCloseAndComplete();
         } else {
-            return logSegmentWriterFuture.flatMap(new 
AbstractFunction1<BKLogSegmentWriter, Future<Void>>() {
-                @Override
-                public Future<Void> apply(BKLogSegmentWriter segmentWriter) {
-                    return BKAsyncLogWriter.super.asyncCloseAndComplete();
-                }
-            });
+            return logSegmentWriterFuture.thenCompose(segmentWriter1 -> 
super.asyncCloseAndComplete());
         }
     }
 
     @Override
     void closeAndComplete() throws IOException {
-        FutureUtils.result(asyncCloseAndComplete());
+        Utils.ioResult(asyncCloseAndComplete());
     }
 
     /**
@@ -539,12 +508,12 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
     }
 
     @Override
-    public Future<Void> asyncAbort() {
-        Future<Void> result = super.asyncAbort();
+    public CompletableFuture<Void> asyncAbort() {
+        CompletableFuture<Void> result = super.asyncAbort();
         synchronized (this) {
             if (pendingRequests != null) {
                 for (PendingLogRecord pendingLogRecord : pendingRequests) {
-                    pendingLogRecord.promise.setException(new 
WriteException(bkDistributedLogManager.getStreamName(),
+                    pendingLogRecord.promise.completeExceptionally(new 
WriteException(bkDistributedLogManager.getStreamName(),
                             "abort wring: writer has been closed due to 
error."));
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index 00e6b5c..cffe500 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -20,14 +20,22 @@ package org.apache.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.function.CloseAsyncCloseableFunction;
-import org.apache.distributedlog.function.GetVersionedValueFunction;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
@@ -41,40 +49,25 @@ import org.apache.distributedlog.logsegment.LogSegmentFilter;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
 import org.apache.distributedlog.namespace.NamespaceDriver;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
+import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 import org.apache.distributedlog.util.Allocator;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.MonitoredFuturePool;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.PermitManager;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.PermitLimiter;
+import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER;
 import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
@@ -104,20 +97,10 @@ class BKDistributedLogManager implements DistributedLogManager {
     static final Logger LOG = 
LoggerFactory.getLogger(BKDistributedLogManager.class);
 
     static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
-            new Function<LogRecordWithDLSN, Long>() {
-                @Override
-                public Long apply(LogRecordWithDLSN record) {
-                    return record.getTransactionId();
-                }
-            };
+        record -> record.getTransactionId();
 
     static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION =
-            new Function<LogRecordWithDLSN, DLSN>() {
-                @Override
-                public DLSN apply(LogRecordWithDLSN record) {
-                    return record.getDlsn();
-                }
-            };
+        record -> record.getDlsn();
 
     private final URI uri;
     private final String name;
@@ -127,7 +110,7 @@ class BKDistributedLogManager implements DistributedLogManager {
     private final DistributedLogConfiguration conf;
     private final DynamicDistributedLogConfiguration dynConf;
     private final NamespaceDriver driver;
-    private Promise<Void> closePromise;
+    private CompletableFuture<Void> closePromise;
     private final OrderedScheduler scheduler;
     private final FeatureProvider featureProvider;
     private final AsyncFailureInjector failureInjector;
@@ -272,17 +255,18 @@ class BKDistributedLogManager implements DistributedLogManager {
 
     @Override
     public List<LogSegmentMetadata> getLogSegments() throws IOException {
-        return FutureUtils.result(getLogSegmentsAsync());
+        return Utils.ioResult(getLogSegmentsAsync());
     }
 
-    protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
+    protected CompletableFuture<List<LogSegmentMetadata>> 
getLogSegmentsAsync() {
         final BKLogReadHandler readHandler = createReadHandler();
         return readHandler.readLogSegmentsFromStore(
                 LogSegmentMetadata.COMPARATOR,
                 LogSegmentFilter.DEFAULT_FILTER,
-                null)
-                .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC)
-                .ensure(CloseAsyncCloseableFunction.of(readHandler));
+                null
+        )
+        .thenApply((versionedList) -> versionedList.getValue())
+        .whenComplete((value, cause) -> readHandler.asyncClose());
     }
 
     @Override
@@ -353,29 +337,26 @@ class BKDistributedLogManager implements DistributedLogManager {
 
     public BKLogWriteHandler createWriteHandler(boolean lockHandler)
             throws IOException {
-        return FutureUtils.result(asyncCreateWriteHandler(lockHandler));
+        return Utils.ioResult(asyncCreateWriteHandler(lockHandler));
     }
 
-    Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean 
lockHandler) {
+    CompletableFuture<BKLogWriteHandler> asyncCreateWriteHandler(final boolean 
lockHandler) {
         // Fetching Log Metadata (create if not exists)
         return driver.getLogStreamMetadataStore(WRITER).getLog(
                 uri,
                 name,
                 true,
                 conf.getCreateStreamIfNotExists()
-        ).flatMap(new AbstractFunction1<LogMetadataForWriter, 
Future<BKLogWriteHandler>>() {
-            @Override
-            public Future<BKLogWriteHandler> apply(LogMetadataForWriter 
logMetadata) {
-                Promise<BKLogWriteHandler> createPromise = new 
Promise<BKLogWriteHandler>();
-                createWriteHandler(logMetadata, lockHandler, createPromise);
-                return createPromise;
-            }
+        ).thenCompose(logMetadata -> {
+            CompletableFuture<BKLogWriteHandler> createPromise = new 
CompletableFuture<BKLogWriteHandler>();
+            createWriteHandler(logMetadata, lockHandler, createPromise);
+            return createPromise;
         });
     }
 
     private void createWriteHandler(LogMetadataForWriter logMetadata,
                                     boolean lockHandler,
-                                    final Promise<BKLogWriteHandler> 
createPromise) {
+                                    final CompletableFuture<BKLogWriteHandler> 
createPromise) {
         // Build the locks
         DistributedLock lock;
         if (conf.isWriteLockEnabled()) {
@@ -389,7 +370,7 @@ class BKDistributedLogManager implements DistributedLogManager {
             segmentAllocator = driver.getLogSegmentEntryStore(WRITER)
                     .newLogSegmentAllocator(logMetadata, dynConf);
         } catch (IOException ioe) {
-            FutureUtils.setException(createPromise, ioe);
+            FutureUtils.completeExceptionally(createPromise, ioe);
             return;
         }
 
@@ -412,25 +393,21 @@ class BKDistributedLogManager implements DistributedLogManager {
                 dynConf,
                 lock);
         if (lockHandler) {
-            writeHandler.lockHandler().addEventListener(new 
FutureEventListener<DistributedLock>() {
+            writeHandler.lockHandler().whenComplete(new 
FutureEventListener<DistributedLock>() {
                 @Override
                 public void onSuccess(DistributedLock lock) {
-                    FutureUtils.setValue(createPromise, writeHandler);
+                    FutureUtils.complete(createPromise, writeHandler);
                 }
 
                 @Override
                 public void onFailure(final Throwable cause) {
-                    writeHandler.asyncClose().ensure(new 
AbstractFunction0<BoxedUnit>() {
-                        @Override
-                        public BoxedUnit apply() {
-                            FutureUtils.setException(createPromise, cause);
-                            return BoxedUnit.UNIT;
-                        }
-                    });
+                    FutureUtils.ensure(
+                        writeHandler.asyncClose(),
+                        () -> FutureUtils.completeExceptionally(createPromise, 
cause));
                 }
             });
         } else {
-            FutureUtils.setValue(createPromise, writeHandler);
+            FutureUtils.complete(createPromise, writeHandler);
         }
     }
 
@@ -438,18 +415,15 @@ class BKDistributedLogManager implements DistributedLogManager {
         return driver.getLogStreamMetadataStore(WRITER).getPermitManager();
     }
 
-    <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, 
Future<T>> func) {
-        return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() {
-            @Override
-            public BKLogReadHandler applyE() throws Throwable {
-                return getReadHandlerAndRegisterListener(true, null);
-            }
-        }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
-            @Override
-            public Future<T> applyE(final BKLogReadHandler readHandler) throws 
Throwable {
-                return func.apply(readHandler);
-            }
+    <T> CompletableFuture<T> processReaderOperation(final 
Function<BKLogReadHandler, CompletableFuture<T>> func) {
+        CompletableFuture<T> future = FutureUtils.createFuture();
+        scheduler.submit(() -> {
+            BKLogReadHandler readHandler = 
getReadHandlerAndRegisterListener(true, null);
+            FutureUtils.proxyTo(
+                func.apply(readHandler),
+                future);
         });
+        return future;
     }
 
     /**
@@ -461,7 +435,7 @@ class BKDistributedLogManager implements DistributedLogManager {
     @Override
     public boolean isEndOfStreamMarked() throws IOException {
         checkClosedOrInError("isEndOfStreamMarked");
-        long lastTxId = 
FutureUtils.result(getLastLogRecordAsyncInternal(false, 
true)).getTransactionId();
+        long lastTxId = Utils.ioResult(getLastLogRecordAsyncInternal(false, 
true)).getTransactionId();
         return lastTxId == DistributedLogConstants.MAX_TXID;
     }
 
@@ -473,7 +447,7 @@ class BKDistributedLogManager implements DistributedLogManager {
     public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws 
IOException {
         long position;
         try {
-            position = FutureUtils.result(getLastLogRecordAsyncInternal(true, 
false)).getTransactionId();
+            position = Utils.ioResult(getLastLogRecordAsyncInternal(true, 
false)).getTransactionId();
             if (DistributedLogConstants.INVALID_TXID == position ||
                 DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) {
                 position = 0;
@@ -508,7 +482,7 @@ class BKDistributedLogManager implements DistributedLogManager {
         try {
             writer.createAndCacheWriteHandler();
             BKLogWriteHandler writeHandler = writer.getWriteHandler();
-            FutureUtils.result(writeHandler.lockHandler());
+            Utils.ioResult(writeHandler.lockHandler());
             success = true;
             return writer;
         } finally {
@@ -525,75 +499,63 @@ class BKDistributedLogManager implements DistributedLogManager {
      */
     @Override
     public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws 
IOException {
-        return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
+        return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter());
     }
 
     @Override
-    public Future<AsyncLogWriter> openAsyncLogWriter() {
+    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
         try {
             checkClosedOrInError("startLogSegmentNonPartitioned");
         } catch (AlreadyClosedException e) {
-            return Future.exception(e);
+            return FutureUtils.exception(e);
         }
 
-        Future<BKLogWriteHandler> createWriteHandleFuture;
+        CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
         synchronized (this) {
             // 1. create the locked write handler
             createWriteHandleFuture = asyncCreateWriteHandler(true);
         }
-        return createWriteHandleFuture.flatMap(new 
AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
-            @Override
-            public Future<AsyncLogWriter> apply(final BKLogWriteHandler 
writeHandler) {
-                final BKAsyncLogWriter writer;
-                synchronized (BKDistributedLogManager.this) {
-                    // 2. create the writer with the handler
-                    writer = new BKAsyncLogWriter(
-                            conf,
-                            dynConf,
-                            BKDistributedLogManager.this,
-                            writeHandler,
-                            featureProvider,
-                            statsLogger);
-                }
-                // 3. recover the incomplete log segments
-                return writeHandler.recoverIncompleteLogSegments()
-                        .map(new AbstractFunction1<Long, AsyncLogWriter>() {
-                            @Override
-                            public AsyncLogWriter apply(Long lastTxId) {
-                                // 4. update last tx id if successfully 
recovered
-                                writer.setLastTxId(lastTxId);
-                                return writer;
-                            }
-                        }).onFailure(new AbstractFunction1<Throwable, 
BoxedUnit>() {
-                            @Override
-                            public BoxedUnit apply(Throwable cause) {
-                                // 5. close the writer if recovery failed
-                                writer.asyncAbort();
-                                return BoxedUnit.UNIT;
-                            }
-                        });
+        return createWriteHandleFuture.thenCompose(writeHandler -> {
+            final BKAsyncLogWriter writer;
+            synchronized (BKDistributedLogManager.this) {
+                // 2. create the writer with the handler
+                writer = new BKAsyncLogWriter(
+                        conf,
+                        dynConf,
+                        BKDistributedLogManager.this,
+                        writeHandler,
+                        featureProvider,
+                        statsLogger);
             }
+            // 3. recover the incomplete log segments
+            return writeHandler.recoverIncompleteLogSegments()
+                .thenApply(lastTxId -> {
+                    // 4. update last tx id if successfully recovered
+                    writer.setLastTxId(lastTxId);
+                    return (AsyncLogWriter) writer;
+                })
+                .whenComplete((lastTxId, cause) -> {
+                    if (null != cause) {
+                        // 5. close the writer if recovery failed
+                        writer.asyncAbort();
+                    }
+                });
         });
     }
 
     @Override
-    public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
-        return getLogSegmentsAsync().flatMap(new 
AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
-            @Override
-            public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
-                return getDLSNNotLessThanTxId(fromTxnId, segments);
-            }
-        });
+    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long 
fromTxnId) {
+        return getLogSegmentsAsync().thenCompose(segments -> 
getDLSNNotLessThanTxId(fromTxnId, segments));
     }
 
-    private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
+    private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
                                                 final List<LogSegmentMetadata> 
segments) {
         if (segments.isEmpty()) {
             return getLastDLSNAsync();
         }
         final int segmentIdx = 
DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId);
         if (segmentIdx < 0) {
-            return Future.value(new 
DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
+            return FutureUtils.value(new 
DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
         }
         return getDLSNNotLessThanTxIdInSegment(
                 fromTxnId,
@@ -603,7 +565,7 @@ class BKDistributedLogManager implements DistributedLogManager {
         );
     }
 
-    private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId,
+    private CompletableFuture<DLSN> getDLSNNotLessThanTxIdInSegment(final long 
fromTxnId,
                                                          final int segmentIdx,
                                                          final 
List<LogSegmentMetadata> segments,
                                                          final 
LogSegmentEntryStore entryStore) {
@@ -615,29 +577,23 @@ class BKDistributedLogManager implements DistributedLogManager {
                 scheduler,
                 entryStore,
                 Math.max(2, dynConf.getReadAheadBatchSize())
-        ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, 
Future<DLSN>>() {
-            @Override
-            public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) 
{
-                if (foundRecord.isPresent()) {
-                    return Future.value(foundRecord.get().getDlsn());
-                }
-                if ((segments.size() - 1) == segmentIdx) {
-                    return getLastLogRecordAsync().map(new 
AbstractFunction1<LogRecordWithDLSN, DLSN>() {
-                        @Override
-                        public DLSN apply(LogRecordWithDLSN record) {
-                            if (record.getTransactionId() >= fromTxnId) {
-                                return record.getDlsn();
-                            }
-                            return record.getDlsn().getNextDLSN();
-                        }
-                    });
-                } else {
-                    return getDLSNNotLessThanTxIdInSegment(
-                            fromTxnId,
-                            segmentIdx + 1,
-                            segments,
-                            entryStore);
-                }
+        ).thenCompose(foundRecord -> {
+            if (foundRecord.isPresent()) {
+                return FutureUtils.value(foundRecord.get().getDlsn());
+            }
+            if ((segments.size() - 1) == segmentIdx) {
+                return getLastLogRecordAsync().thenApply(record -> {
+                    if (record.getTransactionId() >= fromTxnId) {
+                        return record.getDlsn();
+                    }
+                    return record.getDlsn().getNextDLSN();
+                });
+            } else {
+                return getDLSNNotLessThanTxIdInSegment(
+                        fromTxnId,
+                        segmentIdx + 1,
+                        segments,
+                        entryStore);
             }
         });
     }
@@ -662,7 +618,7 @@ class BKDistributedLogManager implements DistributedLogManager {
 
     @Override
     public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException 
{
-        return FutureUtils.result(openAsyncLogReader(fromTxnId));
+        return Utils.ioResult(openAsyncLogReader(fromTxnId));
     }
 
     /**
@@ -687,39 +643,34 @@ class BKDistributedLogManager implements DistributedLogManager {
      * @return future representing the open result.
      */
     @Override
-    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
-        final Promise<DLSN> dlsnPromise = new Promise<DLSN>();
-        getDLSNNotLessThanTxId(fromTxnId).addEventListener(new 
FutureEventListener<DLSN>() {
+    public CompletableFuture<AsyncLogReader> openAsyncLogReader(long 
fromTxnId) {
+        final CompletableFuture<DLSN> dlsnPromise = new 
CompletableFuture<DLSN>();
+        getDLSNNotLessThanTxId(fromTxnId).whenComplete(new 
FutureEventListener<DLSN>() {
 
             @Override
             public void onSuccess(DLSN dlsn) {
-                dlsnPromise.setValue(dlsn);
+                dlsnPromise.complete(dlsn);
             }
 
             @Override
             public void onFailure(Throwable cause) {
                 if (cause instanceof LogEmptyException) {
-                    dlsnPromise.setValue(DLSN.InitialDLSN);
+                    dlsnPromise.complete(DLSN.InitialDLSN);
                 } else {
-                    dlsnPromise.setException(cause);
+                    dlsnPromise.completeExceptionally(cause);
                 }
             }
         });
-        return dlsnPromise.flatMap(new AbstractFunction1<DLSN, 
Future<AsyncLogReader>>() {
-            @Override
-            public Future<AsyncLogReader> apply(DLSN dlsn) {
-                return openAsyncLogReader(dlsn);
-            }
-        });
+        return dlsnPromise.thenCompose(dlsn -> openAsyncLogReader(dlsn));
     }
 
     @Override
     public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
-        return FutureUtils.result(openAsyncLogReader(fromDLSN));
+        return Utils.ioResult(openAsyncLogReader(fromDLSN));
     }
 
     @Override
-    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
+    public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) 
{
         Optional<String> subscriberId = Optional.absent();
         AsyncLogReader reader = new BKAsyncLogReader(
                 this,
@@ -729,7 +680,7 @@ class BKDistributedLogManager implements DistributedLogManager {
                 false,
                 statsLogger);
         pendingReaders.add(reader);
-        return Future.value(reader);
+        return FutureUtils.value(reader);
     }
 
     /**
@@ -738,26 +689,26 @@ class BKDistributedLogManager implements DistributedLogManager {
      * blocked.
      */
     @Override
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN 
fromDLSN) {
+    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final 
DLSN fromDLSN) {
         Optional<String> subscriberId = Optional.absent();
         return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId);
     }
 
     @Override
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN 
fromDLSN, final String subscriberId) {
+    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final 
DLSN fromDLSN, final String subscriberId) {
         return getAsyncLogReaderWithLock(Optional.of(fromDLSN), 
Optional.of(subscriberId));
     }
 
     @Override
-    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String 
subscriberId) {
+    public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String 
subscriberId) {
         Optional<DLSN> fromDLSN = Optional.absent();
         return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId));
     }
 
-    protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final 
Optional<DLSN> fromDLSN,
+    protected CompletableFuture<AsyncLogReader> 
getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN,
                                                                final 
Optional<String> subscriberId) {
         if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
-            return Future.exception(new UnexpectedException("Neither from dlsn 
nor subscriber id is provided."));
+            return FutureUtils.exception(new UnexpectedException("Neither from 
dlsn nor subscriber id is provided."));
         }
         final BKAsyncLogReader reader = new BKAsyncLogReader(
                 BKDistributedLogManager.this,
@@ -767,55 +718,50 @@ class BKDistributedLogManager implements 
DistributedLogManager {
                 false,
                 statsLogger);
         pendingReaders.add(reader);
-        final Future<Void> lockFuture = reader.lockStream();
-        final Promise<AsyncLogReader> createPromise = new 
Promise<AsyncLogReader>(
-                new Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable cause) {
+        final CompletableFuture<Void> lockFuture = reader.lockStream();
+        final CompletableFuture<AsyncLogReader> createPromise = 
FutureUtils.createFuture();
+        createPromise.whenComplete((value, cause) -> {
+            if (cause instanceof CancellationException) {
                 // cancel the lock when the creation future is cancelled
-                lockFuture.cancel();
-                return BoxedUnit.UNIT;
+                lockFuture.cancel(true);
             }
         });
         // lock the stream - fetch the last commit position on success
-        lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() {
+        lockFuture.thenCompose(new Function<Void, 
CompletableFuture<AsyncLogReader>>() {
             @Override
-            public Future<AsyncLogReader> apply(Void complete) {
+            public CompletableFuture<AsyncLogReader> apply(Void complete) {
                 if (fromDLSN.isPresent()) {
-                    return Future.value((AsyncLogReader) reader);
+                    return FutureUtils.value(reader);
                 }
                 LOG.info("Reader {} @ {} reading last commit position from 
subscription store after acquired lock.",
                         subscriberId.get(), name);
                 // we acquired lock
                 final SubscriptionsStore subscriptionsStore = 
driver.getSubscriptionsStore(getStreamName());
                 return 
subscriptionsStore.getLastCommitPosition(subscriberId.get())
-                        .map(new ExceptionalFunction<DLSN, AsyncLogReader>() {
-                    @Override
-                    public AsyncLogReader applyE(DLSN lastCommitPosition) 
throws UnexpectedException {
-                        LOG.info("Reader {} @ {} positioned to last commit 
position {}.",
-                                new Object[] { subscriberId.get(), name, 
lastCommitPosition });
-                        reader.setStartDLSN(lastCommitPosition);
-                        return reader;
-                    }
-                });
+                        .thenCompose(lastCommitPosition -> {
+                            LOG.info("Reader {} @ {} positioned to last commit 
position {}.",
+                                    new Object[] { subscriberId.get(), name, 
lastCommitPosition });
+                            try {
+                                reader.setStartDLSN(lastCommitPosition);
+                            } catch (UnexpectedException e) {
+                                return FutureUtils.exception(e);
+                            }
+                            return FutureUtils.value(reader);
+                        });
             }
-        }).addEventListener(new FutureEventListener<AsyncLogReader>() {
+        }).whenComplete(new FutureEventListener<AsyncLogReader>() {
             @Override
             public void onSuccess(AsyncLogReader r) {
                 pendingReaders.remove(reader);
-                FutureUtils.setValue(createPromise, r);
+                FutureUtils.complete(createPromise, r);
             }
 
             @Override
             public void onFailure(final Throwable cause) {
                 pendingReaders.remove(reader);
-                reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply() {
-                        FutureUtils.setException(createPromise, cause);
-                        return BoxedUnit.UNIT;
-                    }
-                });
+                FutureUtils.ensure(
+                    reader.asyncClose(),
+                    () -> FutureUtils.completeExceptionally(createPromise, 
cause));
             }
         });
         return createPromise;
@@ -833,7 +779,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
         throws IOException {
         DLSN fromDLSN;
         try {
-            fromDLSN = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId));
+            fromDLSN = Utils.ioResult(getDLSNNotLessThanTxId(fromTxnId));
         } catch (LogEmptyException lee) {
             fromDLSN = DLSN.InitialDLSN;
         }
@@ -861,25 +807,25 @@ class BKDistributedLogManager implements 
DistributedLogManager {
     @Override
     public LogRecordWithDLSN getLastLogRecord() throws IOException {
         checkClosedOrInError("getLastLogRecord");
-        return FutureUtils.result(getLastLogRecordAsync());
+        return Utils.ioResult(getLastLogRecordAsync());
     }
 
     @Override
     public long getFirstTxId() throws IOException {
         checkClosedOrInError("getFirstTxId");
-        return 
FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId();
+        return 
Utils.ioResult(getFirstRecordAsyncInternal()).getTransactionId();
     }
 
     @Override
     public long getLastTxId() throws IOException {
         checkClosedOrInError("getLastTxId");
-        return FutureUtils.result(getLastTxIdAsync());
+        return Utils.ioResult(getLastTxIdAsync());
     }
 
     @Override
     public DLSN getLastDLSN() throws IOException {
         checkClosedOrInError("getLastDLSN");
-        return FutureUtils.result(getLastLogRecordAsyncInternal(false, 
false)).getDlsn();
+        return Utils.ioResult(getLastLogRecordAsyncInternal(false, 
false)).getDlsn();
     }
 
     /**
@@ -888,15 +834,15 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      * @return latest log record
      */
     @Override
-    public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
+    public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync() {
         return getLastLogRecordAsyncInternal(false, false);
     }
 
-    private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final 
boolean recover,
+    private CompletableFuture<LogRecordWithDLSN> 
getLastLogRecordAsyncInternal(final boolean recover,
                                                                     final 
boolean includeEndOfStream) {
-        return processReaderOperation(new Function<BKLogReadHandler, 
Future<LogRecordWithDLSN>>() {
+        return processReaderOperation(new Function<BKLogReadHandler, 
CompletableFuture<LogRecordWithDLSN>>() {
             @Override
-            public Future<LogRecordWithDLSN> apply(final BKLogReadHandler 
ledgerHandler) {
+            public CompletableFuture<LogRecordWithDLSN> apply(final 
BKLogReadHandler ledgerHandler) {
                 return ledgerHandler.getLastLogRecordAsync(recover, 
includeEndOfStream);
             }
         });
@@ -908,9 +854,9 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      * @return latest transaction id
      */
     @Override
-    public Future<Long> getLastTxIdAsync() {
+    public CompletableFuture<Long> getLastTxIdAsync() {
         return getLastLogRecordAsyncInternal(false, false)
-                .map(RECORD_2_TXID_FUNCTION);
+                .thenApply(RECORD_2_TXID_FUNCTION);
     }
 
     /**
@@ -919,14 +865,14 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      * @return first dlsn in the stream
      */
     @Override
-    public Future<DLSN> getFirstDLSNAsync() {
-        return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION);
+    public CompletableFuture<DLSN> getFirstDLSNAsync() {
+        return getFirstRecordAsyncInternal().thenApply(RECORD_2_DLSN_FUNCTION);
     }
 
-    private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
-        return processReaderOperation(new Function<BKLogReadHandler, 
Future<LogRecordWithDLSN>>() {
+    private CompletableFuture<LogRecordWithDLSN> getFirstRecordAsyncInternal() 
{
+        return processReaderOperation(new Function<BKLogReadHandler, 
CompletableFuture<LogRecordWithDLSN>>() {
             @Override
-            public Future<LogRecordWithDLSN> apply(final BKLogReadHandler 
ledgerHandler) {
+            public CompletableFuture<LogRecordWithDLSN> apply(final 
BKLogReadHandler ledgerHandler) {
                 return ledgerHandler.asyncGetFirstLogRecord();
             }
         });
@@ -938,9 +884,9 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      * @return latest transaction id
      */
     @Override
-    public Future<DLSN> getLastDLSNAsync() {
+    public CompletableFuture<DLSN> getLastDLSNAsync() {
         return getLastLogRecordAsyncInternal(false, false)
-                .map(RECORD_2_DLSN_FUNCTION);
+                .thenApply(RECORD_2_DLSN_FUNCTION);
     }
 
     /**
@@ -953,7 +899,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
     @Override
     public long getLogRecordCount() throws IOException {
         checkClosedOrInError("getLogRecordCount");
-        return FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN));
+        return Utils.ioResult(getLogRecordCountAsync(DLSN.InitialDLSN));
     }
 
     /**
@@ -964,10 +910,10 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      * @throws IOException
      */
     @Override
-    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) {
-        return processReaderOperation(new Function<BKLogReadHandler, 
Future<Long>>() {
+    public CompletableFuture<Long> getLogRecordCountAsync(final DLSN 
beginDLSN) {
+        return processReaderOperation(new Function<BKLogReadHandler, 
CompletableFuture<Long>>() {
                     @Override
-                    public Future<Long> apply(BKLogReadHandler ledgerHandler) {
+                    public CompletableFuture<Long> apply(BKLogReadHandler 
ledgerHandler) {
                         return ledgerHandler.asyncGetLogRecordCount(beginDLSN);
                     }
                 });
@@ -991,7 +937,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
         checkClosedOrInError("recoverInternal");
         BKLogWriteHandler ledgerHandler = createWriteHandler(true);
         try {
-            FutureUtils.result(ledgerHandler.recoverIncompleteLogSegments());
+            Utils.ioResult(ledgerHandler.recoverIncompleteLogSegments());
         } finally {
             Utils.closeQuietly(ledgerHandler);
         }
@@ -1004,7 +950,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      */
     @Override
     public void delete() throws IOException {
-        FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+        Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
                 .deleteLog(uri, getStreamName()));
     }
 
@@ -1025,7 +971,7 @@ class BKDistributedLogManager implements 
DistributedLogManager {
         BKLogWriteHandler ledgerHandler = createWriteHandler(true);
         try {
             LOG.info("Purging logs for {} older than {}", 
ledgerHandler.getFullyQualifiedName(), minTxIdToKeep);
-            
FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep));
+            
Utils.ioResult(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep));
         } finally {
             Utils.closeQuietly(ledgerHandler);
         }
@@ -1049,14 +995,11 @@ class BKDistributedLogManager implements 
DistributedLogManager {
         }
 
         @Override
-        public Future<Void> asyncClose() {
+        public CompletableFuture<Void> asyncClose() {
             return Utils.closeSequence(executorService, true, 
readers.toArray(new AsyncLogReader[readers.size()]))
-                    .onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
-                        @Override
-                        public BoxedUnit apply(Void value) {
-                            readers.clear();
-                            return BoxedUnit.UNIT;
-                        }
+                    .thenApply(value -> {
+                        readers.clear();
+                        return null;
                     });
         }
     };
@@ -1065,28 +1008,28 @@ class BKDistributedLogManager implements 
DistributedLogManager {
      * Close the distributed log manager, freeing any resources it may hold.
      */
     @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closeFuture;
+    public CompletableFuture<Void> asyncClose() {
+        CompletableFuture<Void> closeFuture;
         BKLogReadHandler readHandlerToClose;
         synchronized (this) {
             if (null != closePromise) {
                 return closePromise;
             }
-            closeFuture = closePromise = new Promise<Void>();
+            closeFuture = closePromise = new CompletableFuture<Void>();
             readHandlerToClose = readHandlerForListener;
         }
 
-        Future<Void> closeResult = Utils.closeSequence(null, true,
+        CompletableFuture<Void> closeResult = Utils.closeSequence(null, true,
                 readHandlerToClose,
                 pendingReaders,
                 resourcesCloseable.or(AsyncCloseable.NULL));
-        closeResult.proxyTo(closeFuture);
+        FutureUtils.proxyTo(closeResult, closeFuture);
         return closeFuture;
     }
 
     @Override
     public void close() throws IOException {
-        FutureUtils.result(asyncClose());
+        Utils.ioResult(asyncClose());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index adb591f..60ad916 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -20,6 +20,8 @@ package org.apache.distributedlog;
 import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
 import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
@@ -28,14 +30,11 @@ import 
org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.io.AsyncCloseable;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.PermitLimiter;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import org.apache.distributedlog.util.Utils;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -52,7 +51,7 @@ import static 
org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
 import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
 
 /**
- * BKDistributedLogNamespace is the default implementation of {@link 
DistributedLogNamespace}. It uses
+ * BKDistributedLogNamespace is the default implementation of {@link 
Namespace}. It uses
  * zookeeper for metadata storage and bookkeeper for data storage.
  * <h3>Metrics</h3>
  *
@@ -74,8 +73,6 @@ import static 
org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
  * <ul>
  * <li> `scope`/factory/thread_pool/* : stats about the ordered scheduler used 
by this namespace.
  * See {@link OrderedScheduler}.
- * <li> `scope`/factory/readahead_thread_pool/* : stats about the readahead 
thread pool executor
- * used by this namespace. See {@link MonitoredScheduledThreadPoolExecutor}.
  * <li> `scope`/writeLimiter/* : stats about the global write limiter used by 
this namespace.
  * See {@link PermitLimiter}.
  * </ul>
@@ -84,7 +81,7 @@ import static 
org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
  *
  * All the core stats about reader and writer are exposed under current scope 
via {@link BKDistributedLogManager}.
  */
-public class BKDistributedLogNamespace implements DistributedLogNamespace {
+public class BKDistributedLogNamespace implements Namespace {
     static final Logger LOG = 
LoggerFactory.getLogger(BKDistributedLogNamespace.class);
 
     private final String clientId;
@@ -149,8 +146,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, IOException {
         checkState();
         logName = validateAndNormalizeName(logName);
-        URI uri = 
FutureUtils.result(driver.getLogMetadataStore().createLog(logName));
-        
FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, 
logName, true, true));
+        URI uri = 
Utils.ioResult(driver.getLogMetadataStore().createLog(logName));
+        Utils.ioResult(driver.getLogStreamMetadataStore(WRITER).getLog(uri, 
logName, true, true));
     }
 
     @Override
@@ -158,7 +155,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, LogNotFoundException, 
IOException {
         checkState();
         logName = validateAndNormalizeName(logName);
-        Optional<URI> uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+        Optional<URI> uri = 
Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
         if (!uri.isPresent()) {
             throw new LogNotFoundException("Log " + logName + " isn't found.");
         }
@@ -187,7 +184,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, IOException {
         checkState();
         logName = validateAndNormalizeName(logName);
-        Optional<URI> uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+        Optional<URI> uri = 
Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
         if (!uri.isPresent()) {
             throw new LogNotFoundException("Log " + logName + " isn't found.");
         }
@@ -202,10 +199,10 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
     public boolean logExists(String logName)
         throws IOException, IllegalArgumentException {
         checkState();
-        Optional<URI> uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+        Optional<URI> uri = 
Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
         if (uri.isPresent()) {
             try {
-                FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+                Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
                         .logExists(uri.get(), logName));
                 return true;
             } catch (LogNotFoundException lnfe) {
@@ -219,7 +216,7 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
     @Override
     public Iterator<String> getLogs() throws IOException {
         checkState();
-        return FutureUtils.result(driver.getLogMetadataStore().getLogs());
+        return Utils.ioResult(driver.getLogMetadataStore().getLogs());
     }
 
     @Override

Reply via email to