http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
index 347f041..99a4155 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Utils.java
@@ -20,8 +20,11 @@ package org.apache.distributedlog.util;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 
@@ -29,18 +32,18 @@ import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.function.VoidFunctions;
+import org.apache.distributedlog.common.functions.VoidFunctions;
+import org.apache.distributedlog.io.AsyncAbortable;
 import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.ZooKeeper;
@@ -49,17 +52,13 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
 
 /**
  * Basic Utilities.
  */
+@Slf4j
 public class Utils {
 
-    private static final Logger logger = LoggerFactory.getLogger(Utils.class);
-
     /**
      * Current time from some arbitrary time base in the past, counting in
      * nanoseconds, and not affected by settimeofday or similar system clock
@@ -115,16 +114,15 @@ public class Utils {
         String path,
         byte[] data,
         final List<ACL> acl,
-        final CreateMode createMode)
-        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, 
InterruptedException {
+        final CreateMode createMode) throws IOException, KeeperException {
         try {
-            Await.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, 
createMode));
+            FutureUtils.result(zkAsyncCreateFullPathOptimistic(zkc, path, 
data, acl, createMode));
         } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
             throw zkce;
         } catch (KeeperException ke) {
             throw ke;
         } catch (InterruptedException ie) {
-            throw ie;
+            throw new DLInterruptedException("Interrupted on create zookeeper 
path " + path, ie);
         } catch (RuntimeException rte) {
             throw rte;
         } catch (Exception exc) {
@@ -208,7 +206,7 @@ public class Utils {
      * @param acl Acl of the zk path
      * @param createMode Create mode of zk path
      */
-    public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
+    public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
         final ZooKeeperClient zkc,
         final String pathToCreate,
         final byte[] data,
@@ -234,14 +232,14 @@ public class Utils {
      * @param acl Acl of the zk path
      * @param createMode Create mode of zk path
      */
-    public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
+    public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
         final ZooKeeperClient zkc,
         final String pathToCreate,
         final Optional<String> parentPathShouldNotCreate,
         final byte[] data,
         final List<ACL> acl,
         final CreateMode createMode) {
-        final Promise<BoxedUnit> result = new Promise<BoxedUnit>();
+        final CompletableFuture<Void> result = new CompletableFuture<Void>();
 
         zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, 
parentPathShouldNotCreate,
                 data, acl, createMode, new AsyncCallback.StringCallback() {
@@ -263,13 +261,13 @@ public class Utils {
      * @param acl Acl of the zk path
      * @param createMode Create mode of zk path
      */
-    public static Future<BoxedUnit> zkAsyncCreateFullPathOptimisticAndSetData(
+    public static CompletableFuture<Void> 
zkAsyncCreateFullPathOptimisticAndSetData(
         final ZooKeeperClient zkc,
         final String pathToCreate,
         final byte[] data,
         final List<ACL> acl,
         final CreateMode createMode) {
-        final Promise<BoxedUnit> result = new Promise<BoxedUnit>();
+        final CompletableFuture<Void> result = new CompletableFuture<Void>();
 
         try {
             zkc.get().setData(pathToCreate, data, -1, new 
AsyncCallback.StatCallback() {
@@ -291,32 +289,32 @@ public class Utils {
                 }
             }, result);
         } catch (Exception exc) {
-            result.setException(exc);
+            result.completeExceptionally(exc);
         }
 
         return result;
     }
 
-    private static void handleKeeperExceptionCode(int rc, String 
pathOrMessage, Promise<BoxedUnit> result) {
+    private static void handleKeeperExceptionCode(int rc, String 
pathOrMessage, CompletableFuture<Void> result) {
         if (KeeperException.Code.OK.intValue() == rc) {
-            result.setValue(BoxedUnit.UNIT);
+            result.complete(null);
         } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE 
== rc) {
-            result.setException(new 
ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage));
+            result.completeExceptionally(new 
ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage));
         } else if 
(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-            result.setException(new DLInterruptedException(pathOrMessage));
+            result.completeExceptionally(new 
DLInterruptedException(pathOrMessage));
         } else {
-            
result.setException(KeeperException.create(KeeperException.Code.get(rc), 
pathOrMessage));
+            
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
 pathOrMessage));
         }
     }
 
-    public static Future<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, 
String path, boolean watch) {
+    public static CompletableFuture<Versioned<byte[]>> 
zkGetData(ZooKeeperClient zkc, String path, boolean watch) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         } catch (InterruptedException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         }
         return zkGetData(zk, path, watch);
     }
@@ -330,35 +328,35 @@ public class Utils {
      *          whether to watch the path
      * @return future representing the versioned value. null version or null 
value means path doesn't exist.
      */
-    public static Future<Versioned<byte[]>> zkGetData(ZooKeeper zk, String 
path, boolean watch) {
-        final Promise<Versioned<byte[]>> promise = new 
Promise<Versioned<byte[]>>();
+    public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeper zk, 
String path, boolean watch) {
+        final CompletableFuture<Versioned<byte[]>> promise = new 
CompletableFuture<Versioned<byte[]>>();
         zk.getData(path, watch, new AsyncCallback.DataCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, byte[] 
data, Stat stat) {
                 if (KeeperException.Code.OK.intValue() == rc) {
                     if (null == stat) {
-                        promise.setValue(new Versioned<byte[]>(null, null));
+                        promise.complete(new Versioned<byte[]>(null, null));
                     } else {
-                        promise.setValue(new Versioned<byte[]>(data, new 
ZkVersion(stat.getVersion())));
+                        promise.complete(new Versioned<byte[]>(data, new 
ZkVersion(stat.getVersion())));
                     }
                 } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                    promise.setValue(new Versioned<byte[]>(null, null));
+                    promise.complete(new Versioned<byte[]>(null, null));
                 } else {
-                    
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                    
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 }
             }
         }, null);
         return promise;
     }
 
-    public static Future<ZkVersion> zkSetData(ZooKeeperClient zkc, String 
path, byte[] data, ZkVersion version) {
+    public static CompletableFuture<ZkVersion> zkSetData(ZooKeeperClient zkc, 
String path, byte[] data, ZkVersion version) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         } catch (InterruptedException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         }
         return zkSetData(zk, path, data, version);
     }
