http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index eb81900..721bfbf 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog.impl;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LogSegmentMetadata;
@@ -30,15 +31,13 @@ import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.metadata.LogMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -137,7 +136,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> createTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn, segment, null);
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
@@ -145,7 +144,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> createTxn2 = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn2, segment2, null);
         try {
-            FutureUtils.result(createTxn2.execute());
+            Utils.ioResult(createTxn2.execute());
             fail("Should fail if log segment exists");
         } catch (Throwable t) {
             // expected
@@ -162,13 +161,13 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> createTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn, segment, null);
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         Transaction<Object> deleteTxn = lsmStore.transaction();
         lsmStore.deleteLogSegment(deleteTxn, segment, null);
-        FutureUtils.result(deleteTxn.execute());
+        Utils.ioResult(deleteTxn.execute());
         assertNull("LogSegment " + segment + " should be deleted",
                 zkc.get().exists(segment.getZkPath(), false));
     }
@@ -179,7 +178,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> deleteTxn = lsmStore.transaction();
         lsmStore.deleteLogSegment(deleteTxn, segment, null);
         try {
-            FutureUtils.result(deleteTxn.execute());
+            Utils.ioResult(deleteTxn.execute());
             fail("Should fail deletion if log segment doesn't exist");
         } catch (Throwable t) {
             assertTrue("Should throw NoNodeException if log segment doesn't 
exist",
@@ -196,7 +195,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         lsmStore.updateLogSegment(updateTxn, segment);
         try {
-            FutureUtils.result(updateTxn.execute());
+            Utils.ioResult(updateTxn.execute());
             fail("Should fail update if log segment doesn't exist");
         } catch (Throwable t) {
             assertTrue("Should throw NoNodeException if log segment doesn't 
exist",
@@ -212,17 +211,17 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         LogSegmentMetadata segment = createLogSegment(1L, 99L);
         Transaction<Object> createTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn, segment, null);
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         LogSegmentMetadata modifiedSegment = createLogSegment(1L, 999L);
         Transaction<Object> updateTxn = lsmStore.transaction();
         lsmStore.updateLogSegment(updateTxn, modifiedSegment);
-        FutureUtils.result(updateTxn.execute());
+        Utils.ioResult(updateTxn.execute());
         // the log segment should be updated
         LogSegmentMetadata readSegment =
-                FutureUtils.result(LogSegmentMetadata.read(zkc, 
segment.getZkPath(), true));
+                Utils.ioResult(LogSegmentMetadata.read(zkc, 
segment.getZkPath(), true));
         assertEquals("Last entry id should be changed from 99L to 999L",
                 999L, readSegment.getLastEntryId());
     }
@@ -234,7 +233,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         // create log segment 1
         Transaction<Object> createTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn, segment1, null);
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment1 + " should be created",
                 zkc.get().exists(segment1.getZkPath(), false));
@@ -242,7 +241,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         Transaction<Object> createDeleteTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createDeleteTxn, segment2, null);
         lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
-        FutureUtils.result(createDeleteTxn.execute());
+        Utils.ioResult(createDeleteTxn.execute());
         // segment 1 should be deleted, segment 2 should be created
         assertNull("LogSegment " + segment1 + " should be deleted",
                 zkc.get().exists(segment1.getZkPath(), false));
@@ -258,7 +257,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         // create log segment 1
         Transaction<Object> createTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn, segment1, null);
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment1 + " should be created",
                 zkc.get().exists(segment1.getZkPath(), false));
@@ -268,7 +267,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         lsmStore.deleteLogSegment(createDeleteTxn, segment2, null);
         lsmStore.createLogSegment(createDeleteTxn, segment3, null);
         try {
-            FutureUtils.result(createDeleteTxn.execute());
+            Utils.ioResult(createDeleteTxn.execute());
             fail("Should fail transaction if one operation failed");
         } catch (Throwable t) {
             assertTrue("Transaction is aborted",
@@ -290,12 +289,12 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         LogSegmentMetadata segment = createLogSegment(1L, 99L);
         Transaction<Object> createTxn = lsmStore.transaction();
         lsmStore.createLogSegment(createTxn, segment, null);
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         LogSegmentMetadata readSegment =
-                
FutureUtils.result(lsmStore.getLogSegment(segment.getZkPath()));
+                Utils.ioResult(lsmStore.getLogSegment(segment.getZkPath()));
         assertEquals("Log segment should match",
                 segment, readSegment);
     }
@@ -309,24 +308,24 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             createdSegments.add(segment);
             lsmStore.createLogSegment(createTxn, segment, null);
         }
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
         List<String> children = zkc.get().getChildren(rootPath, false);
         Collections.sort(children);
         assertEquals("Should find 10 log segments",
                 10, children.size());
         List<String> logSegmentNames =
-                FutureUtils.result(lsmStore.getLogSegmentNames(rootPath, 
null)).getValue();
+                Utils.ioResult(lsmStore.getLogSegmentNames(rootPath, 
null)).getValue();
         Collections.sort(logSegmentNames);
         assertEquals("Should find 10 log segments",
                 10, logSegmentNames.size());
         assertEquals(children, logSegmentNames);
-        List<Future<LogSegmentMetadata>> getFutures = 
Lists.newArrayListWithExpectedSize(10);
+        List<CompletableFuture<LogSegmentMetadata>> getFutures = 
Lists.newArrayListWithExpectedSize(10);
         for (int i = 0; i < 10; i++) {
             getFutures.add(lsmStore.getLogSegment(rootPath + "/" + 
logSegmentNames.get(i)));
         }
         List<LogSegmentMetadata> segments =
-                FutureUtils.result(Future.collect(getFutures));
+                Utils.ioResult(FutureUtils.collect(getFutures));
         for (int i = 0; i < 10; i++) {
             assertEquals(createdSegments.get(i), segments.get(i));
         }
@@ -358,7 +357,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.createLogSegment(createTxn, segment, null);
         }
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
         List<String> children = zkc.get().getChildren(rootPath, false);
         Collections.sort(children);
@@ -399,7 +398,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.createLogSegment(anotherCreateTxn, segment, null);
         }
-        FutureUtils.result(anotherCreateTxn.execute());
+        Utils.ioResult(anotherCreateTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         logger.info("All log segments become {}", newChildren);
@@ -424,7 +423,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.createLogSegment(createTxn, segment, null);
         }
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
         List<String> children = zkc.get().getChildren(rootPath, false);
         Collections.sort(children);
@@ -464,7 +463,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.deleteLogSegment(deleteTxn, segment, null);
         }
-        FutureUtils.result(deleteTxn.execute());
+        Utils.ioResult(deleteTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         while (numNotifications.get() < 2) {
@@ -496,7 +495,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.createLogSegment(createTxn, segment, null);
         }
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
         List<String> children = zkc.get().getChildren(rootPath, false);
         Collections.sort(children);
@@ -541,7 +540,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.createLogSegment(anotherCreateTxn, segment, null);
         }
