http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
index ee9b00e..71a1f98 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java
@@ -18,16 +18,15 @@
 package org.apache.distributedlog.metadata;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
 public class LogSegmentMetadataStoreUpdater implements MetadataUpdater {
 
@@ -57,8 +56,8 @@ public class LogSegmentMetadataStoreUpdater implements 
MetadataUpdater {
     }
 
     @Override
-    public Future<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata 
segment,
-                                                       LogRecordWithDLSN 
record) {
+    public CompletableFuture<LogSegmentMetadata> 
updateLastRecord(LogSegmentMetadata segment,
+                                                                  
LogRecordWithDLSN record) {
         DLSN dlsn = record.getDlsn();
         Preconditions.checkState(!segment.isInProgress(),
                 "Updating last dlsn for an inprogress log segment isn't 
supported.");
@@ -73,7 +72,7 @@ public class LogSegmentMetadataStoreUpdater implements 
MetadataUpdater {
     }
 
     @Override
-    public Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata 
segment,
+    public CompletableFuture<LogSegmentMetadata> 
changeSequenceNumber(LogSegmentMetadata segment,
                                                            long 
logSegmentSeqNo) {
         String newZkPath = segment.getZkPath()
                 
.replace(formatLogSegmentSequenceNumber(segment.getLogSegmentSequenceNumber()),
@@ -92,7 +91,7 @@ public class LogSegmentMetadataStoreUpdater implements 
MetadataUpdater {
      * @return new log segment
      */
     @Override
-    public Future<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata 
segment) {
+    public CompletableFuture<LogSegmentMetadata> 
setLogSegmentActive(LogSegmentMetadata segment) {
         final LogSegmentMetadata newSegment = segment.mutator()
             .setTruncationStatus(LogSegmentMetadata.TruncationStatus.ACTIVE)
             .build();
@@ -106,7 +105,7 @@ public class LogSegmentMetadataStoreUpdater implements 
MetadataUpdater {
      * @return new log segment
      */
     @Override
-    public Future<LogSegmentMetadata> 
setLogSegmentTruncated(LogSegmentMetadata segment) {
+    public CompletableFuture<LogSegmentMetadata> 
setLogSegmentTruncated(LogSegmentMetadata segment) {
         final LogSegmentMetadata newSegment = segment.mutator()
             .setTruncationStatus(LogSegmentMetadata.TruncationStatus.TRUNCATED)
             .build();
@@ -130,7 +129,7 @@ public class LogSegmentMetadataStoreUpdater implements 
MetadataUpdater {
      * @return new log segment
      */
     @Override
-    public Future<LogSegmentMetadata> 
setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, DLSN minActiveDLSN) 
{
+    public CompletableFuture<LogSegmentMetadata> 
setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, DLSN minActiveDLSN) 
{
         final LogSegmentMetadata newSegment = segment.mutator()
             
.setTruncationStatus(LogSegmentMetadata.TruncationStatus.PARTIALLY_TRUNCATED)
             .setMinActiveDLSN(minActiveDLSN)
@@ -150,28 +149,18 @@ public class LogSegmentMetadataStoreUpdater implements 
MetadataUpdater {
         return newSegment;
     }
 
-    protected Future<LogSegmentMetadata> updateSegmentMetadata(final 
LogSegmentMetadata segment) {
+    protected CompletableFuture<LogSegmentMetadata> 
updateSegmentMetadata(final LogSegmentMetadata segment) {
         Transaction<Object> txn = transaction();
         metadataStore.updateLogSegment(txn, segment);
-        return txn.execute().map(new AbstractFunction1<Void, 
LogSegmentMetadata>() {
-            @Override
-            public LogSegmentMetadata apply(Void value) {
-                return segment;
-            }
-        });
+        return txn.execute().thenApply((value) -> segment);
     }
 
-    protected Future<LogSegmentMetadata> addNewSegmentAndDeleteOldSegment(
+    protected CompletableFuture<LogSegmentMetadata> 
addNewSegmentAndDeleteOldSegment(
             final LogSegmentMetadata newSegment, LogSegmentMetadata 
oldSegment) {
         LOG.info("old segment {} new segment {}", oldSegment, newSegment);
         Transaction<Object> txn = transaction();
         addNewSegmentAndDeleteOldSegment(txn, newSegment, oldSegment);
-        return txn.execute().map(new AbstractFunction1<Void, 
LogSegmentMetadata>() {
-            @Override
-            public LogSegmentMetadata apply(Void value) {
-                return newSegment;
-            }
-        });
+        return txn.execute().thenApply((value) -> newSegment);
     }
 
     protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
index f1e8f06..37ecab4 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
@@ -19,11 +19,11 @@ package org.apache.distributedlog.metadata;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.lock.DistributedLock;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
 
 import java.io.Closeable;
 import java.net.URI;
@@ -47,10 +47,10 @@ public interface LogStreamMetadataStore extends Closeable {
      *
      * @param uri the location of the log stream
      * @param logName the name of the log stream
-     * @return future represents the existence of a log stream. {@link 
org.apache.distributedlog.LogNotFoundException}
-     *         is thrown if the log doesn't exist
+     * @return future represents the existence of a log stream.
+     *         {@link 
org.apache.distributedlog.exceptions.LogNotFoundException} is thrown if the log 
doesn't exist
      */
-    Future<Void> logExists(URI uri, String logName);
+    CompletableFuture<Void> logExists(URI uri, String logName);
 
     /**
      * Create the read lock for the log stream.
@@ -59,7 +59,7 @@ public interface LogStreamMetadataStore extends Closeable {
      * @param readerId the reader id used for lock
      * @return the read lock
      */
-    Future<DistributedLock> createReadLock(LogMetadataForReader metadata,
+    CompletableFuture<DistributedLock> createReadLock(LogMetadataForReader 
metadata,
                                            Optional<String> readerId);
 
     /**
@@ -79,7 +79,7 @@ public interface LogStreamMetadataStore extends Closeable {
      * @param createIfNotExists flag to create the stream if it doesn't exist
      * @return the metadata of the log
      */
-    Future<LogMetadataForWriter> getLog(URI uri,
+    CompletableFuture<LogMetadataForWriter> getLog(URI uri,
                                         String streamName,
                                         boolean ownAllocator,
                                         boolean createIfNotExists);
@@ -91,7 +91,7 @@ public interface LogStreamMetadataStore extends Closeable {
      * @param streamName the name of the log stream
      * @return future represents the result of the deletion.
      */
-    Future<Void> deleteLog(URI uri, String streamName);
+    CompletableFuture<Void> deleteLog(URI uri, String streamName);
 
     /**
      * Get the log segment metadata store.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java
index 06a0600..793a2c9 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java
@@ -17,11 +17,11 @@
  */
 package org.apache.distributedlog.metadata;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
 
 /**
  * An updater to update metadata. It contains utility functions on mutating 
metadata.
@@ -44,8 +44,8 @@ public interface MetadataUpdater {
      *          correct last record.
      * @return new log segment
      */
-    Future<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment,
-                                                LogRecordWithDLSN record);
+    CompletableFuture<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata 
segment,
+                                                           LogRecordWithDLSN 
record);
 
     /**
      * Change ledger sequence number of <i>segment</i> to given 
<i>logSegmentSeqNo</i>.
@@ -56,7 +56,7 @@ public interface MetadataUpdater {
      *          ledger sequence number to change.
      * @return new log segment
      */
-    Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment,
+    CompletableFuture<LogSegmentMetadata> 
changeSequenceNumber(LogSegmentMetadata segment,
                                                     long logSegmentSeqNo);
 
     /**
@@ -66,7 +66,7 @@ public interface MetadataUpdater {
      *          log segment to change truncation status to active.
      * @return new log segment
      */
-    Future<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment);
+    CompletableFuture<LogSegmentMetadata> 
setLogSegmentActive(LogSegmentMetadata segment);
 
     /**
      * Change the truncation status of a <i>log segment</i> to truncated
@@ -75,7 +75,7 @@ public interface MetadataUpdater {
      *          log segment to change truncation status to truncated.
      * @return new log segment
      */
-    Future<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata 
segment);
+    CompletableFuture<LogSegmentMetadata> 
setLogSegmentTruncated(LogSegmentMetadata segment);
 
     /**
      * Change the truncation status of a <i>log segment</i> to truncated. The 
operation won't be executed
@@ -98,7 +98,7 @@ public interface MetadataUpdater {
      *          DLSN within the log segment before which log has been truncated
      * @return new log segment
      */
-    Future<LogSegmentMetadata> 
setLogSegmentPartiallyTruncated(LogSegmentMetadata segment,
+    CompletableFuture<LogSegmentMetadata> 
setLogSegmentPartiallyTruncated(LogSegmentMetadata segment,
                                                                DLSN 
minActiveDLSN);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
deleted file mode 100644
index 4cbee98..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.namespace;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Optional;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.callback.NamespaceListener;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.InvalidStreamNameException;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * A namespace is the basic unit for managing a set of distributedlogs.
- *
- * <h4>Namespace Interface</h4>
- *
- * <P>
- * The <code>DistributedLogNamespace</code> interface is implemented by 
different backend providers.
- * There are several components are required for an implementation:
- * <OL>
- *     <LI>Log Management -- manage logs in a given namespace. e.g. 
create/open/delete log, list of logs,
- *         watch the changes of logs.
- *     <LI>Access Control -- manage the access controls for logs in the 
namespace.
- * </OL>
- * </P>
- *
- * <h4>Namespace Location</h4>
- *
- * At the highest level, a <code>DistributedLogNamespace</code> is located by 
a <code>URI</code>. The location
- * URI is in string form has the syntax
- *
- * <blockquote>
- * 
distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i>
- * </blockquote>
- *
- * where square brackets [...] delineate optional components and the 
characters <tt><b>-</b></tt> and <tt><b>:</b></tt>
- * stand for themselves.
- *
- * The <code>provider</code> part in the URI indicates what is the backend 
used for this namespace. For example:
- * <i>distributedlog-bk</i> URI is storing logs in bookkeeper, while 
<i>distributedlog-mem</i> URI is storing logs in
- * memory. The <code>provider</code> part is optional. It would use bookkeeper 
backend if the <i>provider</i> part
- * is omitted.
- *
- * @see DistributedLogManager
- * @since 0.3.32
- */
-@Beta
-public interface DistributedLogNamespace {
-
-    /**
-     * Get the namespace driver used by this namespace.
-     *
-     * @return namespace driver
-     */
-    NamespaceDriver getNamespaceDriver();
-
-    //
-    // Method to operate logs
-    //
-
-    /**
-     * Create a log named <i>logName</i>.
-     *
-     * @param logName
-     *          name of the log
-     * @throws InvalidStreamNameException if log name is invalid.
-     * @throws IOException when encountered issues with backend.
-     */
-    void createLog(String logName)
-            throws InvalidStreamNameException, IOException;
-
-    /**
-     * Delete a log named <i>logName</i>.
-     *
-     * @param logName
-     *          name of the log
-     * @throws InvalidStreamNameException if log name is invalid
-     * @throws LogNotFoundException if log doesn't exist
-     * @throws IOException when encountered issues with backend
-     */
-    void deleteLog(String logName)
-            throws InvalidStreamNameException, LogNotFoundException, 
IOException;
-
-    /**
-     * Open a log named <i>logName</i>.
-     * A distributedlog manager is returned to access log <i>logName</i>.
-     *
-     * @param logName
-     *          name of the log
-     * @return distributedlog manager instance.
-     * @throws InvalidStreamNameException if log name is invalid.
-     * @throws IOException when encountered issues with backend.
-     */
-    DistributedLogManager openLog(String logName)
-            throws InvalidStreamNameException, IOException;
-
-    /**
-     * Open a log named <i>logName</i> with specific log configurations.
-     *
-     * <p>This method allows the caller to override global configuration 
settings by
-     * supplying log configuration overrides. Log config overrides come in two 
flavors,
-     * static and dynamic. Static config never changes in the lifecyle of 
<code>DistributedLogManager</code>,
-     * dynamic config changes by reloading periodically and safe to access 
from any context.</p>
-     *
-     * @param logName
-     *          name of the log
-     * @param logConf
-     *          static log configuration
-     * @param dynamicLogConf
-     *          dynamic log configuration
-     * @return distributedlog manager instance.
-     * @throws InvalidStreamNameException if log name is invalid.
-     * @throws IOException when encountered issues with backend.
-     */
-    DistributedLogManager openLog(String logName,
-                                  Optional<DistributedLogConfiguration> 
logConf,
-                                  Optional<DynamicDistributedLogConfiguration> 
dynamicLogConf,
-                                  Optional<StatsLogger> perStreamStatsLogger)
-            throws InvalidStreamNameException, IOException;
-
-    /**
-     * Check whether the log <i>logName</i> exist.
-     *
-     * @param logName
-     *          name of the log
-     * @return <code>true</code> if the log exists, otherwise 
<code>false</code>.
-     * @throws IOException when encountered exceptions on checking
-     */
-    boolean logExists(String logName)
-            throws IOException;
-
-    /**
-     * Retrieve the logs under the namespace.
-     *
-     * @return iterator of the logs under the namespace.
-     * @throws IOException when encountered issues with backend.
-     */
-    Iterator<String> getLogs()
-            throws IOException;
-
-    //
-    // Methods for namespace
-    //
-
-    /**
-     * Register namespace listener on stream updates under the namespace.
-     *
-     * @param listener
-     *          listener to receive stream updates under the namespace
-     */
-    void registerNamespaceListener(NamespaceListener listener);
-
-    /**
-     * Create an access control manager to manage/check acl for logs.
-     *
-     * @return access control manager for logs under the namespace.
-     * @throws IOException
-     */
-    AccessControlManager createAccessControlManager()
-            throws IOException;
-
-    /**
-     * Close the namespace.
-     */
-    void close();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
deleted file mode 100644
index 2706201..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.namespace;
-
-import com.google.common.base.Preconditions;
-import org.apache.distributedlog.BKDistributedLogNamespace;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.feature.CoreFeatureKeys;
-import org.apache.distributedlog.injector.AsyncFailureInjector;
-import org.apache.distributedlog.injector.AsyncRandomFailureInjector;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.SimplePermitLimiter;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.feature.SettableFeatureProvider;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Builder to construct a <code>DistributedLogNamespace</code>.
- * The builder takes the responsibility of loading backend according to the 
uri.
- *
- * @see DistributedLogNamespace
- * @since 0.3.32
- */
-public class DistributedLogNamespaceBuilder {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(DistributedLogNamespaceBuilder.class);
-
-    public static DistributedLogNamespaceBuilder newBuilder() {
-        return new DistributedLogNamespaceBuilder();
-    }
-
-    private DistributedLogConfiguration _conf = null;
-    private DynamicDistributedLogConfiguration _dynConf = null;
-    private URI _uri = null;
-    private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
-    private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
-    private FeatureProvider _featureProvider = null;
-    private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
-    private int _regionId = DistributedLogConstants.LOCAL_REGION_ID;
-
-    // private constructor
-    private DistributedLogNamespaceBuilder() {}
-
-    /**
-     * DistributedLog Configuration used for the namespace.
-     *
-     * @param conf
-     *          distributedlog configuration
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder conf(DistributedLogConfiguration 
conf) {
-        this._conf = conf;
-        return this;
-    }
-
-    /**
-     * Dynamic DistributedLog Configuration used for the namespace
-     *
-     * @param dynConf dynamic distributedlog configuration
-     * @return namespace builder
-     */
-    public DistributedLogNamespaceBuilder 
dynConf(DynamicDistributedLogConfiguration dynConf) {
-        this._dynConf = dynConf;
-        return this;
-    }
-
-    /**
-     * Namespace Location.
-     *
-     * @param uri
-     *          namespace location uri.
-     * @see DistributedLogNamespace
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder uri(URI uri) {
-        this._uri = uri;
-        return this;
-    }
-
-    /**
-     * Stats Logger used for stats collection
-     *
-     * @param statsLogger
-     *          stats logger
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder statsLogger(StatsLogger statsLogger) 
{
-        this._statsLogger = statsLogger;
-        return this;
-    }
-
-    /**
-     * Stats Logger used for collecting per log stats.
-     *
-     * @param statsLogger
-     *          stats logger for collecting per log stats
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder perLogStatsLogger(StatsLogger 
statsLogger) {
-        this._perLogStatsLogger = statsLogger;
-        return this;
-    }
-
-    /**
-     * Feature provider used to control the availabilities of features in the 
namespace.
-     *
-     * @param featureProvider
-     *          feature provider to control availabilities of features.
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder featureProvider(FeatureProvider 
featureProvider) {
-        this._featureProvider = featureProvider;
-        return this;
-    }
-
-    /**
-     * Client Id used for accessing the namespace
-     *
-     * @param clientId
-     *          client id used for accessing the namespace
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder clientId(String clientId) {
-        this._clientId = clientId;
-        return this;
-    }
-
-    /**
-     * Region Id used for encoding logs in the namespace. The region id
-     * is useful when the namespace is globally spanning over regions.
-     *
-     * @param regionId
-     *          region id.
-     * @return namespace builder.
-     */
-    public DistributedLogNamespaceBuilder regionId(int regionId) {
-        this._regionId = regionId;
-        return this;
-    }
-
-    @SuppressWarnings("deprecation")
-    private static StatsLogger normalizePerLogStatsLogger(StatsLogger 
statsLogger,
-                                                          StatsLogger 
perLogStatsLogger,
-                                                          
DistributedLogConfiguration conf) {
-        StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger;
-        if (perLogStatsLogger == NullStatsLogger.INSTANCE &&
-                conf.getEnablePerStreamStat()) {
-            normalizedPerLogStatsLogger = statsLogger.scope("stream");
-        }
-        return normalizedPerLogStatsLogger;
-    }
-
-    /**
-     * Build the namespace.
-     *
-     * @return the namespace instance.
-     * @throws IllegalArgumentException when there is illegal argument 
provided in the builder
-     * @throws NullPointerException when there is null argument provided in 
the builder
-     * @throws IOException when fail to build the backend
-     */
-    public DistributedLogNamespace build()
-            throws IllegalArgumentException, NullPointerException, IOException 
{
-        // Check arguments
-        Preconditions.checkNotNull(_conf, "No DistributedLog Configuration.");
-        Preconditions.checkNotNull(_uri, "No DistributedLog URI");
-
-        // validate the configuration
-        _conf.validate();
-        if (null == _dynConf) {
-            _dynConf = ConfUtils.getConstDynConf(_conf);
-        }
-
-        // retrieve the namespace driver
-        NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri);
-        URI normalizedUri = DLUtils.normalizeURI(_uri);
-
-        // build the feature provider
-        FeatureProvider featureProvider;
-        if (null == _featureProvider) {
-            featureProvider = new SettableFeatureProvider("", 0);
-            logger.info("No feature provider is set. All features are disabled 
now.");
-        } else {
-            featureProvider = _featureProvider;
-        }
-
-        // build the failure injector
-        AsyncFailureInjector failureInjector = 
AsyncRandomFailureInjector.newBuilder()
-                .injectDelays(_conf.getEIInjectReadAheadDelay(),
-                              _conf.getEIInjectReadAheadDelayPercent(),
-                              _conf.getEIInjectMaxReadAheadDelayMs())
-                .injectErrors(false, 10)
-                .injectStops(_conf.getEIInjectReadAheadStall(), 10)
-                .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries())
-                .build();
-
-        // normalize the per log stats logger
-        StatsLogger perLogStatsLogger = 
normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf);
-
-        // build the scheduler
-        StatsLogger schedulerStatsLogger = 
_statsLogger.scope("factory").scope("thread_pool");
-        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
-                .name("DLM-" + normalizedUri.getPath())
-                .corePoolSize(_conf.getNumWorkerThreads())
-                .statsLogger(schedulerStatsLogger)
-                .perExecutorStatsLogger(schedulerStatsLogger)
-                .traceTaskExecution(_conf.getEnableTaskExecutionStats())
-                
.traceTaskExecutionWarnTimeUs(_conf.getTaskExecutionWarnTimeMicros())
-                .build();
-
-        // initialize the namespace driver
-        driver.initialize(
-                _conf,
-                _dynConf,
-                normalizedUri,
-                scheduler,
-                featureProvider,
-                failureInjector,
-                _statsLogger,
-                perLogStatsLogger,
-                DLUtils.normalizeClientId(_clientId),
-                _regionId);
-
-        // initialize the write limiter
-        PermitLimiter writeLimiter;
-        if (_conf.getGlobalOutstandingWriteLimit() < 0) {
-            writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
-        } else {
-            Feature disableWriteLimitFeature = featureProvider.getFeature(
-                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
-            writeLimiter = new SimplePermitLimiter(
-                _conf.getOutstandingWriteLimitDarkmode(),
-                _conf.getGlobalOutstandingWriteLimit(),
-                _statsLogger.scope("writeLimiter"),
-                true /* singleton */,
-                disableWriteLimitFeature);
-        }
-
-        return new BKDistributedLogNamespace(
-                _conf,
-                normalizedUri,
-                driver,
-                scheduler,
-                featureProvider,
-                writeLimiter,
-                failureInjector,
-                _statsLogger,
-                perLogStatsLogger,
-                DLUtils.normalizeClientId(_clientId),
-                _regionId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
index 2f5adc6..cf970ef 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
@@ -18,16 +18,15 @@
 package org.apache.distributedlog.namespace;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.acl.AccessControlManager;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.injector.AsyncFailureInjector;
 import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
-import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
 import org.apache.distributedlog.metadata.LogMetadataStore;
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.StatsLogger;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java
deleted file mode 100644
index 14db547..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.rate;
-
-public interface MovingAverageRate {
-    double get();
-    void add(long amount);
-    void inc();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java
deleted file mode 100644
index cd33e24..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.rate;
-
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.TimerTask;
-import com.twitter.util.Timer;
-import com.twitter.util.Time;
-import java.util.concurrent.CopyOnWriteArrayList;
-import scala.runtime.BoxedUnit;
-
-public class MovingAverageRateFactory {
-
-    private static final int DEFAULT_INTERVAL_SECS = 1;
-
-    private final Timer timer;
-    private final TimerTask timerTask;
-    private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs;
-
-    public MovingAverageRateFactory(Timer timer) {
-        this.avgs = new CopyOnWriteArrayList<SampledMovingAverageRate>();
-        this.timer = timer;
-        Function0<BoxedUnit> sampleTask = new Function0<BoxedUnit>() {
-            public BoxedUnit apply() {
-                sampleAll();
-                return null;
-            }
-        };
-        this.timerTask = timer.schedulePeriodically(
-            Time.now(), Duration.fromSeconds(DEFAULT_INTERVAL_SECS), 
sampleTask);
-    }
-
-    public MovingAverageRate create(int intervalSecs) {
-        SampledMovingAverageRate avg = new 
SampledMovingAverageRate(intervalSecs);
-        avgs.add(avg);
-        return avg;
-    }
-
-    public void close() {
-        timerTask.cancel();
-        avgs.clear();
-    }
-
-    private void sampleAll() {
-        for (SampledMovingAverageRate avg : avgs) {
-            avg.sample();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java
deleted file mode 100644
index 0b3ccac..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.rate;
-
-import com.twitter.common.stats.Rate;
-import com.twitter.util.TimerTask;
-import com.twitter.util.Timer;
-import com.twitter.util.Time;
-import java.util.concurrent.atomic.AtomicLong;
-
-class SampledMovingAverageRate implements MovingAverageRate {
-    private final Rate rate;
-    private final AtomicLong total;
-
-    private double value;
-
-    public SampledMovingAverageRate(int intervalSecs) {
-        this.total = new AtomicLong(0);
-        this.rate = Rate.of("Ignore", total)
-            .withWindowSize(intervalSecs)
-            .build();
-        this.value = 0;
-    }
-
-    @Override
-    public double get() {
-        return value;
-    }
-
-    @Override
-    public void add(long amount) {
-        total.getAndAdd(amount);
-    }
-
-    @Override
-    public void inc() {
-        add(1);
-    }
-
-    void sample() {
-        value = rate.doSample();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java
deleted file mode 100644
index d81f8a4..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * ReadAhead Mechanism for distributedlog streaming reads
- */
-package org.apache.distributedlog.readahead;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java
deleted file mode 100644
index 199aa4c..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.stats;
-
-import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A Util to logger stats on bk exceptions.
- */
-public class BKExceptionStatsLogger {
-
-    public static String getMessage(int code) {
-        switch (code) {
-            case Code.OK:
-                return "OK";
-            case Code.ReadException:
-                return "ReadException";
-            case Code.QuorumException:
-                return "QuorumException";
-            case Code.NoBookieAvailableException:
-                return "NoBookieAvailableException";
-            case Code.DigestNotInitializedException:
-                return "DigestNotInitializedException";
-            case Code.DigestMatchException:
-                return "DigestMatchException";
-            case Code.NotEnoughBookiesException:
-                return "NotEnoughBookiesException";
-            case Code.NoSuchLedgerExistsException:
-                return "NoSuchLedgerExistsException";
-            case Code.BookieHandleNotAvailableException:
-                return "BookieHandleNotAvailableException";
-            case Code.ZKException:
-                return "ZKException";
-            case Code.LedgerRecoveryException:
-                return "LedgerRecoveryException";
-            case Code.LedgerClosedException:
-                return "LedgerClosedException";
-            case Code.WriteException:
-                return "WriteException";
-            case Code.NoSuchEntryException:
-                return "NoSuchEntryException";
-            case Code.IncorrectParameterException:
-                return "IncorrectParameterException";
-            case Code.InterruptedException:
-                return "InterruptedException";
-            case Code.ProtocolVersionException:
-                return "ProtocolVersionException";
-            case Code.MetadataVersionException:
-                return "MetadataVersionException";
-            case Code.LedgerFencedException:
-                return "LedgerFencedException";
-            case Code.UnauthorizedAccessException:
-                return "UnauthorizedAccessException";
-            case Code.UnclosedFragmentException:
-                return "UnclosedFragmentException";
-            case Code.WriteOnReadOnlyBookieException:
-                return "WriteOnReadOnlyBookieException";
-            case Code.IllegalOpException:
-                return "IllegalOpException";
-            default:
-                return "UnexpectedException";
-        }
-    }
-
-    private final StatsLogger parentLogger;
-    private final Map<Integer, Counter> exceptionCounters;
-
-    public BKExceptionStatsLogger(StatsLogger parentLogger) {
-        this.parentLogger = parentLogger;
-        this.exceptionCounters = new HashMap<Integer, Counter>();
-    }
-
-    public Counter getExceptionCounter(int rc) {
-        Counter counter = exceptionCounters.get(rc);
-        if (null != counter) {
-            return counter;
-        }
-        // TODO: it would be better to have BKException.Code.get(rc)
-        synchronized (exceptionCounters) {
-            counter = exceptionCounters.get(rc);
-            if (null != counter) {
-                return counter;
-            }
-            counter = parentLogger.getCounter(getMessage(rc));
-            exceptionCounters.put(rc, counter);
-        }
-        return counter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java
deleted file mode 100644
index b6ca733..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.stats;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.bookkeeper.stats.CachingStatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsData;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * Stats Loggers that broadcast stats to multiple {@link StatsLogger}.
- */
-public class BroadCastStatsLogger {
-
-    /**
-     * Create a broadcast stats logger of two stats loggers 
`<code>first</code>` and
-     * `<code>second</code>`. The returned stats logger doesn't allow 
registering any
-     * {@link Gauge}.
-     *
-     * @param first
-     *          first stats logger
-     * @param second
-     *          second stats logger
-     * @return broadcast stats logger
-     */
-    public static StatsLogger two(StatsLogger first, StatsLogger second) {
-        return new CachingStatsLogger(new Two(first, second));
-    }
-
-    static class Two implements StatsLogger {
-        protected final StatsLogger first;
-        protected final StatsLogger second;
-
-        private Two(StatsLogger first, StatsLogger second) {
-            super();
-            Preconditions.checkNotNull(first);
-            Preconditions.checkNotNull(second);
-            this.first = first;
-            this.second = second;
-        }
-
-        @Override
-        public OpStatsLogger getOpStatsLogger(final String statName) {
-            final OpStatsLogger firstLogger = first.getOpStatsLogger(statName);
-            final OpStatsLogger secondLogger = 
second.getOpStatsLogger(statName);
-            return new OpStatsLogger() {
-                @Override
-                public void registerFailedEvent(long l) {
-                    firstLogger.registerFailedEvent(l);
-                    secondLogger.registerFailedEvent(l);
-                }
-
-                @Override
-                public void registerSuccessfulEvent(long l) {
-                    firstLogger.registerSuccessfulEvent(l);
-                    secondLogger.registerSuccessfulEvent(l);
-                }
-
-                @Override
-                public OpStatsData toOpStatsData() {
-                    // Eventually consistent.
-                    return firstLogger.toOpStatsData();
-                }
-
-                @Override
-                public void clear() {
-                    firstLogger.clear();
-                    secondLogger.clear();
-                }
-            };
-        }
-
-        @Override
-        public Counter getCounter(final String statName) {
-            final Counter firstCounter = first.getCounter(statName);
-            final Counter secondCounter = second.getCounter(statName);
-            return new Counter() {
-                @Override
-                public void clear() {
-                    firstCounter.clear();
-                    secondCounter.clear();
-                }
-
-                @Override
-                public void inc() {
-                    firstCounter.inc();
-                    secondCounter.inc();
-                }
-
-                @Override
-                public void dec() {
-                    firstCounter.dec();
-                    secondCounter.dec();
-                }
-
-                @Override
-                public void add(long l) {
-                    firstCounter.add(l);
-                    secondCounter.add(l);
-                }
-
-                @Override
-                public Long get() {
-                    // Eventually consistent.
-                    return firstCounter.get();
-                }
-            };
-        }
-
-        @Override
-        public <T extends Number> void registerGauge(String statName, Gauge<T> 
gauge) {
-            // Different underlying stats loggers have different semantics 
wrt. gauge registration.
-            throw new RuntimeException("Cannot register a gauge on 
BroadCastStatsLogger.Two");
-        }
-
-        @Override
-        public <T extends Number> void unregisterGauge(String statName, 
Gauge<T> gauge) {
-            // no-op
-        }
-
-        @Override
-        public StatsLogger scope(final String scope) {
-            return new Two(first.scope(scope), second.scope(scope));
-        }
-
-        @Override
-        public void removeScope(String scope, StatsLogger statsLogger) {
-            if (!(statsLogger instanceof Two)) {
-                return;
-            }
-
-            Two another = (Two) statsLogger;
-
-            first.removeScope(scope, another.first);
-            second.removeScope(scope, another.second);
-        }
-    }
-
-    /**
-     * Create a broadcast stats logger of two stats loggers 
<code>master</code> and <code>slave</code>.
-     * It is similar as {@link #two(StatsLogger, StatsLogger)}, but it allows 
registering {@link Gauge}s.
-     * The {@link Gauge} will be registered under master.
-     *
-     * @param master
-     *          master stats logger to receive {@link Counter}, {@link 
OpStatsLogger} and {@link Gauge}.
-     * @param slave
-     *          slave stats logger to receive only {@link Counter} and {@link 
OpStatsLogger}.
-     * @return broadcast stats logger
-     */
-    public static StatsLogger masterslave(StatsLogger master, StatsLogger 
slave) {
-        return new CachingStatsLogger(new MasterSlave(master, slave));
-    }
-
-    static class MasterSlave extends Two {
-
-        private MasterSlave(StatsLogger master, StatsLogger slave) {
-            super(master, slave);
-        }
-
-        @Override
-        public <T extends Number> void registerGauge(String statName, Gauge<T> 
gauge) {
-            first.registerGauge(statName, gauge);
-        }
-
-        @Override
-        public <T extends Number> void unregisterGauge(String statName, 
Gauge<T> gauge) {
-            first.unregisterGauge(statName, gauge);
-        }
-
-        @Override
-        public StatsLogger scope(String scope) {
-            return new MasterSlave(first.scope(scope), second.scope(scope));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
deleted file mode 100644
index 43641f0..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.stats;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.util.FutureEventListener;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import java.util.concurrent.TimeUnit;
-
-public class OpStatsListener<T> implements FutureEventListener<T> {
-    OpStatsLogger opStatsLogger;
-    Stopwatch stopwatch;
-
-    public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) {
-        this.opStatsLogger = opStatsLogger;
-        if (null == stopwatch) {
-            this.stopwatch = Stopwatch.createStarted();
-        } else {
-            this.stopwatch = stopwatch;
-        }
-    }
-
-    public OpStatsListener(OpStatsLogger opStatsLogger) {
-        this(opStatsLogger, null);
-    }
-
-    @Override
-    public void onSuccess(T value) {
-        
opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-    }
-
-    @Override
-    public void onFailure(Throwable cause) {
-        
opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java
deleted file mode 100644
index ebfc32a..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.subscription;
-
-import java.io.Closeable;
-
-import scala.runtime.BoxedUnit;
-
-import org.apache.distributedlog.DLSN;
-import com.twitter.util.Future;
-
-public interface SubscriptionStateStore extends Closeable {
-    /**
-     * Get the last committed position stored for this subscription
-     *
-     * @return future represents the last commit position
-     */
-    public Future<DLSN> getLastCommitPosition();
-
-    /**
-     * Advances the position associated with the subscriber
-     *
-     * @param newPosition - new commit position
-     * @return future represents the advance result
-     */
-    public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
deleted file mode 100644
index 1974f1e..0000000
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.subscription;
-
-import org.apache.distributedlog.DLSN;
-import com.twitter.util.Future;
-import scala.runtime.BoxedUnit;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * Store to manage subscriptions
- */
-public interface SubscriptionsStore extends Closeable {
-
-    /**
-     * Get the last committed position stored for <i>subscriberId</i>.
-     *
-     * @param subscriberId
-     *          subscriber id
-     * @return future representing last committed position.
-     */
-    public Future<DLSN> getLastCommitPosition(String subscriberId);
-
-    /**
-     * Get the last committed positions for all subscribers.
-     *
-     * @return future representing last committed positions for all 
subscribers.
-     */
-    public Future<Map<String, DLSN>> getLastCommitPositions();
-
-    /**
-     * Advance the last committed position for <i>subscriberId</i>.
-     *
-     * @param subscriberId
-     *          subscriber id.
-     * @param newPosition
-     *          new committed position.
-     * @return future representing advancing result.
-     */
-    public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN 
newPosition);
-
-    /**
-     * Delete the subscriber <i>subscriberId</i> permanently. Once the 
subscriber is deleted, all the
-     * data stored under this subscriber will be lost.
-     * @param subscriberId subscriber id
-     * @return future represent success or failure.
-     * return true only if there's such subscriber and we removed it 
successfully.
-     * return false if there's no such subscriber, or we failed to remove.
-     */
-    public Future<Boolean> deleteSubscriber(String subscriberId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java
index efee3ca..ffe2303 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java
@@ -18,4 +18,4 @@
 /**
  * Generated thrift code.
  */
-package org.apache.distributedlog.thrift;
\ No newline at end of file
+package org.apache.distributedlog.thrift;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 2c27088..64229d1 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -55,13 +55,15 @@ import java.util.regex.Pattern;
 import com.google.common.base.Preconditions;
 import org.apache.distributedlog.BKDistributedLogNamespace;
 import org.apache.distributedlog.Entry;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
 import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -86,16 +88,16 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.AsyncLogReader;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.LogSegmentMetadata;
@@ -107,9 +109,7 @@ import org.apache.distributedlog.bk.LedgerAllocatorUtils;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.MetadataUpdater;
 import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -163,7 +163,7 @@ public class DistributedLogTool extends Tool {
         protected URI uri;
         protected String zkAclId = null;
         protected boolean force = false;
-        protected DistributedLogNamespace namespace = null;
+        protected Namespace namespace = null;
 
         protected PerDLCommand(String name, String description) {
             super(name, description);
@@ -252,9 +252,9 @@ public class DistributedLogTool extends Tool {
             this.force = force;
         }
 
-        protected DistributedLogNamespace getNamespace() throws IOException {
+        protected Namespace getNamespace() throws IOException {
             if (null == this.namespace) {
-                this.namespace = DistributedLogNamespaceBuilder.newBuilder()
+                this.namespace = NamespaceBuilder.newBuilder()
                         .uri(getUri())
                         .conf(getConf())
                         .build();
@@ -464,7 +464,7 @@ public class DistributedLogTool extends Tool {
             return 0;
         }
 
-        protected void printStreams(DistributedLogNamespace namespace) throws 
Exception {
+        protected void printStreams(Namespace namespace) throws Exception {
             Iterator<String> streams = namespace.getLogs();
             System.out.println("Streams under " + getUri() + " : ");
             System.out.println("--------------------------------");
@@ -536,7 +536,7 @@ public class DistributedLogTool extends Tool {
             System.out.println("");
         }
 
-        protected void watchAndReportChanges(DistributedLogNamespace 
namespace) throws Exception {
+        protected void watchAndReportChanges(Namespace namespace) throws 
Exception {
             namespace.registerNamespaceListener(this);
         }
     }
@@ -783,7 +783,7 @@ public class DistributedLogTool extends Tool {
             return truncateStreams(getNamespace());
         }
 
-        private int truncateStreams(final DistributedLogNamespace namespace) 
throws Exception {
+        private int truncateStreams(final Namespace namespace) throws 
Exception {
             Iterator<String> streamCollection = namespace.getLogs();
             final List<String> streams = new ArrayList<String>();
             while (streamCollection.hasNext()) {
@@ -827,7 +827,7 @@ public class DistributedLogTool extends Tool {
             return 0;
         }
 
-        private void truncateStreams(DistributedLogNamespace namespace, 
List<String> streams,
+        private void truncateStreams(Namespace namespace, List<String> streams,
                                      int tid, int numStreamsPerThreads) throws 
IOException {
             int startIdx = tid * numStreamsPerThreads;
             int endIdx = Math.min(streams.size(), (tid + 1) * 
numStreamsPerThreads);
@@ -957,7 +957,7 @@ public class DistributedLogTool extends Tool {
         }
 
         private void printHeader(DistributedLogManager dlm) throws Exception {
-            DLSN firstDlsn = Await.result(dlm.getFirstDLSNAsync());
+            DLSN firstDlsn = FutureUtils.result(dlm.getFirstDLSNAsync());
             boolean endOfStreamMarked = dlm.isEndOfStreamMarked();
             DLSN lastDlsn = dlm.getLastDLSN();
             long firstTxnId = dlm.getFirstTxId();
@@ -1121,7 +1121,7 @@ public class DistributedLogTool extends Tool {
         }
 
         long countToLastRecord(DistributedLogManager dlm) throws Exception {
-            return 
Await.result(dlm.getLogRecordCountAsync(startDLSN)).longValue();
+            return 
FutureUtils.result(dlm.getLogRecordCountAsync(startDLSN)).longValue();
         }
 
         @Override
@@ -1439,7 +1439,7 @@ public class DistributedLogTool extends Tool {
                 AsyncLogReader reader;
                 Object startOffset;
                 try {
-                    DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync());
+                    DLSN lastDLSN = FutureUtils.result(dlm.getLastDLSNAsync());
                     System.out.println("Last DLSN : " + lastDLSN);
                     if (null == fromDLSN) {
                         reader = dlm.getAsyncLogReader(fromTxnId);
@@ -1468,7 +1468,7 @@ public class DistributedLogTool extends Tool {
 
         private void dumpRecords(AsyncLogReader reader) throws Exception {
             int numRead = 0;
-            LogRecord record = Await.result(reader.readNext());
+            LogRecord record = FutureUtils.result(reader.readNext());
             while (record != null) {
                 // dump the record
                 dumpRecord(record);
@@ -1476,7 +1476,7 @@ public class DistributedLogTool extends Tool {
                 if (numRead >= count) {
                     break;
                 }
-                record = Await.result(reader.readNext());
+                record = FutureUtils.result(reader.readNext());
             }
             if (numRead == 0) {
                 System.out.println("No records.");
@@ -2641,18 +2641,18 @@ public class DistributedLogTool extends Tool {
             return truncateStream(getNamespace(), getStreamName(), dlsn);
         }
 
-        private int truncateStream(final DistributedLogNamespace namespace, 
String streamName, DLSN dlsn) throws Exception {
+        private int truncateStream(final Namespace namespace, String 
streamName, DLSN dlsn) throws Exception {
             DistributedLogManager dlm = namespace.openLog(streamName);
             try {
                 long totalRecords = dlm.getLogRecordCount();
-                long recordsAfterTruncate = 
Await.result(dlm.getLogRecordCountAsync(dlsn));
+                long recordsAfterTruncate = 
FutureUtils.result(dlm.getLogRecordCountAsync(dlsn));
                 long recordsToTruncate = totalRecords - recordsAfterTruncate;
                 if (!getForce() && !IOUtils.confirmPrompt("Do you want to 
truncate " + streamName + " at dlsn " + dlsn + " (" + recordsToTruncate + " 
records)?")) {
                     return 0;
                 } else {
                     AsyncLogWriter writer = 
dlm.startAsyncLogSegmentNonPartitioned();
                     try {
-                        if (!Await.result(writer.truncate(dlsn))) {
+                        if (!FutureUtils.result(writer.truncate(dlsn))) {
                             System.out.println("Failed to truncate.");
                         }
                         return 0;
@@ -2764,7 +2764,7 @@ public class DistributedLogTool extends Tool {
             return deleteSubscriber(getNamespace());
         }
 
-        private int deleteSubscriber(final DistributedLogNamespace namespace) 
throws Exception {
+        private int deleteSubscriber(final Namespace namespace) throws 
Exception {
             Iterator<String> streamCollection = namespace.getLogs();
             final List<String> streams = new ArrayList<String>();
             while (streamCollection.hasNext()) {
@@ -2809,7 +2809,7 @@ public class DistributedLogTool extends Tool {
             return 0;
         }
 
-        private void deleteSubscriber(DistributedLogNamespace namespace, 
List<String> streams,
+        private void deleteSubscriber(Namespace namespace, List<String> 
streams,
                                       int tid, int numStreamsPerThreads) 
throws Exception {
             int startIdx = tid * numStreamsPerThreads;
             int endIdx = Math.min(streams.size(), (tid + 1) * 
numStreamsPerThreads);
@@ -2818,7 +2818,7 @@ public class DistributedLogTool extends Tool {
                 DistributedLogManager dlm = namespace.openLog(s);
                 final CountDownLatch countDownLatch = new CountDownLatch(1);
                 dlm.getSubscriptionsStore().deleteSubscriber(subscriberId)
-                    .addEventListener(new FutureEventListener<Boolean>() {
+                    .whenComplete(new FutureEventListener<Boolean>() {
                         @Override
                         public void onFailure(Throwable cause) {
                             System.out.println("Failed to delete subscriber 
for stream " + s);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java
index c0da29a..a9b81e2 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java
@@ -17,12 +17,11 @@
  */
 package org.apache.distributedlog.util;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.io.AsyncCloseable;
 import org.apache.distributedlog.io.AsyncDeleteable;
 import org.apache.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-
-import java.io.IOException;
 
 /**
  * A common interface to allocate <i>I</i> under transaction <i>T</i>.
@@ -47,7 +46,7 @@ import java.io.IOException;
  * final Transaction<T> txn = ...;
  *
  * // Try obtain object I
- * Future<I> tryObtainFuture = allocator.tryObtain(txn, new OpListener<I>() {
+ * CompletableFuture<I> tryObtainFuture = allocator.tryObtain(txn, new 
OpListener<I>() {
  *     public void onCommit(I resource) {
  *         // the obtain succeed, process with the resource
  *     }
@@ -97,6 +96,6 @@ public interface Allocator<I, T> extends AsyncCloseable, 
AsyncDeleteable {
      *          transaction.
      * @return future result returning <i>I</i> that would be obtained under 
transaction <code>txn</code>.
      */
-    Future<I> tryObtain(Transaction<T> txn, OpListener<I> listener);
+    CompletableFuture<I> tryObtain(Transaction<T> txn, OpListener<I> listener);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java
index 41c2be7..aa7cefe 100644
--- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.util;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.commons.configuration.Configuration;
 

Reply via email to