@@ -376,31 +374,31 @@ public class Utils {
      *          version used to set data
      * @return future representing the version after this operation.
      */
-    public static Future<ZkVersion> zkSetData(ZooKeeper zk, String path, 
byte[] data, ZkVersion version) {
-        final Promise<ZkVersion> promise = new Promise<ZkVersion>();
+    public static CompletableFuture<ZkVersion> zkSetData(ZooKeeper zk, String 
path, byte[] data, ZkVersion version) {
+        final CompletableFuture<ZkVersion> promise = new 
CompletableFuture<ZkVersion>();
         zk.setData(path, data, version.getZnodeVersion(), new 
AsyncCallback.StatCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, Stat 
stat) {
                 if (KeeperException.Code.OK.intValue() == rc) {
-                    promise.updateIfEmpty(new Return<ZkVersion>(new 
ZkVersion(stat.getVersion())));
+                    promise.complete(new ZkVersion(stat.getVersion()));
                     return;
                 }
-                promise.updateIfEmpty(new Throw<ZkVersion>(
-                        KeeperException.create(KeeperException.Code.get(rc))));
+                promise.completeExceptionally(
+                        KeeperException.create(KeeperException.Code.get(rc)));
                 return;
             }
         }, null);
         return promise;
     }
 
-    public static Future<Void> zkDelete(ZooKeeperClient zkc, String path, 
ZkVersion version) {
+    public static CompletableFuture<Void> zkDelete(ZooKeeperClient zkc, String 
path, ZkVersion version) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         } catch (InterruptedException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         }
         return zkDelete(zk, path, version);
     }
@@ -416,17 +414,17 @@ public class Utils {
      *          version used to set data
      * @return future representing the version after this operation.
      */
-    public static Future<Void> zkDelete(ZooKeeper zk, String path, ZkVersion 
version) {
-        final Promise<Void> promise = new Promise<Void>();
+    public static CompletableFuture<Void> zkDelete(ZooKeeper zk, String path, 
ZkVersion version) {
+        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
         zk.delete(path, version.getZnodeVersion(), new 
AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (KeeperException.Code.OK.intValue() == rc) {
-                    promise.updateIfEmpty(new Return<Void>(null));
+                    promise.complete(null);
                     return;
                 }
-                promise.updateIfEmpty(new Throw<Void>(
-                        KeeperException.create(KeeperException.Code.get(rc))));
+                promise.completeExceptionally(
+                        KeeperException.create(KeeperException.Code.get(rc)));
                 return;
             }
         }, null);
@@ -446,35 +444,35 @@ public class Utils {
      * false if the node doesn't exist, otherwise future will throw exception
      *
      */
-    public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, 
String path, ZkVersion version) {
+    public static CompletableFuture<Boolean> 
zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
         ZooKeeper zk;
         try {
             zk = zkc.get();
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         } catch (InterruptedException e) {
-            return Future.exception(FutureUtils.zkException(e, path));
+            return FutureUtils.exception(zkException(e, path));
         }
-        final Promise<Boolean> promise = new Promise<Boolean>();
+        final CompletableFuture<Boolean> promise = new 
CompletableFuture<Boolean>();
         zk.delete(path, version.getZnodeVersion(), new 
AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (KeeperException.Code.OK.intValue() == rc ) {
-                    promise.setValue(true);
+                    promise.complete(true);
                 } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                    promise.setValue(false);
+                    promise.complete(false);
                 } else {
-                    
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                    
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 }
             }
         }, null);
         return promise;
     }
 