-        FutureUtils.result(anotherCreateTxn.execute());
+        Utils.ioResult(anotherCreateTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         logger.info("All log segments become {}", newChildren);
@@ -566,7 +565,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.createLogSegment(createTxn, segment, null);
         }
-        FutureUtils.result(createTxn.execute());
+        Utils.ioResult(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
         List<String> children = zkc.get().getChildren(rootPath, false);
         Collections.sort(children);
@@ -607,7 +606,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
             LogSegmentMetadata segment = createLogSegment(i);
             lsmStore.deleteLogSegment(deleteTxn, segment, null);
         }
-        FutureUtils.result(deleteTxn.execute());
+        Utils.ioResult(deleteTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         while (numNotifications.get() < 2) {
@@ -636,23 +635,23 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testStoreMaxLogSegmentSequenceNumber() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
-        final Promise<Version> result = new Promise<Version>();
+        final CompletableFuture<Version> result = new 
CompletableFuture<Version>();
         LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version r) {
-                result.setValue(r);
+                result.complete(r);
             }
 
             @Override
             public void onAbort(Throwable t) {
-                result.setException(t);
+                result.completeExceptionally(t);
             }
         });
-        FutureUtils.result(updateTxn.execute());
-        assertEquals(1, ((ZkVersion) 
FutureUtils.result(result)).getZnodeVersion());
+        Utils.ioResult(updateTxn.execute());
+        assertEquals(1, ((ZkVersion) 
Utils.ioResult(result)).getZnodeVersion());
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(rootZkPath, false, stat);
         assertEquals(999L, DLUtils.deserializeLogSegmentSequenceNumber(data));
@@ -663,32 +662,32 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testStoreMaxLogSegmentSequenceNumberBadVersion() throws 
Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
-        final Promise<Version> result = new Promise<Version>();
+        final CompletableFuture<Version> result = new 
CompletableFuture<Version>();
         LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
-                        result.setValue(r);
+                        result.complete(r);
                     }
 
                     @Override
                     public void onAbort(Throwable t) {
-                        result.setException(t);
+                        result.completeExceptionally(t);
                     }
                 });
         try {
-            FutureUtils.result(updateTxn.execute());
+            Utils.ioResult(updateTxn.execute());
             fail("Should fail on storing log segment sequence number if 
providing bad version");
         } catch (ZKException zke) {
             assertEquals(KeeperException.Code.BADVERSION, 
zke.getKeeperExceptionCode());
         }
         try {
-            Await.result(result);
+            Utils.ioResult(result);
             fail("Should fail on storing log segment sequence number if 
providing bad version");
-        } catch (KeeperException ke) {
-            assertEquals(KeeperException.Code.BADVERSION, ke.code());
+        } catch (ZKException ze) {
+            assertEquals(KeeperException.Code.BADVERSION, 
ze.getKeeperExceptionCode());
         }
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(rootZkPath, false, stat);
@@ -700,7 +699,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testStoreMaxLogSegmentSequenceNumberOnNonExistentPath() throws 
Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
-        final Promise<Version> result = new Promise<Version>();
+        final CompletableFuture<Version> result = new 
CompletableFuture<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
         LogMetadata metadata = mock(LogMetadata.class);
         when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
@@ -708,25 +707,25 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
-                        result.setValue(r);
+                        result.complete(r);
                     }
 
                     @Override
                     public void onAbort(Throwable t) {
-                        result.setException(t);
+                        result.completeExceptionally(t);
                     }
                 });
         try {
-            FutureUtils.result(updateTxn.execute());
+            Utils.ioResult(updateTxn.execute());
             fail("Should fail on storing log segment sequence number if path 
doesn't exist");
         } catch (ZKException zke) {
             assertEquals(KeeperException.Code.NONODE, 
zke.getKeeperExceptionCode());
         }
         try {
-            Await.result(result);
+            Utils.ioResult(result);
             fail("Should fail on storing log segment sequence number if path 
doesn't exist");
-        } catch (KeeperException ke) {
-            assertEquals(KeeperException.Code.NONODE, ke.code());
+        } catch (ZKException ke) {
+            assertEquals(KeeperException.Code.NONODE, 
ke.getKeeperExceptionCode());
         }
     }
 
@@ -734,23 +733,23 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testStoreMaxTxnId() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
-        final Promise<Version> result = new Promise<Version>();
+        final CompletableFuture<Version> result = new 
CompletableFuture<Version>();
         LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version r) {
-                result.setValue(r);
+                result.complete(r);
             }
 
             @Override
             public void onAbort(Throwable t) {
-                result.setException(t);
+                result.completeExceptionally(t);
             }
         });
-        FutureUtils.result(updateTxn.execute());
-        assertEquals(1, ((ZkVersion) 
FutureUtils.result(result)).getZnodeVersion());
+        Utils.ioResult(updateTxn.execute());
+        assertEquals(1, ((ZkVersion) 
Utils.ioResult(result)).getZnodeVersion());
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(rootZkPath, false, stat);
         assertEquals(999L, DLUtils.deserializeTransactionId(data));
@@ -761,32 +760,32 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testStoreMaxTxnIdBadVersion() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
-        final Promise<Version> result = new Promise<Version>();
+        final CompletableFuture<Version> result = new 
CompletableFuture<Version>();
         LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
         lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