-    public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable,
+    public static CompletableFuture<Void> asyncClose(@Nullable AsyncCloseable 
closeable,
                                           boolean swallowIOException) {
         if (null == closeable) {
-            return Future.Void();
+            return FutureUtils.Void();
         } else if (swallowIOException) {
             return FutureUtils.ignore(closeable.asyncClose());
         } else {
@@ -548,7 +546,7 @@ public class Utils {
         if (null == closeable) {
             return;
         }
-        FutureUtils.result(closeable.asyncClose());
+        Utils.ioResult(closeable.asyncClose());
     }
 
     /**
@@ -562,7 +560,7 @@ public class Utils {
             return;
         }
         try {
-            FutureUtils.result(closeable.asyncClose());
+            Utils.ioResult(closeable.asyncClose());
         } catch (IOException e) {
             // no-op. the exception is swallowed.
         }
@@ -575,7 +573,7 @@ public class Utils {
      *          closeables to close
      * @return future represents the close future
      */
-    public static Future<Void> closeSequence(ExecutorService executorService,
+    public static CompletableFuture<Void> closeSequence(ExecutorService 
executorService,
                                              AsyncCloseable... closeables) {
         return closeSequence(executorService, false, closeables);
     }
@@ -588,7 +586,7 @@ public class Utils {
      * @param closeables list of closeables
      * @return future represents the close future.
      */
-    public static Future<Void> closeSequence(ExecutorService executorService,
+    public static CompletableFuture<Void> closeSequence(ExecutorService 
executorService,
                                              boolean ignoreCloseError,
                                              AsyncCloseable... closeables) {
         List<AsyncCloseable> closeableList = 
Lists.newArrayListWithExpectedSize(closeables.length);
@@ -602,7 +600,8 @@ public class Utils {
         return FutureUtils.processList(
                 closeableList,
                 ignoreCloseError ? AsyncCloseable.CLOSE_FUNC_IGNORE_ERRORS : 
AsyncCloseable.CLOSE_FUNC,
-                executorService).map(VoidFunctions.LIST_TO_VOID_FUNC);
+                executorService
+        ).thenApply(VoidFunctions.LIST_TO_VOID_FUNC);
     }
 
     /**
@@ -636,4 +635,112 @@ public class Utils {
         return path.substring(0, lastIndex);
     }
 
+    /**
+     * Convert the <i>throwable</i> to zookeeper related exceptions.
+     *
+     * @param throwable cause
+     * @param path zookeeper path
+     * @return zookeeper related exceptions
+     */
+    public static Throwable zkException(Throwable throwable, String path) {
+        if (throwable instanceof KeeperException) {
+            return new ZKException("Encountered zookeeper exception on " + 
path, (KeeperException) throwable);
+        } else if (throwable instanceof 
ZooKeeperClient.ZooKeeperConnectionException) {
+            return new ZKException("Encountered zookeeper connection loss on " 
+ path,
+                    KeeperException.Code.CONNECTIONLOSS);
+        } else if (throwable instanceof InterruptedException) {
+            return new DLInterruptedException("Interrupted on operating " + 
path, throwable);
+        } else {
+            return new UnexpectedException("Encountered unexpected exception 
on operatiing " + path, throwable);
+        }
+    }
+
+    /**
+     * Create transmit exception from transmit result.
+     *
+     * @param transmitResult
+     *          transmit result (basically bk exception code)
+     * @return transmit exception
+     */
+    public static BKTransmitException transmitException(int transmitResult) {
+        return new BKTransmitException("Failed to write to bookkeeper; Error 
is ("
+            + transmitResult + ") "
+            + BKException.getMessage(transmitResult), transmitResult);
+    }
+
+    /**
+     * A specific version of {@link FutureUtils#result(CompletableFuture)} to 
handle known exception issues.
+     */
+    public static <T> T ioResult(CompletableFuture<T> result) throws 
IOException {
+        return FutureUtils.result(
+            result,
+            (cause) -> {
+                if (cause instanceof IOException) {
+                    return (IOException) cause;
+                } else if (cause instanceof KeeperException) {
+                    return new ZKException("Encountered zookeeper exception on 
waiting result",
+                        (KeeperException) cause);
+                } else if (cause instanceof BKException) {
+                    return new BKTransmitException("Encountered bookkeeper 
exception on waiting result",
+                        ((BKException) cause).getCode());
+                } else if (cause instanceof InterruptedException) {
+                    return new DLInterruptedException("Interrupted on waiting 
result", cause);
+                } else {
+                    return new IOException("Encountered exception on waiting 
result", cause);
+                }
+            });
+    }
+
+    /**
+     * A specific version of {@link FutureUtils#result(CompletableFuture, 
long, TimeUnit)}
+     * to handle known exception issues.
+     */
+    public static <T> T ioResult(CompletableFuture<T> result, long timeout, 
TimeUnit timeUnit)
+            throws IOException, TimeoutException {
+        return FutureUtils.result(
+            result,
+            (cause) -> {
+                if (cause instanceof IOException) {
+                    return (IOException) cause;
+                } else if (cause instanceof KeeperException) {
+                    return new ZKException("Encountered zookeeper exception on 
waiting result",
+                        (KeeperException) cause);
+                } else if (cause instanceof BKException) {
+                    return new BKTransmitException("Encountered bookkeeper 
exception on waiting result",
+                        ((BKException) cause).getCode());
+                } else if (cause instanceof InterruptedException) {
+                    return new DLInterruptedException("Interrupted on waiting 
result", cause);
+                } else {
+                    return new IOException("Encountered exception on waiting 
result", cause);
+                }
+            },
+            timeout,
+            timeUnit);
+    }
+
+    /**
+     * Abort async <i>abortable</i>
+     *
+     * @param abortable the {@code AsyncAbortable} object to be aborted, or 
null, in which case this method
+     *                  does nothing.
+     * @param swallowIOException if true, don't propagate IO exceptions thrown 
by the {@code abort} methods
+     * @throws IOException if {@code swallowIOException} is false and {@code 
abort} throws an {@code IOException}
+     */
+    public static void abort(@Nullable AsyncAbortable abortable,
+                             boolean swallowIOException)
+            throws IOException {
+        if (null == abortable) {
+            return;
+        }
+        try {
+            ioResult(abortable.asyncAbort());
+        } catch (Exception ioe) {
+            if (swallowIOException) {
+                log.warn("IOException thrown while aborting Abortable {} : ", 
abortable, ioe);
+            } else {
+                throw ioe;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
index 9a61c1c..1dd702f 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/LimitedPermitManager.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.zk;
 
-import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java
index a5da9c0..aeabbfa 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/zk/ZKTransaction.java
@@ -18,18 +18,17 @@
 package org.apache.distributedlog.zk;
 
 import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
+import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.OpResult;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * ZooKeeper Transaction
  */
@@ -38,14 +37,14 @@ public class ZKTransaction implements Transaction<Object>, 
AsyncCallback.MultiCa
     private final ZooKeeperClient zkc;
     private final List<ZKOp> ops;
     private final List<org.apache.zookeeper.Op> zkOps;
-    private final Promise<Void> result;
+    private final CompletableFuture<Void> result;
     private final AtomicBoolean done = new AtomicBoolean(false);
 
     public ZKTransaction(ZooKeeperClient zkc) {
         this.zkc = zkc;
         this.ops = Lists.newArrayList();
         this.zkOps = Lists.newArrayList();
-        this.result = new Promise<Void>();
+        this.result = new CompletableFuture<Void>();
     }
 
     @Override
@@ -60,16 +59,16 @@ public class ZKTransaction implements Transaction<Object>, 
AsyncCallback.MultiCa
     }
 
     @Override
-    public Future<Void> execute() {
+    public CompletableFuture<Void> execute() {
         if (!done.compareAndSet(false, true)) {
             return result;
         }
         try {
             zkc.get().multi(zkOps, this, result);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            result.setException(FutureUtils.zkException(e, ""));
+            result.completeExceptionally(Utils.zkException(e, ""));
         } catch (InterruptedException e) {
-            result.setException(FutureUtils.zkException(e, ""));
+            result.completeExceptionally(Utils.zkException(e, ""));
         }
         return result;
     }
@@ -82,7 +81,7 @@ public class ZKTransaction implements Transaction<Object>, 
AsyncCallback.MultiCa
         for (int i = 0; i < ops.size(); i++) {
             ops.get(i).abortOpResult(cause, null);
         }
-        FutureUtils.setException(result, cause);
+        FutureUtils.completeExceptionally(result, cause);
     }
 
     @Override
@@ -91,13 +90,13 @@ public class ZKTransaction implements Transaction<Object>, 
AsyncCallback.MultiCa
             for (int i = 0; i < ops.size(); i++) {
                 ops.get(i).commitOpResult(results.get(i));
             }
-            FutureUtils.setValue(result, null);
+            FutureUtils.complete(result, null);
         } else {
             KeeperException ke = 
KeeperException.create(KeeperException.Code.get(rc));
             for (int i = 0; i < ops.size(); i++) {
                 ops.get(i).abortOpResult(ke, null != results ? results.get(i) 
: null);
             }
-            FutureUtils.setException(result, ke);
+            FutureUtils.completeExceptionally(result, ke);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml 
b/distributedlog-core/src/main/resources/findbugsExclude.xml
index 80adec8..40920db 100644
--- a/distributedlog-core/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-core/src/main/resources/findbugsExclude.xml
@@ -37,4 +37,77 @@
     <Method name="run" />
     <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
   </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.BKLogReadHandler$1" />
+    <Method name="onSuccess" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.BookKeeperClient$2" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.ReadUtils" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.ReadUtils$2" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.auditor.DLAuditor$2" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.auditor.DLAuditor$8" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$4" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$4$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$5" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.impl.acl.ZKAccessControl$4" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.impl.acl.ZKAccessControlManager$1" 
/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class 
name="org.apache.distributedlog.impl.acl.ZKAccessControlManager$1$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class 
name="org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore$1$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.lock.ZKSessionLock" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.lock.ZKSessionLock$12" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.lock.ZKSessionLock$13$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.util.Utils" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.util.Utils$6" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java 
b/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
index 96d2d1c..d821b05 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
@@ -17,20 +17,21 @@
  */
 package org.apache.distributedlog;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.MetadataAccessor;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.RetryPolicyUtils;
+import org.apache.distributedlog.common.util.PermitLimiter;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -78,7 +79,7 @@ public class DLMTestUtil {
             new HashMap<Long, LogSegmentMetadata>(children.size());
         for (String child : children) {
             LogSegmentMetadata segment =
-                    FutureUtils.result(LogSegmentMetadata.read(zkc, ledgerPath 
+ "/" + child));
+                    Utils.ioResult(LogSegmentMetadata.read(zkc, ledgerPath + 
"/" + child));
             LOG.info("Read segment {} : {}", child, segment);
             segments.put(segment.getLogSegmentSequenceNumber(), segment);
         }
@@ -92,7 +93,7 @@ public class DLMTestUtil {
     public static DistributedLogManager createNewDLM(String name,
                                                      
DistributedLogConfiguration conf,
                                                      URI uri) throws Exception 
{
-        DistributedLogNamespace namespace = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
         return namespace.openLog(name);
     }
@@ -102,7 +103,7 @@ public class DLMTestUtil {
                                                       URI uri) throws 
Exception {
         // TODO: Metadata Accessor seems to be a legacy object which only used 
by kestrel
         //       (we might consider deprecating this)
-        DistributedLogNamespace namespace = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf).uri(uri).build();
         return namespace.getNamespaceDriver().getMetadataAccessor(name);
     }
@@ -113,7 +114,7 @@ public class DLMTestUtil {
             List<LogSegmentMetadata> logSegmentList = dlm.getLogSegments();
             LogSegmentMetadata lastSegment = 
logSegmentList.get(logSegmentList.size() - 1);
             LogSegmentEntryStore entryStore = 
dlm.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER);
-            
Utils.close(FutureUtils.result(entryStore.openRandomAccessReader(lastSegment, 
true)));
+            
Utils.close(Utils.ioResult(entryStore.openRandomAccessReader(lastSegment, 
true)));
         } finally {
             dlm.close();
         }
@@ -313,12 +314,12 @@ public class DLMTestUtil {
         for (int i = 0; i < controlEntries; ++i) {
             LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
             record.setControl();
-            Await.result(out.write(record));
+            Utils.ioResult(out.write(record));
             txid += txidStep;
         }
         for (int i = 0; i < userEntries; ++i) {
             LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
-            Await.result(out.write(record));
+            Utils.ioResult(out.write(record));
             txid += txidStep;
         }
         Utils.close(out);
@@ -339,7 +340,7 @@ public class DLMTestUtil {
             throws Exception {
         BKDistributedLogManager dlm = (BKDistributedLogManager) manager;
         BKLogWriteHandler writeHandler = dlm.createWriteHandler(false);
-        FutureUtils.result(writeHandler.lockHandler());
+        Utils.ioResult(writeHandler.lockHandler());
         // Start a log segment with a given ledger seq number.
         BookKeeperClient bkc = getBookKeeperClient(dlm);
         LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), 
conf.getWriteQuorumSize(),
@@ -377,12 +378,12 @@ public class DLMTestUtil {
             for (long j = 1; j <= segmentSize; j++) {
                 writer.write(DLMTestUtil.getLogRecordInstance(txid++));
             }
-            FutureUtils.result(writer.flushAndCommit());
+            Utils.ioResult(writer.flushAndCommit());
         }
         if (completeLogSegment) {
-            
FutureUtils.result(writeHandler.completeAndCloseLogSegment(writer));
+            Utils.ioResult(writeHandler.completeAndCloseLogSegment(writer));
         }
-        FutureUtils.result(writeHandler.unlockHandler());
+        Utils.ioResult(writeHandler.unlockHandler());
     }
 
     public static void injectLogSegmentWithLastDLSN(DistributedLogManager 
manager, DistributedLogConfiguration conf,
@@ -390,7 +391,7 @@ public class DLMTestUtil {
                                                     boolean 
recordWrongLastDLSN) throws Exception {
         BKDistributedLogManager dlm = (BKDistributedLogManager) manager;
         BKLogWriteHandler writeHandler = dlm.createWriteHandler(false);
-        FutureUtils.result(writeHandler.lockHandler());
+        Utils.ioResult(writeHandler.lockHandler());
         // Start a log segment with a given ledger seq number.
         BookKeeperClient bkc = getBookKeeperClient(dlm);
         LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), 
conf.getWriteQuorumSize(),
@@ -425,14 +426,14 @@ public class DLMTestUtil {
         long txid = startTxID;
         DLSN wrongDLSN = null;
         for (long j = 1; j <= segmentSize; j++) {
-            DLSN dlsn = 
Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txid++)));
+            DLSN dlsn = 
Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txid++)));
             if (j == (segmentSize - 1)) {
                 wrongDLSN = dlsn;
             }
         }
         assertNotNull(wrongDLSN);
         if (recordWrongLastDLSN) {
-            FutureUtils.result(writer.asyncClose());
+            Utils.ioResult(writer.asyncClose());
             writeHandler.completeAndCloseLogSegment(
                     writeHandler.inprogressZNodeName(writer.getLogSegmentId(), 
writer.getStartTxId(), writer.getLogSegmentSequenceNumber()),
                     writer.getLogSegmentSequenceNumber(),
@@ -443,9 +444,9 @@ public class DLMTestUtil {
                     wrongDLSN.getEntryId(),
                     wrongDLSN.getSlotId());
         } else {
-            
FutureUtils.result(writeHandler.completeAndCloseLogSegment(writer));
+            Utils.ioResult(writeHandler.completeAndCloseLogSegment(writer));
         }
-        FutureUtils.result(writeHandler.unlockHandler());
+        Utils.ioResult(writeHandler.unlockHandler());
     }
 
     public static void updateSegmentMetadata(ZooKeeperClient zkc, 
LogSegmentMetadata segment) throws Exception {
@@ -469,18 +470,18 @@ public class DLMTestUtil {
         return conf;
     }
 
-    public static <T> void validateFutureFailed(Future<T> future, Class 
exClass) {
+    public static <T> void validateFutureFailed(CompletableFuture<T> future, 
Class exClass) {
         try {
-            Await.result(future);
+            Utils.ioResult(future);
         } catch (Exception ex) {
             LOG.info("Expected: {} Actual: {}", exClass.getName(), 
ex.getClass().getName());
             assertTrue("exceptions types equal", exClass.isInstance(ex));
         }
     }
 
-    public static <T> T validateFutureSucceededAndGetResult(Future<T> future) 
throws Exception {
+    public static <T> T 
validateFutureSucceededAndGetResult(CompletableFuture<T> future) throws 
Exception {
         try {
-            return Await.result(future, Duration.fromSeconds(10));
+            return Utils.ioResult(future, 10, TimeUnit.SECONDS);
         } catch (Exception ex) {
             fail("unexpected exception " + ex.getClass().getName());
             throw ex;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java
index 2dbef02..126d337 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/NonBlockingReadsTestUtil.java
@@ -17,10 +17,11 @@
  */
 package org.apache.distributedlog;
 
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.exceptions.LogReadException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,19 +124,19 @@ class NonBlockingReadsTestUtil {
         for (long i = 0; i < 3; i++) {
             BKAsyncLogWriter writer = (BKAsyncLogWriter) 
dlm.startAsyncLogSegmentNonPartitioned();
             for (long j = 1; j < segmentSize; j++) {
-                
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+                
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
             }
             if (recover) {
-                
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+                
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
                 TimeUnit.MILLISECONDS.sleep(300);
                 writer.abort();
                 LOG.debug("Recovering Segments");
                 BKLogWriteHandler blplm = ((BKDistributedLogManager) 
(dlm)).createWriteHandler(true);
-                FutureUtils.result(blplm.recoverIncompleteLogSegments());
-                FutureUtils.result(blplm.asyncClose());
+                Utils.ioResult(blplm.recoverIncompleteLogSegments());
+                Utils.ioResult(blplm.asyncClose());
                 LOG.debug("Recovered Segments");
             } else {
-                
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+                
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
                 writer.closeAndComplete();
             }
             TimeUnit.MILLISECONDS.sleep(300);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java
index 922d89e..ae77522 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamReader.java
@@ -17,17 +17,14 @@
  */
 package org.apache.distributedlog;
 
-import java.io.ByteArrayInputStream;
-import java.net.URI;
 import java.util.Arrays;
 
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
 import org.apache.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java
index b5498ba..fc1f241 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAppendOnlyStreamWriter.java
@@ -20,8 +20,11 @@ package org.apache.distributedlog;
 import java.io.ByteArrayInputStream;
 import java.net.URI;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.exceptions.BKTransmitException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -29,9 +32,6 @@ import org.junit.rules.TestName;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.util.FailpointUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,14 +90,14 @@ public class TestAppendOnlyStreamWriter extends 
TestDistributedLogBase {
         // happen very quickly. But we can test that the mechanics of the 
future write and api are basically
         // correct.
         AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
-        Future<DLSN> dlsnFuture = writer.write(DLMTestUtil.repeatString("abc", 
11).getBytes());
+        CompletableFuture<DLSN> dlsnFuture = 
writer.write(DLMTestUtil.repeatString("abc", 11).getBytes());
 
         // The real problem is the fsync completes before writes are 
submitted, so it never takes effect.
         Thread.sleep(1000);
-        assertFalse(dlsnFuture.isDefined());
+        assertFalse(dlsnFuture.isDone());
         writer.force(false);
         // Must not throw.
-        Await.result(dlsnFuture, Duration.fromSeconds(5));
+        Utils.ioResult(dlsnFuture, 5, TimeUnit.SECONDS);
         writer.close();
         dlmwriter.close();
 
@@ -124,11 +124,11 @@ public class TestAppendOnlyStreamWriter extends 
TestDistributedLogBase {
         // happen very quickly. But we can test that the mechanics of the 
future write and api are basically
         // correct.
         AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
-        Future<DLSN> dlsnFuture = writer.write(byteStream);
+        CompletableFuture<DLSN> dlsnFuture = writer.write(byteStream);
         Thread.sleep(100);
 
         // Write hasn't been persisted, position better not be updated.
-        assertFalse(dlsnFuture.isDefined());
+        assertFalse(dlsnFuture.isDone());
         assertEquals(0, writer.position());
         writer.force(false);
         // Position guaranteed to be accurate after writer.force().
@@ -167,7 +167,7 @@ public class TestAppendOnlyStreamWriter extends 
TestDistributedLogBase {
         // Much much less than the flush time, small enough not to slow down 
tests too much, just
         // gives a little more confidence.
         Thread.sleep(500);
-        Future<DLSN> dlsnFuture = writer.write(byteStream);
+        CompletableFuture<DLSN> dlsnFuture = writer.write(byteStream);
         assertEquals(0, writer.position());
 
         writer.close();
@@ -188,7 +188,7 @@ public class TestAppendOnlyStreamWriter extends 
TestDistributedLogBase {
         AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
         assertEquals(0, writer.position());
 
-        Await.result(writer.write(byteStream));
+        Utils.ioResult(writer.write(byteStream));
         Thread.sleep(100); // let WriteCompleteListener have time to run
         assertEquals(33, writer.position());
 
@@ -205,12 +205,12 @@ public class TestAppendOnlyStreamWriter extends 
TestDistributedLogBase {
         BKDistributedLogManager dlm = (BKDistributedLogManager) 
createNewDLM(conf, name);
 
         URI uri = createDLMURI("/" + name);
-        FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, 
true, true));
+        Utils.ioResult(dlm.getWriterMetadataStore().getLog(uri, name, true, 
true));
 
         // Log exists but is empty, better not throw.
         AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
         byte[] byteStream = DLMTestUtil.repeatString("a", 1025).getBytes();
-        Await.result(writer.write(byteStream));
+        Utils.ioResult(writer.write(byteStream));
 
         writer.close();
         dlm.close();
@@ -266,7 +266,7 @@ public class TestAppendOnlyStreamWriter extends 
TestDistributedLogBase {
         BKDistributedLogManager dlm = (BKDistributedLogManager) 
createNewDLM(conf, name);
 
         URI uri = createDLMURI("/" + name);
-        FutureUtils.result(dlm.getWriterMetadataStore().getLog(uri, name, 
true, true));
+        Utils.ioResult(dlm.getWriterMetadataStore().getLog(uri, name, true, 
true));
 
         // Log exists but is empty, better not throw.
         AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java
index 139d935..6efd0c1 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncBulkWrite.java
@@ -17,14 +17,14 @@
  */
 package org.apache.distributedlog;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.exceptions.LogRecordTooLongException;
 import org.apache.distributedlog.exceptions.WriteCancelledException;
 import org.apache.distributedlog.exceptions.WriteException;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
+import org.apache.distributedlog.util.Utils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -83,8 +83,8 @@ public class TestAsyncBulkWrite extends 
TestDistributedLogBase {
         records.add(DLMTestUtil.getLogRecordInstance(goodRecs, 
MAX_LOGRECORD_SIZE + 1));
         records.addAll(DLMTestUtil.getLargeLogRecordInstanceList(1, goodRecs));
 
-        Future<List<Future<DLSN>>> futureResults = writer.writeBulk(records);
-        List<Future<DLSN>> results = 
validateFutureSucceededAndGetResult(futureResults);
+        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = 
writer.writeBulk(records);
+        List<CompletableFuture<DLSN>> results = 
validateFutureSucceededAndGetResult(futureResults);
 
         // One future returned for each write.
         assertEquals(2*goodRecs + 1, results.size());
@@ -160,14 +160,14 @@ public class TestAsyncBulkWrite extends 
TestDistributedLogBase {
         // Write one record larger than max seg size. Ledger doesn't roll 
until next write.
         int txid = 1;
         LogRecord record = DLMTestUtil.getLogRecordInstance(txid++, 2048);
-        Future<DLSN> result = writer.write(record);
+        CompletableFuture<DLSN> result = writer.write(record);
         DLSN dlsn = validateFutureSucceededAndGetResult(result);
         assertEquals(1, dlsn.getLogSegmentSequenceNo());
 
         // Write two more via bulk. Ledger doesn't roll because there's a 
partial failure.
         List<LogRecord> records = null;
-        Future<List<Future<DLSN>>> futureResults = null;
-        List<Future<DLSN>> results = null;
+        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = null;
+        List<CompletableFuture<DLSN>> results = null;
         records = new ArrayList<LogRecord>(2);
         records.add(DLMTestUtil.getLogRecordInstance(txid++, 2048));
         records.add(DLMTestUtil.getLogRecordInstance(txid++, 
MAX_LOGRECORD_SIZE + 1));
@@ -309,15 +309,15 @@ public class TestAsyncBulkWrite extends 
TestDistributedLogBase {
                                    long txIndex) throws Exception {
 
         List<LogRecord> records = 
DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize);
-        Future<List<Future<DLSN>>> futureResults = writer.writeBulk(records);
+        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = 
writer.writeBulk(records);
         assertNotNull(futureResults);
-        List<Future<DLSN>> results = Await.result(futureResults, 
Duration.fromSeconds(10));
+        List<CompletableFuture<DLSN>> results = Utils.ioResult(futureResults, 
10, TimeUnit.SECONDS);
         assertNotNull(results);
         assertEquals(results.size(), records.size());
         long prevEntryId = 0;
         DLSN lastDlsn = null;
-        for (Future<DLSN> result : results) {
-            DLSN dlsn = Await.result(result, Duration.fromSeconds(10));
+        for (CompletableFuture<DLSN> result : results) {
+            DLSN dlsn = Utils.ioResult(result, 10, TimeUnit.SECONDS);
             lastDlsn = dlsn;
 
             // If we cross a transmission boundary, slot id gets reset.
@@ -338,12 +338,12 @@ public class TestAsyncBulkWrite extends 
TestDistributedLogBase {
                                             long txIndex) throws Exception {
 
         List<LogRecord> records = 
DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize);
-        Future<List<Future<DLSN>>> futureResults = writer.writeBulk(records);
+        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = 
writer.writeBulk(records);
         assertNotNull(futureResults);
-        List<Future<DLSN>> results = Await.result(futureResults, 
Duration.fromSeconds(10));
+        List<CompletableFuture<DLSN>> results = Utils.ioResult(futureResults, 
10, TimeUnit.SECONDS);
         assertNotNull(results);
         assertEquals(results.size(), records.size());
-        for (Future<DLSN> result : results) {
+        for (CompletableFuture<DLSN> result : results) {
             validateFutureFailed(result, IOException.class);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java
index adceaf9..62ac5ef 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderLock.java
@@ -17,36 +17,34 @@
  */
 package org.apache.distributedlog;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.exceptions.LockCancelledException;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.lock.LockClosedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.namespace.NamespaceDriver;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Await;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -76,9 +74,9 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.write(DLMTestUtil.getLogRecordInstance(1L));
         writer.closeAndComplete();
 
-        Future<AsyncLogReader> futureReader1 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        BKAsyncLogReader reader1 = (BKAsyncLogReader) 
Await.result(futureReader1);
-        LogRecordWithDLSN record = Await.result(reader1.readNext());
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        BKAsyncLogReader reader1 = (BKAsyncLogReader) 
Utils.ioResult(futureReader1);
+        LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
         assertEquals(1L, record.getTransactionId());
         assertEquals(0L, record.getSequenceId());
         DLMTestUtil.verifyLogRecord(record);
@@ -89,9 +87,9 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         // simulate a old stream created without readlock path
         NamespaceDriver driver = dlm.getNamespaceDriver();
         ((BKNamespaceDriver) driver).getWriterZKC().get().delete(readLockPath, 
-1);
-        Future<AsyncLogReader> futureReader2 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        AsyncLogReader reader2 = Await.result(futureReader2);
-        record = Await.result(reader2.readNext());
+        CompletableFuture<AsyncLogReader> futureReader2 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        AsyncLogReader reader2 = Utils.ioResult(futureReader2);
+        record = Utils.ioResult(reader2.readNext());
         assertEquals(1L, record.getTransactionId());
         assertEquals(0L, record.getSequenceId());
         DLMTestUtil.verifyLogRecord(record);
@@ -107,19 +105,14 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
 
         final CountDownLatch latch = new CountDownLatch(1);
 
-        Future<AsyncLogReader> futureReader1 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        futureReader1.flatMap(new ExceptionalFunction<AsyncLogReader, 
Future<Void>>() {
-            @Override
-            public Future<Void> applyE(AsyncLogReader reader) throws 
IOException {
-                return reader.asyncClose().map(new AbstractFunction1<Void, 
Void>() {
-                    @Override
-                    public Void apply(Void result) {
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        futureReader1
+            .thenCompose(
+                reader -> reader.asyncClose()
+                    .thenApply(result -> {
                         latch.countDown();
                         return null;
-                    }
-                });
-            }
-        });
+                    }));
 
         latch.await();
         dlm.close();
@@ -133,8 +126,8 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.write(DLMTestUtil.getLogRecordInstance(1L));
         writer.closeAndComplete();
 
-        Future<AsyncLogReader> futureReader1 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        AsyncLogReader reader1 = Await.result(futureReader1);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
         reader1.readNext();
 
         final CountDownLatch acquiredLatch = new CountDownLatch(1);
@@ -142,12 +135,12 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         Thread acquireThread = new Thread(new Runnable() {
             @Override
             public void run() {
-                Future<AsyncLogReader> futureReader2 = null;
+                CompletableFuture<AsyncLogReader> futureReader2 = null;
                 DistributedLogManager dlm2 = null;
                 try {
                     dlm2 = createNewDLM(conf, name);
                     futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-                    AsyncLogReader reader2 = Await.result(futureReader2);
+                    AsyncLogReader reader2 = Utils.ioResult(futureReader2);
                     acquired.set(true);
                     acquiredLatch.countDown();
                 } catch (Exception ex) {
@@ -172,10 +165,10 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         dlm.close();
     }
 
-    int countDefined(ArrayList<Future<AsyncLogReader>> readers) {
+    int countDefined(ArrayList<CompletableFuture<AsyncLogReader>> readers) {
         int done = 0;
-        for (Future<AsyncLogReader> futureReader : readers) {
-            if (futureReader.isDefined()) {
+        for (CompletableFuture<AsyncLogReader> futureReader : readers) {
+            if (futureReader.isDone()) {
                 done++;
             }
         }
@@ -193,7 +186,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
 
         int count = 5;
         final CountDownLatch acquiredLatch = new CountDownLatch(count);
-        final ArrayList<Future<AsyncLogReader>> readers = new 
ArrayList<Future<AsyncLogReader>>(count);
+        final ArrayList<CompletableFuture<AsyncLogReader>> readers = new 
ArrayList<CompletableFuture<AsyncLogReader>>(count);
         for (int i = 0; i < count; i++) {
             readers.add(null);
         }
@@ -201,7 +194,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         for (int i = 0; i < count; i++) {
             dlms[i] = createNewDLM(conf, name);
             readers.set(i, 
dlms[i].getAsyncLogReaderWithLock(DLSN.InitialDLSN));
-            readers.get(i).addEventListener(new 
FutureEventListener<AsyncLogReader>() {
+            readers.get(i).whenComplete(new 
FutureEventListener<AsyncLogReader>() {
                 @Override
                 public void onSuccess(AsyncLogReader reader) {
                     acquiredLatch.countDown();
@@ -232,16 +225,17 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.closeAndComplete();
 
         DistributedLogManager dlm1 = createNewDLM(conf, name);
-        Future<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        AsyncLogReader reader1 = Await.result(futureReader1);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
 
         BKDistributedLogManager dlm2 = (BKDistributedLogManager) 
createNewDLM(conf, name);
-        Future<AsyncLogReader> futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
 
         dlm2.close();
         try {
-            Await.result(futureReader2);
+            Utils.ioResult(futureReader2);
             fail("should have thrown exception!");
+        } catch (CancellationException ce) {
         } catch (LockClosedException ex) {
         } catch (LockCancelledException ex) {
         }
@@ -256,7 +250,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         String name = runtime.getMethodName();
         URI uri = createDLMURI("/" + name);
         ensureURICreated(uri);
-        DistributedLogNamespace ns0 = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace ns0 = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -266,13 +260,13 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.write(DLMTestUtil.getLogRecordInstance(2L));
         writer.closeAndComplete();
 
-        DistributedLogNamespace ns1 = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace ns1 = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
         DistributedLogManager dlm1 = ns1.openLog(name);
-        Future<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        AsyncLogReader reader1 = Await.result(futureReader1);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
         ZooKeeperClientUtils.expireSession(((BKNamespaceDriver) 
ns1.getNamespaceDriver()).getWriterZKC(), zkServers, 1000);
 
         // The result of expireSession is somewhat non-deterministic with this 
lock.
@@ -280,12 +274,12 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         // the moment rather than make it deterministic we accept either 
result.
         boolean success = false;
         try {
-            Await.result(reader1.readNext());
+            Utils.ioResult(reader1.readNext());
             success = true;
         } catch (LockingException ex) {
         }
         if (success) {
-            Await.result(reader1.readNext());
+            Utils.ioResult(reader1.readNext());
         }
 
         Utils.close(reader1);
@@ -305,15 +299,16 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.closeAndComplete();
 
         DistributedLogManager dlm1 = createNewDLM(conf, name);
-        Future<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        AsyncLogReader reader1 = Await.result(futureReader1);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
 
         DistributedLogManager dlm2 = createNewDLM(conf, name);
-        Future<AsyncLogReader> futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
         try {
-            FutureUtils.cancel(futureReader2);
-            Await.result(futureReader2);
+            futureReader2.cancel(true);
+            Utils.ioResult(futureReader2);
             fail("Should fail getting log reader as it is cancelled");
+        } catch (CancellationException ce) {
         } catch (LockClosedException ex) {
         } catch (LockCancelledException ex) {
         } catch (OwnershipAcquireFailedException oafe) {
@@ -322,7 +317,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         futureReader2 = dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
         Utils.close(reader1);
 
-        Await.result(futureReader2);
+        Utils.ioResult(futureReader2);
 
         dlm0.close();
         dlm1.close();
@@ -339,13 +334,13 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.closeAndComplete();
 
         DistributedLogManager dlm1 = createNewDLM(conf, name);
-        Future<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
 
         // Must not throw or cancel or do anything bad, future already 
completed.
-        Await.result(futureReader1);
-        FutureUtils.cancel(futureReader1);
-        AsyncLogReader reader1 = Await.result(futureReader1);
-        Await.result(reader1.readNext());
+        Utils.ioResult(futureReader1);
+        futureReader1.cancel(true);
+        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
+        Utils.ioResult(reader1.readNext());
 
         dlm0.close();
         dlm1.close();
@@ -361,13 +356,13 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         writer.closeAndComplete();
 
         DistributedLogManager dlm1 = createNewDLM(conf, name);
-        Future<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        Future<AsyncLogReader> futureReader2 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader2 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
 
         // Both use the same client id, so there's no lock conflict. Not 
necessarily ideal, but how the
         // system currently works.
-        Await.result(futureReader1);
-        Await.result(futureReader2);
+        Utils.ioResult(futureReader1);
+        Utils.ioResult(futureReader2);
 
         dlm0.close();
         dlm1.close();
@@ -413,7 +408,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         private void readEntries(AsyncLogReader reader) {
             try {
                 for (int i = 0; i < 300; i++) {
-                    LogRecordWithDLSN record = Await.result(reader.readNext());
+                    LogRecordWithDLSN record = 
Utils.ioResult(reader.readNext());
                     currentDLSN.set(record.getDlsn());
                 }
             } catch (Exception ex) {
@@ -446,7 +441,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         localConf.setNumWorkerThreads(2);
         localConf.setLockTimeout(Long.MAX_VALUE);
 
-        DistributedLogNamespace namespace = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(localConf).uri(uri).clientId("main").build();
 
         DistributedLogManager dlm0 = namespace.openLog(name);
@@ -457,27 +452,27 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         AtomicReference<DLSN> currentDLSN = new 
AtomicReference<DLSN>(DLSN.InitialDLSN);
 
         String clientId1 = "reader1";
-        DistributedLogNamespace namespace1 = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace1 = NamespaceBuilder.newBuilder()
                 .conf(localConf).uri(uri).clientId(clientId1).build();
         DistributedLogManager dlm1 = namespace1.openLog(name);
         String clientId2 = "reader2";
-        DistributedLogNamespace namespace2 = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace2 = NamespaceBuilder.newBuilder()
                 .conf(localConf).uri(uri).clientId(clientId2).build();
         DistributedLogManager dlm2 = namespace2.openLog(name);
         String clientId3 = "reader3";
-        DistributedLogNamespace namespace3 = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace3 = NamespaceBuilder.newBuilder()
                 .conf(localConf).uri(uri).clientId(clientId3).build();
         DistributedLogManager dlm3 = namespace3.openLog(name);
 
         LOG.info("{} is opening reader on stream {}", clientId1, name);
-        Future<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
-        AsyncLogReader reader1 = Await.result(futureReader1);
+        CompletableFuture<AsyncLogReader> futureReader1 = 
dlm1.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        AsyncLogReader reader1 = Utils.ioResult(futureReader1);
         LOG.info("{} opened reader on stream {}", clientId1, name);
 
         LOG.info("{} is opening reader on stream {}", clientId2, name);
-        Future<AsyncLogReader> futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader2 = 
dlm2.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
         LOG.info("{} is opening reader on stream {}", clientId3, name);
-        Future<AsyncLogReader> futureReader3 = 
dlm3.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
+        CompletableFuture<AsyncLogReader> futureReader3 = 
dlm3.getAsyncLogReaderWithLock(DLSN.InitialDLSN);
 
         ExecutorService executorService = Executors.newCachedThreadPool();
 
@@ -485,26 +480,26 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
                 new ReadRecordsListener(currentDLSN, clientId2, 
executorService);
         ReadRecordsListener listener3 =
                 new ReadRecordsListener(currentDLSN, clientId3, 
executorService);
-        futureReader2.addEventListener(listener2);
-        futureReader3.addEventListener(listener3);
+        futureReader2.whenComplete(listener2);
+        futureReader3.whenComplete(listener3);
 
         // Get reader1 and start reading.
         for ( ; recordCount < 200; recordCount++) {
-            LogRecordWithDLSN record = Await.result(reader1.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
             currentDLSN.set(record.getDlsn());
         }
 
         // Take a break, reader2 decides to stop waiting and cancels.
         Thread.sleep(1000);
         assertFalse(listener2.done());
-        FutureUtils.cancel(futureReader2);
+        futureReader2.cancel(true);
         listener2.getLatch().await();
         assertTrue(listener2.done());
         assertTrue(listener2.failed());
 
         // Reader1 starts reading again.
         for (; recordCount < 300; recordCount++) {
-            LogRecordWithDLSN record = Await.result(reader1.readNext());
+            LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
             currentDLSN.set(record.getDlsn());
         }
 
@@ -519,12 +514,12 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         assertEquals(new DLSN(3, 99, 0), currentDLSN.get());
 
         try {
-            Await.result(futureReader2);
+            Utils.ioResult(futureReader2);
         } catch (Exception ex) {
             // Can't get this one to close it--the dlm will take care of it.
         }
 
-        Utils.close(Await.result(futureReader3));
+        Utils.close(Utils.ioResult(futureReader3));
 
         dlm1.close();
         namespace1.close();
@@ -553,7 +548,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
         for (long i = 0; i < 3; i++) {
             BKAsyncLogWriter writer = (BKAsyncLogWriter) 
dlm.startAsyncLogSegmentNonPartitioned();
             for (long j = 1; j <= 10; j++) {
-                DLSN dlsn = 
Await.result(writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++)));
+                DLSN dlsn = 
Utils.ioResult(writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++)));
                 if (i == 1 && j == 1L) {
                     readDLSN = dlsn;
                 }
@@ -561,10 +556,10 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
             writer.closeAndComplete();
         }
 
-        BKAsyncLogReader reader0 = (BKAsyncLogReader) 
Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
+        BKAsyncLogReader reader0 = (BKAsyncLogReader) 
Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId));
         assertEquals(DLSN.NonInclusiveLowerBound, reader0.getStartDLSN());
         long numTxns = 0;
-        LogRecordWithDLSN record = Await.result(reader0.readNext());
+        LogRecordWithDLSN record = Utils.ioResult(reader0.readNext());
         while (null != record) {
             DLMTestUtil.verifyEmptyLogRecord(record);
             ++numTxns;
@@ -574,18 +569,18 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
             if (txid - 1 == numTxns) {
                 break;
             }
-            record = Await.result(reader0.readNext());
+            record = Utils.ioResult(reader0.readNext());
         }
         assertEquals(txid - 1, numTxns);
         Utils.close(reader0);
 
         SubscriptionsStore subscriptionsStore = dlm.getSubscriptionsStore();
-        Await.result(subscriptionsStore.advanceCommitPosition(subscriberId, 
readDLSN));
-        BKAsyncLogReader reader1 = (BKAsyncLogReader) 
Await.result(dlm.getAsyncLogReaderWithLock(subscriberId));
+        Utils.ioResult(subscriptionsStore.advanceCommitPosition(subscriberId, 
readDLSN));
+        BKAsyncLogReader reader1 = (BKAsyncLogReader) 
Utils.ioResult(dlm.getAsyncLogReaderWithLock(subscriberId));
         assertEquals(readDLSN, reader1.getStartDLSN());
         numTxns = 0;
         long startTxID =  10L;
-        record = Await.result(reader1.readNext());
+        record = Utils.ioResult(reader1.readNext());
         while (null != record) {
             DLMTestUtil.verifyEmptyLogRecord(record);
             ++numTxns;
@@ -596,7 +591,7 @@ public class TestAsyncReaderLock extends 
TestDistributedLogBase {
             if (startTxID == txid - 1) {
                 break;
             }
-            record = Await.result(reader1.readNext());
+            record = Utils.ioResult(reader1.readNext());
         }
         assertEquals(txid - 1, startTxID);
         assertEquals(20, numTxns);

Reply via email to