-                        result.setValue(r);
+                        result.complete(r);
                     }
 
                     @Override
                     public void onAbort(Throwable t) {
-                        result.setException(t);
+                        result.completeExceptionally(t);
                     }
                 });
         try {
-            FutureUtils.result(updateTxn.execute());
+            Utils.ioResult(updateTxn.execute());
             fail("Should fail on storing log record transaction id if 
providing bad version");
         } catch (ZKException zke) {
             assertEquals(KeeperException.Code.BADVERSION, 
zke.getKeeperExceptionCode());
         }
         try {
-            Await.result(result);
+            Utils.ioResult(result);
             fail("Should fail on storing log record transaction id if 
providing bad version");
-        } catch (KeeperException ke) {
-            assertEquals(KeeperException.Code.BADVERSION, ke.code());
+        } catch (ZKException ze) {
+            assertEquals(KeeperException.Code.BADVERSION, 
ze.getKeeperExceptionCode());
         }
         Stat stat = new Stat();
         byte[] data = zkc.get().getData(rootZkPath, false, stat);
@@ -798,7 +797,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
     public void testStoreMaxTxnIdOnNonExistentPath() throws Exception {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
-        final Promise<Version> result = new Promise<Version>();
+        final CompletableFuture<Version> result = new 
CompletableFuture<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
         LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
         when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
@@ -806,25 +805,25 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
-                        result.setValue(r);
+                        result.complete(r);
                     }
 
                     @Override
                     public void onAbort(Throwable t) {
-                        result.setException(t);
+                        result.completeExceptionally(t);
                     }
                 });
         try {
-            FutureUtils.result(updateTxn.execute());
+            Utils.ioResult(updateTxn.execute());
             fail("Should fail on storing log record transaction id if path 
doesn't exist");
         } catch (ZKException zke) {
             assertEquals(KeeperException.Code.NONODE, 
zke.getKeeperExceptionCode());
         }
         try {
-            Await.result(result);
+            Utils.ioResult(result);
             fail("Should fail on storing log record transaction id if path 
doesn't exist");
-        } catch (KeeperException ke) {
-            assertEquals(KeeperException.Code.NONODE, ke.code());
+        } catch (ZKException ze) {
+            assertEquals(KeeperException.Code.NONODE, 
ze.getKeeperExceptionCode());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
index 3c6e77c..4bd513b 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKNamespaceWatcher.java
@@ -24,7 +24,6 @@ import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientUtils;
 import org.apache.distributedlog.callback.NamespaceListener;
-import org.apache.distributedlog.util.DLUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.zookeeper.CreateMode;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
index 9c46d96..c81eb1d 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
@@ -25,16 +25,14 @@ import 
org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.ZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClientUtils;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.exceptions.LogExistsException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.metadata.LogMetadataStore;
-import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -153,7 +151,7 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
     }
 
     private void deleteLog(String logName) throws Exception {
-        Optional<URI> logUriOptional = 
FutureUtils.result(metadataStore.getLogLocation(logName));
+        Optional<URI> logUriOptional = 
Utils.ioResult(metadataStore.getLogLocation(logName));
         assertTrue(logUriOptional.isPresent());
         URI logUri = logUriOptional.get();
         zkc.get().delete(logUri.getPath() + "/" + logName, -1);
@@ -164,12 +162,12 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         TestNamespaceListener listener = new TestNamespaceListener();
         metadataStore.registerNamespaceListener(listener);
         String logName = "test-log-1";
-        URI logUri = FutureUtils.result(metadataStore.createLog(logName));
+        URI logUri = Utils.ioResult(metadataStore.createLog(logName));
         assertEquals(uri, logUri);
-        Optional<URI> logLocation = 
FutureUtils.result(metadataStore.getLogLocation(logName));
+        Optional<URI> logLocation = 
Utils.ioResult(metadataStore.getLogLocation(logName));
         assertTrue(logLocation.isPresent());
         assertEquals(uri, logLocation.get());
-        Optional<URI> notExistLogLocation = 
FutureUtils.result(metadataStore.getLogLocation("non-existent-log"));
+        Optional<URI> notExistLogLocation = 
Utils.ioResult(metadataStore.getLogLocation("non-existent-log"));
         assertFalse(notExistLogLocation.isPresent());
         // listener should receive notification
         listener.waitForDone();
@@ -178,7 +176,7 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         assertEquals(logName, logsIter.next());
         assertFalse(logsIter.hasNext());
         // get logs should return the log
-        Iterator<String> newLogsIter = 
FutureUtils.result(metadataStore.getLogs());
+        Iterator<String> newLogsIter = Utils.ioResult(metadataStore.getLogs());
         assertTrue(newLogsIter.hasNext());
         assertEquals(logName, newLogsIter.next());
         assertFalse(newLogsIter.hasNext());
@@ -191,7 +189,7 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         metadataStore.registerNamespaceListener(listener1);
         metadataStore.registerNamespaceListener(listener2);
         String logName = "test-multiple-listeners";
-        URI logUri = FutureUtils.result(metadataStore.createLog(logName));
+        URI logUri = Utils.ioResult(metadataStore.createLog(logName));
         assertEquals(uri, logUri);
         listener1.waitForDone();
         listener2.waitForDone();
@@ -220,8 +218,8 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
                 checkStore = metadataStore;
             }
             String logName = "test-create-log-" + i;
-            URI logUri = FutureUtils.result(createStore.createLog(logName));
-            Optional<URI> logLocation = 
FutureUtils.result(checkStore.getLogLocation(logName));
+            URI logUri = Utils.ioResult(createStore.createLog(logName));
+            Optional<URI> logLocation = 
Utils.ioResult(checkStore.getLogLocation(logName));
             assertTrue("Log " + logName + " doesn't exist", 
logLocation.isPresent());
             assertEquals("Different log location " + logLocation.get() + " is 
found",
                     logUri, logLocation.get());
@@ -236,10 +234,10 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         conf.addConfiguration(baseConf);
 
         String logName = "test-log";
-        FutureUtils.result(metadataStore.createLog(logName));
+        Utils.ioResult(metadataStore.createLog(logName));
 
-        URI subNs1 = FutureUtils.result(metadataStore.createSubNamespace());
-        URI subNs2 = FutureUtils.result(metadataStore.createSubNamespace());
+        URI subNs1 = Utils.ioResult(metadataStore.createSubNamespace());
+        URI subNs2 = Utils.ioResult(metadataStore.createSubNamespace());
 
         String duplicatedLogName = "test-duplicated-logs";
         // Create same log in different sub namespaces
@@ -247,35 +245,35 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         metadataStore.createLogInNamespaceSync(subNs2, duplicatedLogName);
 
         try {
-            FutureUtils.result(metadataStore.createLog("non-existent-log"));
+            Utils.ioResult(metadataStore.createLog("non-existent-log"));
             fail("should throw exception when duplicated log found");
         } catch (UnexpectedException ue) {
             // should throw unexpected exception
             assertTrue(metadataStore.duplicatedLogFound.get());
         }
         try {
-            FutureUtils.result(metadataStore.getLogLocation(logName));
+            Utils.ioResult(metadataStore.getLogLocation(logName));
             fail("should throw exception when duplicated log found");
         } catch (UnexpectedException ue) {
             // should throw unexpected exception
             assertTrue(metadataStore.duplicatedLogFound.get());
         }
         try {
-            
FutureUtils.result(metadataStore.getLogLocation("non-existent-log"));
+            Utils.ioResult(metadataStore.getLogLocation("non-existent-log"));
             fail("should throw exception when duplicated log found");
         } catch (UnexpectedException ue) {
             // should throw unexpected exception
             assertTrue(metadataStore.duplicatedLogFound.get());
         }
         try {
-            
FutureUtils.result(metadataStore.getLogLocation(duplicatedLogName));
+            Utils.ioResult(metadataStore.getLogLocation(duplicatedLogName));
             fail("should throw exception when duplicated log found");
         } catch (UnexpectedException ue) {
             // should throw unexpected exception
             assertTrue(metadataStore.duplicatedLogFound.get());
         }
         try {
-            FutureUtils.result(metadataStore.getLogs());
+            Utils.ioResult(metadataStore.getLogs());
             fail("should throw exception when duplicated log found");
         } catch (UnexpectedException ue) {
             // should throw unexpected exception
@@ -286,10 +284,10 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
     @Test(timeout = 60000)
     public void testGetLogLocationWhenCacheMissed() throws Exception {
         String logName = "test-get-location-when-cache-missed";
-        URI logUri = FutureUtils.result(metadataStore.createLog(logName));
+        URI logUri = Utils.ioResult(metadataStore.createLog(logName));
         assertEquals(uri, logUri);
         metadataStore.removeLogFromCache(logName);
-        Optional<URI> logLocation = 
FutureUtils.result(metadataStore.getLogLocation(logName));
+        Optional<URI> logLocation = 
Utils.ioResult(metadataStore.getLogLocation(logName));
         assertTrue(logLocation.isPresent());
         assertEquals(logUri, logLocation.get());
     }
@@ -297,25 +295,25 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
     @Test(timeout = 60000, expected = LogExistsException.class)
     public void testCreateLogWhenCacheMissed() throws Exception {
         String logName = "test-create-log-when-cache-missed";
-        URI logUri = FutureUtils.result(metadataStore.createLog(logName));
+        URI logUri = Utils.ioResult(metadataStore.createLog(logName));
         assertEquals(uri, logUri);
         metadataStore.removeLogFromCache(logName);
-        FutureUtils.result(metadataStore.createLog(logName));
+        Utils.ioResult(metadataStore.createLog(logName));
     }
 
     @Test(timeout = 60000, expected = LogExistsException.class)
     public void testCreateLogWhenLogExists() throws Exception {
         String logName = "test-create-log-when-log-exists";
-        URI logUri = FutureUtils.result(metadataStore.createLog(logName));
+        URI logUri = Utils.ioResult(metadataStore.createLog(logName));
         assertEquals(uri, logUri);
-        FutureUtils.result(metadataStore.createLog(logName));
+        Utils.ioResult(metadataStore.createLog(logName));
     }
 
     private Set<String> createLogs(int numLogs, String prefix) throws 
Exception {
         Set<String> expectedLogs = Sets.newTreeSet();
         for (int i = 0; i < numLogs; i++) {
             String logName = prefix + i;
-            FutureUtils.result(metadataStore.createLog(logName));
+            Utils.ioResult(metadataStore.createLog(logName));
             expectedLogs.add(logName);
         }
         return expectedLogs;
@@ -339,7 +337,7 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         do {
             TimeUnit.MILLISECONDS.sleep(20);
             receivedLogs = new TreeSet<String>();
-            Iterator<String> logs = 
FutureUtils.result(metadataStore.getLogs());
+            Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
             receivedLogs.addAll(Lists.newArrayList(logs));
         } while (receivedLogs.size() < numLogs);
         assertEquals(numLogs, receivedLogs.size());
@@ -372,8 +370,8 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
 
     @Test(timeout = 60000)
     public void testCreateLogPickingFirstAvailableSubNamespace() throws 
Exception {
-        URI subNs1 = FutureUtils.result(metadataStore.createSubNamespace());
-        URI subNs2 = FutureUtils.result(metadataStore.createSubNamespace());
+        URI subNs1 = Utils.ioResult(metadataStore.createSubNamespace());
+        URI subNs2 = Utils.ioResult(metadataStore.createSubNamespace());
 
         Set<String> logs0 = createLogs(uri, maxLogsPerSubnamespace - 1, 
"test-ns0-");
         Set<String> logs1 = createLogs(subNs1, maxLogsPerSubnamespace, 
"test-ns1-");
@@ -388,7 +386,7 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
         do {
             TimeUnit.MILLISECONDS.sleep(20);
             receivedLogs = new TreeSet<String>();
-            Iterator<String> logs = 
FutureUtils.result(metadataStore.getLogs());
+            Iterator<String> logs = Utils.ioResult(metadataStore.getLogs());
             receivedLogs.addAll(Lists.newArrayList(logs));
         } while (receivedLogs.size() < 3 * maxLogsPerSubnamespace - 1);
 
@@ -396,19 +394,19 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
                 new TestNamespaceListenerWithExpectedSize(3 * 
maxLogsPerSubnamespace + 1);
         metadataStore.registerNamespaceListener(listener);
 
-        Set<URI> uris = 
FutureUtils.result(metadataStore.fetchSubNamespaces(null));
+        Set<URI> uris = Utils.ioResult(metadataStore.fetchSubNamespaces(null));
         assertEquals(3, uris.size());
         String testLogName = "test-pick-first-available-ns";
-        URI createdURI = 
FutureUtils.result(metadataStore.createLog(testLogName));
+        URI createdURI = Utils.ioResult(metadataStore.createLog(testLogName));
         allLogs.add(testLogName);
         assertEquals(uri, createdURI);
-        uris = FutureUtils.result(metadataStore.fetchSubNamespaces(null));
+        uris = Utils.ioResult(metadataStore.fetchSubNamespaces(null));
         assertEquals(3, uris.size());
         testLogName = "test-create-new-ns";
-        URI newURI = FutureUtils.result(metadataStore.createLog(testLogName));
+        URI newURI = Utils.ioResult(metadataStore.createLog(testLogName));
         allLogs.add(testLogName);
         assertFalse(uris.contains(newURI));
-        uris = FutureUtils.result(metadataStore.fetchSubNamespaces(null));
+        uris = Utils.ioResult(metadataStore.fetchSubNamespaces(null));
         assertEquals(4, uris.size());
 
         listener.waitForDone();
@@ -435,7 +433,7 @@ public class TestFederatedZKLogMetadataStore extends 
TestDistributedLogBase {
                 .build();
         FederatedZKLogMetadataStore anotherMetadataStore =
                 new FederatedZKLogMetadataStore(anotherConf, uri, anotherZkc, 
scheduler);
-        FutureUtils.result(anotherMetadataStore.createLog(testLogName));
+        Utils.ioResult(anotherMetadataStore.createLog(testLogName));
 
         listener.waitForDone();
         Set<String> receivedLogs = listener.getResult();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
index 5505259..a70edf5 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -18,13 +18,14 @@
 package org.apache.distributedlog.impl.logsegment;
 
 import com.google.common.collect.Lists;
-import org.apache.distributedlog.AsyncLogWriter;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.Entry;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.LogRecordWithDLSN;
@@ -37,10 +38,8 @@ import 
org.apache.distributedlog.exceptions.ReadCancelledException;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;
 import org.junit.Before;
@@ -112,7 +111,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
                 null,
                 NullStatsLogger.INSTANCE,
                 AsyncFailureInjector.NULL);
-        return (BKLogSegmentEntryReader) 
FutureUtils.result(store.openReader(segment, startEntryId));
+        return (BKLogSegmentEntryReader) 
Utils.ioResult(store.openReader(segment, startEntryId));
     }
 
     void generateCompletedLogSegments(DistributedLogManager dlm,
@@ -121,12 +120,12 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
                                       long segmentSize) throws Exception {
         long txid = 1L;
         for (long i = 0; i < numCompletedSegments; i++) {
-            AsyncLogWriter writer = 
FutureUtils.result(dlm.openAsyncLogWriter());
+            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
             for (long j = 1; j <= segmentSize; j++) {
-                
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+                
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
                 LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
                 ctrlRecord.setControl();
-                FutureUtils.result(writer.write(ctrlRecord));
+                Utils.ioResult(writer.write(ctrlRecord));
             }
             Utils.close(writer);
         }
@@ -135,12 +134,12 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
     AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
                                               DistributedLogConfiguration conf,
                                               long segmentSize) throws 
Exception {
-        AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
         for (long i = 1L; i <= segmentSize; i++) {
-            
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
             LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
             ctrlRecord.setControl();
-            FutureUtils.result(writer.write(ctrlRecord));
+            Utils.ioResult(writer.write(ctrlRecord));
         }
         return writer;
     }
@@ -168,7 +167,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         while (!done) {
             Entry.Reader entryReader;
             try {
-                entryReader = FutureUtils.result(reader.readNext(1)).get(0);
+                entryReader = Utils.ioResult(reader.readNext(1)).get(0);
             } catch (EndOfLogSegmentException eol) {
                 done = true;
                 continue;
@@ -205,15 +204,15 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
                 1, segments.size());
 
         BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, 
confLocal);
-        List<Future<List<Entry.Reader>>> futures = Lists.newArrayList();
+        List<CompletableFuture<List<Entry.Reader>>> futures = 
Lists.newArrayList();
         for (int i = 0; i < 5; i++) {
             futures.add(reader.readNext(1));
         }
         assertFalse("Reader should not be closed yet", reader.isClosed());
         Utils.close(reader);
-        for (Future<List<Entry.Reader>> future : futures) {
+        for (CompletableFuture<List<Entry.Reader>> future : futures) {
             try {
-                FutureUtils.result(future);
+                Utils.ioResult(future);
                 fail("The read request should be cancelled");
             } catch (ReadCancelledException rce) {
                 // expected
@@ -253,7 +252,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         assertEquals(10, reader.getNextEntryId());
         assertFalse(reader.hasCaughtUpOnInprogress());
         // read first entry
-        Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
+        Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
         LogRecordWithDLSN record = entryReader.nextRecord();
         while (null != record) {
             if (!record.isControl()) {
@@ -309,7 +308,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         assertEquals(5, reader.readAheadEntries.size());
         assertEquals(5, reader.getNextEntryId());
         // read first entry
-        Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
+        Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
         LogRecordWithDLSN record = entryReader.nextRecord();
         while (null != record) {
             if (!record.isControl()) {
@@ -365,7 +364,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         assertEquals((reader.getLastAddConfirmed() + 1), 
reader.readAheadEntries.size());
         assertEquals((reader.getLastAddConfirmed() + 1), 
reader.getNextEntryId());
         // read first entry
-        Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
+        Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
         LogRecordWithDLSN record = entryReader.nextRecord();
         while (null != record) {
             if (!record.isControl()) {
@@ -415,7 +414,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         long txId = 1L;
         long entryId = 0L;
         while (true) {
-            Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
+            Entry.Reader entryReader = 
Utils.ioResult(reader.readNext(1)).get(0);
             LogRecordWithDLSN record = entryReader.nextRecord();
             while (null != record) {
                 if (!record.isControl()) {
@@ -435,11 +434,11 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         }
         assertEquals(6L, txId);
 
-        Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
+        CompletableFuture<List<Entry.Reader>> nextReadFuture = 
reader.readNext(1);
         // write another record to commit previous writes
-        
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
         // the long poll will be satisfied
-        List<Entry.Reader> nextReadEntries = 
FutureUtils.result(nextReadFuture);
+        List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture);
         assertEquals(1, nextReadEntries.size());
         assertTrue(reader.hasCaughtUpOnInprogress());
         Entry.Reader entryReader = nextReadEntries.get(0);
@@ -486,7 +485,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         long txId = 1L;
         long entryId = 0L;
         while (true) {
-            Entry.Reader entryReader = 
FutureUtils.result(reader.readNext(1)).get(0);
+            Entry.Reader entryReader = 
Utils.ioResult(reader.readNext(1)).get(0);
             LogRecordWithDLSN record = entryReader.nextRecord();
             while (null != record) {
                 if (!record.isControl()) {
@@ -506,11 +505,11 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         }
         assertEquals(6L, txId);
 
-        Future<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
+        CompletableFuture<List<Entry.Reader>> nextReadFuture = 
reader.readNext(1);
         // write another record to commit previous writes
-        
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
+        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
         // the long poll will be satisfied
-        List<Entry.Reader> nextReadEntries = 
FutureUtils.result(nextReadFuture);
+        List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture);
         assertEquals(1, nextReadEntries.size());
         Entry.Reader entryReader = nextReadEntries.get(0);
         LogRecordWithDLSN record = entryReader.nextRecord();
@@ -528,7 +527,7 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         ++entryId;
         // close the writer, the write will be committed
         Utils.close(writer);
-        entryReader = FutureUtils.result(reader.readNext(1)).get(0);
+        entryReader = Utils.ioResult(reader.readNext(1)).get(0);
         record = entryReader.nextRecord();
         assertNotNull(record);
         assertFalse(record.isControl());
@@ -549,8 +548,8 @@ public class TestBKLogSegmentEntryReader extends 
TestDistributedLogBase {
         try {
             // when we closed the log segment, another control record will be
             // written, so we loop over the reader until we reach end of log 
segment.
-            FutureUtils.result(reader.readNext(1));
-            FutureUtils.result(reader.readNext(1));
+            Utils.ioResult(reader.readNext(1));
+            Utils.ioResult(reader.readNext(1));
             fail("Should reach end of log segment");
         } catch (EndOfLogSegmentException eol) {
             // expected

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index f67de35..813501b 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -20,19 +20,17 @@ package org.apache.distributedlog.impl.metadata;
 import com.google.common.collect.Lists;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.distributedlog.metadata.LogMetadataForWriter;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
 import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
 import org.apache.distributedlog.util.Utils;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -129,7 +127,7 @@ public class TestZKLogStreamMetadataStore extends 
ZooKeeperClusterTestCase {
     public void testCheckLogMetadataPathsWithAllocator() throws Exception {
         String logRootPath = "/" + testName.getMethodName();
         List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(checkLogMetadataPaths(
+                Utils.ioResult(checkLogMetadataPaths(
                         zkc.get(), logRootPath, true));
         assertEquals("Should have 8 paths",
                 8, metadatas.size());
@@ -143,7 +141,7 @@ public class TestZKLogStreamMetadataStore extends 
ZooKeeperClusterTestCase {
     public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
         String logRootPath = "/" + testName.getMethodName();
         List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(checkLogMetadataPaths(
+                Utils.ioResult(checkLogMetadataPaths(
                         zkc.get(), logRootPath, false));
         assertEquals("Should have 7 paths",
                 7, metadatas.size());
@@ -169,12 +167,12 @@ public class TestZKLogStreamMetadataStore extends 
ZooKeeperClusterTestCase {
         }
 
         LogMetadataForWriter logMetadata =
-                FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, 
ownAllocator, true));
+                Utils.ioResult(getLog(uri, logName, logIdentifier, zkc, 
ownAllocator, true));
 
         final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
 
         List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(checkLogMetadataPaths(zkc.get(), 
logRootPath, ownAllocator));
+                Utils.ioResult(checkLogMetadataPaths(zkc.get(), logRootPath, 
ownAllocator));
 
         if (ownAllocator) {
             assertEquals("Should have 8 paths : ownAllocator = " + 
ownAllocator,
@@ -301,7 +299,7 @@ public class TestZKLogStreamMetadataStore extends 
ZooKeeperClusterTestCase {
     public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws 
Exception {
         String logName = testName.getMethodName();
         String logIdentifier = "<default>";
-        FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, true, 
false));
+        Utils.ioResult(getLog(uri, logName, logIdentifier, zkc, true, false));
     }
 
     @Test(timeout = 60000)
@@ -312,7 +310,7 @@ public class TestZKLogStreamMetadataStore extends 
ZooKeeperClusterTestCase {
 
         DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri);
 
-        DistributedLogNamespace namespace = 
DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
             .conf(new DistributedLogConfiguration())
             .uri(uri)
             .build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
 
b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
index b2eee34..26cf979 100644
--- 
a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
+++ 
b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestDistributedLock.java
@@ -17,20 +17,18 @@
  */
 package org.apache.distributedlog.lock;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.util.FailpointUtils;
 import org.apache.distributedlog.exceptions.LockingException;
 import org.apache.distributedlog.TestDistributedLogBase;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClientUtils;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.CreateMode;
@@ -192,9 +190,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
 
     private static void checkLockAndReacquire(ZKDistributedLock lock, boolean 
sync) throws Exception {
         lock.checkOwnershipAndReacquire();
-        Future<ZKDistributedLock> reacquireFuture = 
lock.getLockReacquireFuture();
+        CompletableFuture<ZKDistributedLock> reacquireFuture = 
lock.getLockReacquireFuture();
         if (null != reacquireFuture && sync) {
-            FutureUtils.result(reacquireFuture);
+            Utils.ioResult(reacquireFuture);
         }
     }
 
@@ -212,7 +210,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             try {
                 ZKDistributedLock lock = new 
ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-                FutureUtils.result(lock.asyncAcquire());
+                Utils.ioResult(lock.asyncAcquire());
                 fail("Should fail on creating lock if couldn't establishing 
connections to zookeeper");
             } catch (IOException ioe) {
                 // expected.
@@ -228,7 +226,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             try {
                 ZKDistributedLock lock = new 
ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-                FutureUtils.result(lock.asyncAcquire());
+                Utils.ioResult(lock.asyncAcquire());
                 fail("Should fail on creating lock if couldn't establishing 
connections to zookeeper after 3 retries");
             } catch (IOException ioe) {
                 // expected.
@@ -243,14 +241,14 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         try {
             ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
                 Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-            FutureUtils.result(lock.asyncAcquire());
+            Utils.ioResult(lock.asyncAcquire());
 
             Pair<String, Long> lockId1 = ((ZKSessionLock) 
lock.getInternalLock()).getLockId();
 
             List<String> children = getLockWaiters(zkc, lockPath);
             assertEquals(1, children.size());
             assertTrue(lock.haveLock());
-            assertEquals(lockId1, Await.result(asyncParseClientID(zkc0.get(), 
lockPath, children.get(0))));
+            assertEquals(lockId1, 
Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
 
             lock.asyncClose();
         } finally {
@@ -268,16 +266,16 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc);
         ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
                 Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock.asyncAcquire());
+        Utils.ioResult(lock.asyncAcquire());
 
         Pair<String, Long> lockId1 = ((ZKSessionLock) 
lock.getInternalLock()).getLockId();
 
         List<String> children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
         assertTrue(lock.haveLock());
-        assertEquals(lockId1, Await.result(asyncParseClientID(zkc0.get(), 
lockPath, children.get(0))));
+        assertEquals(lockId1, Utils.ioResult(asyncParseClientID(zkc0.get(), 
lockPath, children.get(0))));
 
-        FutureUtils.result(lock.asyncClose());
+        Utils.ioResult(lock.asyncClose());
 
         children = getLockWaiters(zkc, lockPath);
         assertEquals(0, children.size());
@@ -285,25 +283,25 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
 
         lock = new ZKDistributedLock(lockStateExecutor, lockFactory, lockPath,
                 Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock.asyncAcquire());
+        Utils.ioResult(lock.asyncAcquire());
 
         Pair<String, Long> lockId2 = ((ZKSessionLock) 
lock.getInternalLock()).getLockId();
 
         children = getLockWaiters(zkc, lockPath);
         assertEquals(1, children.size());
         assertTrue(lock.haveLock());
-        assertEquals(lockId2, Await.result(asyncParseClientID(zkc0.get(), 
lockPath, children.get(0))));
+        assertEquals(lockId2, Utils.ioResult(asyncParseClientID(zkc0.get(), 
lockPath, children.get(0))));
 
         assertEquals(lockId1, lockId2);
 
-        FutureUtils.result(lock.asyncClose());
+        Utils.ioResult(lock.asyncClose());
 
         children = getLockWaiters(zkc, lockPath);
         assertEquals(0, children.size());
         assertFalse(lock.haveLock());
 
         try {
-            FutureUtils.result(lock.asyncAcquire());
+            Utils.ioResult(lock.asyncAcquire());
             fail("Should fail on acquiring a closed lock");
         } catch (UnexpectedException le) {
             // expected.
@@ -324,7 +322,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         ZKDistributedLock lock0 =
                 new ZKDistributedLock(lockStateExecutor, lockFactory0, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock0.asyncAcquire());
+        Utils.ioResult(lock0.asyncAcquire());
 
         Pair<String, Long> lockId0_1 = ((ZKSessionLock) 
lock0.getInternalLock()).getLockId();
 
@@ -332,7 +330,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(1, children.size());
         assertTrue(lock0.haveLock());
         assertEquals(lockId0_1,
-                Await.result(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
+                Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
 
         // expire the session
         ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
@@ -347,7 +345,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(1, children.size());
         assertTrue(lock0.haveLock());
         assertEquals(lockId0_2,
-                Await.result(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
+                Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
 
 
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc);
@@ -359,9 +357,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             @Override
             public void run() {
                 try {
-                    FutureUtils.result(lock1.asyncAcquire());
+                    Utils.ioResult(lock1.asyncAcquire());
                     lockLatch.countDown();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     logger.error("Failed on locking lock1 : ", e);
                 }
             }
@@ -424,7 +422,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         ZKDistributedLock lock0 =
                 new ZKDistributedLock(lockStateExecutor, lockFactory0, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock0.asyncAcquire());
+        Utils.ioResult(lock0.asyncAcquire());
 
         Pair<String, Long> lockId0_1 = ((ZKSessionLock) 
lock0.getInternalLock()).getLockId();
 
@@ -432,7 +430,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertEquals(1, children.size());
         assertTrue(lock0.haveLock());
         assertEquals(lockId0_1,
-                Await.result(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
+                Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
 
         ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
 
@@ -441,13 +439,13 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             checkLockAndReacquire(lock0, false);
         } else {
             // session expire will trigger lock re-acquisition
-            Future<ZKDistributedLock> asyncLockAcquireFuture;
+            CompletableFuture<ZKDistributedLock> asyncLockAcquireFuture;
             do {
                 Thread.sleep(1);
                 asyncLockAcquireFuture = lock0.getLockReacquireFuture();
             } while (null == asyncLockAcquireFuture && 
lock0.getReacquireCount() < 1);
             if (null != asyncLockAcquireFuture) {
-                Await.result(asyncLockAcquireFuture);
+                Utils.ioResult(asyncLockAcquireFuture);
             }
             checkLockAndReacquire(lock0, false);
         }
@@ -456,11 +454,11 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertTrue(lock0.haveLock());
         Pair<String, Long> lock0_2 = ((ZKSessionLock) 
lock0.getInternalLock()).getLockId();
         assertEquals(lock0_2,
-                Await.result(asyncParseClientID(zkc.get(), lockPath, 
children.get(0))));
+                Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, 
children.get(0))));
         assertEquals(clientId, lock0_2.getLeft());
         assertFalse(lockId0_1.equals(lock0_2));
 
-        FutureUtils.result(lock0.asyncClose());
+        Utils.ioResult(lock0.asyncClose());
 
         children = getLockWaiters(zkc, lockPath);
         assertEquals(0, children.size());
@@ -495,7 +493,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         ZKDistributedLock lock0 =
                 new ZKDistributedLock(lockStateExecutor, lockFactory0, 
lockPath,
                         Long.MAX_VALUE, NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock0.asyncAcquire());
+        Utils.ioResult(lock0.asyncAcquire());
 
         final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
         SessionLockFactory lockFactory1 = createLockFactory(clientId, zkc);
@@ -506,9 +504,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             @Override
             public void run() {
                 try {
-                    FutureUtils.result(lock1.asyncAcquire());
+                    Utils.ioResult(lock1.asyncAcquire());
                     lock1DoneLatch.countDown();
-                } catch (IOException e) {
+                } catch (Exception e) {
                     logger.error("Error on acquiring lock1 : ", e);
                 }
             }
@@ -524,9 +522,9 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertTrue(lock0.haveLock());
         assertFalse(lock1.haveLock());
         assertEquals(((ZKSessionLock) lock0.getInternalLock()).getLockId(),
-                Await.result(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
+                Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, 
children.get(0))));
         assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
-                Await.result(asyncParseClientID(zkc.get(), lockPath, 
children.get(1))));
+                Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, 
children.get(1))));
 
         logger.info("Expiring session on lock0");
         ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
@@ -553,14 +551,14 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         } else {
             logger.info("Waiting lock0 to attempt acquisition after session 
expired");
             // session expire will trigger lock re-acquisition
-            Future<ZKDistributedLock> asyncLockAcquireFuture;
+            CompletableFuture<ZKDistributedLock> asyncLockAcquireFuture;
             do {
                 Thread.sleep(1);
                 asyncLockAcquireFuture = lock0.getLockReacquireFuture();
             } while (null == asyncLockAcquireFuture);
 
             try {
-                Await.result(asyncLockAcquireFuture);
+                Utils.ioResult(asyncLockAcquireFuture);
                 fail("Should fail check write lock since lock is already held 
by other people");
             } catch (OwnershipAcquireFailedException oafe) {
                 assertEquals(((ZKSessionLock) 
lock1.getInternalLock()).getLockId().getLeft(),
@@ -579,10 +577,10 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         assertFalse(lock0.haveLock());
         assertTrue(lock1.haveLock());
         assertEquals(((ZKSessionLock) lock1.getInternalLock()).getLockId(),
-                Await.result(asyncParseClientID(zkc.get(), lockPath, 
children.get(0))));
+                Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, 
children.get(0))));
 
-        FutureUtils.result(lock0.asyncClose());
-        FutureUtils.result(lock1.asyncClose());
+        Utils.ioResult(lock0.asyncClose());
+        Utils.ioResult(lock1.asyncClose());
 
         children = getLockWaiters(zkc, lockPath);
         assertEquals(0, children.size());
@@ -597,7 +595,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         SessionLockFactory lockFactory = createLockFactory(clientId, zkc, 
conf.getLockTimeoutMilliSeconds(), 0);
         ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
lockFactory, lockPath,
             conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock.asyncAcquire());
+        Utils.ioResult(lock.asyncAcquire());
 
         // try and cleanup the underlying lock
         lock.getInternalLock().unlock();
@@ -614,14 +612,14 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
 
         boolean exceptionEncountered = false;
         try {
-            FutureUtils.result(lock2.asyncAcquire());
+            Utils.ioResult(lock2.asyncAcquire());
         } catch (OwnershipAcquireFailedException exc) {
             assertEquals(clientId, exc.getCurrentOwner());
             exceptionEncountered = true;
         }
         assertTrue(exceptionEncountered);
-        FutureUtils.result(lock.asyncClose());
-        FutureUtils.result(lock2.asyncClose());
+        Utils.ioResult(lock.asyncClose());
+        Utils.ioResult(lock2.asyncClose());
     }
 
     @Test(timeout = 60000)
@@ -633,7 +631,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         SessionLockFactory factory = createLockFactory(clientId, zkc, 
conf.getLockTimeoutMilliSeconds(), 0);
         ZKDistributedLock lock = new ZKDistributedLock(lockStateExecutor, 
factory, lockPath,
             conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE);
-        FutureUtils.result(lock.asyncAcquire());
+        Utils.ioResult(lock.asyncAcquire());
 
         // try and cleanup the underlying lock
         lock.getInternalLock().unlock();
@@ -650,15 +648,15 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
 
         boolean exceptionEncountered = false;
         try {
-            FutureUtils.result(lock2.asyncAcquire());
+            Utils.ioResult(lock2.asyncAcquire());
         } catch (OwnershipAcquireFailedException exc) {
             assertEquals(clientId, exc.getCurrentOwner());
             exceptionEncountered = true;
         }
         assertTrue(exceptionEncountered);
-        FutureUtils.result(lock2.asyncClose());
+        Utils.ioResult(lock2.asyncClose());
 
-        FutureUtils.result(lock.asyncClose());
+        Utils.ioResult(lock.asyncClose());
         assertEquals(false, lock.haveLock());
         assertEquals(false, lock.getInternalLock().isLockHeld());
 
@@ -666,10 +664,10 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         ZKDistributedLock lock3 = new ZKDistributedLock(lockStateExecutor, 
factory, lockPath,
             0, NullStatsLogger.INSTANCE);
 
-        FutureUtils.result(lock3.asyncAcquire());
+        Utils.ioResult(lock3.asyncAcquire());
         assertEquals(true, lock3.haveLock());
         assertEquals(true, lock3.getInternalLock().isLockHeld());
-        FutureUtils.result(lock3.asyncClose());
+        Utils.ioResult(lock3.asyncClose());
     }
 
     void assertLatchesSet(CountDownLatch[] latches, int endIndex) {
@@ -697,8 +695,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
 
         int count = 3;
-        ArrayList<Future<ZKDistributedLock>> results =
-                new ArrayList<Future<ZKDistributedLock>>(count);
+        ArrayList<CompletableFuture<ZKDistributedLock>> results =
+                new ArrayList<CompletableFuture<ZKDistributedLock>>(count);
         ZKDistributedLock[] lockArray = new ZKDistributedLock[count];
         final CountDownLatch[] latches = new CountDownLatch[count];
 
@@ -708,7 +706,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             latches[i] = new CountDownLatch(1);
             lockArray[i] = locks.createLock(i, zkc);
             final int index = i;
-            results.add(lockArray[i].asyncAcquire().addEventListener(
+            results.add(lockArray[i].asyncAcquire().whenComplete(
                 new FutureEventListener<ZKDistributedLock>() {
                     @Override
                     public void onSuccess(ZKDistributedLock lock) {
@@ -727,8 +725,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         for (int i = 0; i < count; i++) {
             latches[i].await();
             assertLatchesSet(latches, i+1);
-            Await.result(results.get(i));
-            FutureUtils.result(lockArray[i].asyncClose());
+            Utils.ioResult(results.get(i));
+            Utils.ioResult(lockArray[i].asyncClose());
         }
     }
 
@@ -738,7 +736,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         final ZKDistributedLock lock0 = locks.createLock(0, zkc);
         final ZKDistributedLock lock1 = locks.createLock(1, zkc0);
 
-        FutureUtils.result(lock0.asyncAcquire());
+        Utils.ioResult(lock0.asyncAcquire());
 
         // Initial state.
         assertLockState(lock0, true, true, lock1, false, false, 1, 
locks.getLockPath());
@@ -747,8 +745,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
             @Override
             public void run() {
                 try {
-                    FutureUtils.result(lock1.asyncAcquire());
-                } catch (IOException e) {
+                    Utils.ioResult(lock1.asyncAcquire());
+                } catch (Exception e) {
                     fail("shouldn't fail to acquire");
                 }
             }
@@ -761,13 +759,13 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         }
         assertLockState(lock0, true, true, lock1, false, false, 2, 
locks.getLockPath());
 
-        FutureUtils.result(lock0.asyncClose());
-        Await.result(lock1.getLockAcquireFuture());
+        Utils.ioResult(lock0.asyncClose());
+        Utils.ioResult(lock1.getLockAcquireFuture());
 
         assertLockState(lock0, false, false, lock1, true, true, 1, 
locks.getLockPath());
 
         // Release lock1
-        FutureUtils.result(lock1.asyncClose());
+        Utils.ioResult(lock1.asyncClose());
         assertLockState(lock0, false, false, lock1, false, false, 0, 
locks.getLockPath());
     }
 
@@ -777,8 +775,8 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         final ZKDistributedLock lock0 = locks.createLock(0, zkc);
         final ZKDistributedLock lock1 = locks.createLock(1, zkc0);
 
-        FutureUtils.result(lock0.asyncAcquire());
-        Future<ZKDistributedLock> result = lock1.asyncAcquire();
+        Utils.ioResult(lock0.asyncAcquire());
+        CompletableFuture<ZKDistributedLock> result = lock1.asyncAcquire();
         // make sure we place a waiter for lock1
         while (null == lock1.getLockWaiter()) {
             TimeUnit.MILLISECONDS.sleep(20);
@@ -787,7 +785,7 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         // Expire causes acquire future to be failed and unset.
         ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
         try {
-            Await.result(result);
+            Utils.ioResult(result);
             fail("future should have been failed");
         } catch (OwnershipAcquireFailedException ex) {
         }
@@ -803,11 +801,11 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         final ZKDistributedLock lock0 = locks.createLock(0, zkc);
         final ZKDistributedLock lock1 = locks.createLock(1, zkc0);
 
-        FutureUtils.result(lock0.asyncAcquire());
-        Future<ZKDistributedLock> result = lock1.asyncAcquire();
-        FutureUtils.result(lock1.asyncClose());
+        Utils.ioResult(lock0.asyncAcquire());
+        CompletableFuture<ZKDistributedLock> result = lock1.asyncAcquire();
+        Utils.ioResult(lock1.asyncClose());
         try {
-            Await.result(result);
+            Utils.ioResult(result);
             fail("future should have been failed");
         } catch (LockClosedException ex) {
         }
@@ -821,12 +819,12 @@ public class TestDistributedLock extends 
TestDistributedLogBase {
         TestLockFactory locks = new TestLockFactory(runtime.getMethodName(), 
zkc, lockStateExecutor);
         final ZKDistributedLock lock0 = locks.createLock(0, zkc);
 
-        Future<ZKDistributedLock> result = lock0.asyncAcquire();
-        Await.result(result);
-        FutureUtils.result(lock0.asyncClose());
+        CompletableFuture<ZKDistributedLock> result = lock0.asyncAcquire();
+        Utils.ioResult(result);
+        Utils.ioResult(lock0.asyncClose());
 
         // Already have this, stays satisfied.
-        Await.result(result);
+        Utils.ioResult(result);
 
         // But we no longer have the lock.
         assertEquals(false, lock0.haveLock());

Reply via email to