http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 1a23228..a8b1f77 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -17,71 +17,39 @@
  */
 package com.twitter.distributedlog;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import 
com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption;
 import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.acl.DefaultAccessControlManager;
-import com.twitter.distributedlog.acl.ZKAccessControlManager;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
 import com.twitter.distributedlog.callback.NamespaceListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.feature.CoreFeatureKeys;
-import com.twitter.distributedlog.impl.ZKLogMetadataStore;
-import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
-import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
+import com.twitter.distributedlog.injector.AsyncFailureInjector;
+import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.LogMetadataStore;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.util.ConfUtils;
-import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.PermitLimiter;
 import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.distributedlog.util.SimplePermitLimiter;
 import com.twitter.distributedlog.util.Utils;
 import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.SettableFeatureProvider;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.data.Stat;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static com.twitter.distributedlog.impl.BKDLUtils.*;
+import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.WRITER;
+import static com.twitter.distributedlog.util.DLUtils.validateName;
 
 /**
  * BKDistributedLogNamespace is the default implementation of {@link 
DistributedLogNamespace}. It uses
@@ -119,344 +87,57 @@ import static com.twitter.distributedlog.impl.BKDLUtils.*;
 public class BKDistributedLogNamespace implements DistributedLogNamespace {
     static final Logger LOG = 
LoggerFactory.getLogger(BKDistributedLogNamespace.class);
 
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-        private DistributedLogConfiguration _conf = null;
-        private URI _uri = null;
-        private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
-        private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
-        private FeatureProvider _featureProvider = new 
SettableFeatureProvider("", 0);
-        private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
-        private int _regionId = DistributedLogConstants.LOCAL_REGION_ID;
-
-        private Builder() {}
-
-        public Builder conf(DistributedLogConfiguration conf) {
-            this._conf = conf;
-            return this;
-        }
-
-        public Builder uri(URI uri) {
-            this._uri = uri;
-            return this;
-        }
-
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this._statsLogger = statsLogger;
-            return this;
-        }
-
-        public Builder perLogStatsLogger(StatsLogger perLogStatsLogger) {
-            this._perLogStatsLogger = perLogStatsLogger;
-            return this;
-        }
-
-        public Builder featureProvider(FeatureProvider featureProvider) {
-            this._featureProvider = featureProvider;
-            return this;
-        }
-
-        public Builder clientId(String clientId) {
-            this._clientId = clientId;
-            return this;
-        }
-
-        public Builder regionId(int regionId) {
-            this._regionId = regionId;
-            return this;
-        }
-
-        @SuppressWarnings("deprecation")
-        public BKDistributedLogNamespace build()
-                throws IOException, NullPointerException, 
IllegalArgumentException {
-            Preconditions.checkNotNull(_conf, "No DistributedLog 
Configuration");
-            Preconditions.checkNotNull(_uri, "No DistributedLog URI");
-            Preconditions.checkNotNull(_featureProvider, "No Feature 
Provider");
-            Preconditions.checkNotNull(_statsLogger, "No Stats Logger");
-            Preconditions.checkNotNull(_featureProvider, "No Feature 
Provider");
-            Preconditions.checkNotNull(_clientId, "No Client ID");
-            // validate conf and uri
-            validateConfAndURI(_conf, _uri);
-
-            // Build the namespace zookeeper client
-            ZooKeeperClientBuilder nsZkcBuilder = createDLZKClientBuilder(
-                    String.format("dlzk:%s:factory_writer_shared", _uri),
-                    _conf,
-                    DLUtils.getZKServersFromDLUri(_uri),
-                    _statsLogger.scope("dlzk_factory_writer_shared"));
-            ZooKeeperClient nsZkc = nsZkcBuilder.build();
-
-            // Resolve namespace binding
-            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(nsZkc, _uri);
-
-            // Backward Compatible to enable per log stats by configuration 
settings
-            StatsLogger perLogStatsLogger = _perLogStatsLogger;
-            if (perLogStatsLogger == NullStatsLogger.INSTANCE &&
-                    _conf.getEnablePerStreamStat()) {
-                perLogStatsLogger = _statsLogger.scope("stream");
-            }
-
-            return new BKDistributedLogNamespace(
-                    _conf,
-                    _uri,
-                    _featureProvider,
-                    _statsLogger,
-                    perLogStatsLogger,
-                    _clientId,
-                    _regionId,
-                    nsZkcBuilder,
-                    nsZkc,
-                    bkdlConfig);
-        }
-    }
-
-    static interface ZooKeeperClientHandler<T> {
-        T handle(ZooKeeperClient zkc) throws IOException;
-    }
-
-    /**
-     * Run given <i>handler</i> by providing an available new zookeeper client
-     *
-     * @param handler
-     *          Handler to process with provided zookeeper client.
-     * @param conf
-     *          Distributedlog Configuration.
-     * @param namespace
-     *          Distributedlog Namespace.
-     */
-    private static <T> T withZooKeeperClient(ZooKeeperClientHandler<T> handler,
-                                             DistributedLogConfiguration conf,
-                                             URI namespace) throws IOException 
{
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .name(String.format("dlzk:%s:factory_static", namespace))
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .uri(namespace)
-                .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                .requestRateLimit(conf.getZKRequestRateLimit())
-                .zkAclId(conf.getZkAclId())
-                .build();
-        try {
-            return handler.handle(zkc);
-        } finally {
-            zkc.close();
-        }
-    }
-
-    private static String getHostIpLockClientId() {
-        try {
-            return InetAddress.getLocalHost().toString();
-        } catch(Exception ex) {
-            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
-        }
-    }
-
     private final String clientId;
     private final int regionId;
     private final DistributedLogConfiguration conf;
     private final URI namespace;
-    private final BKDLConfig bkdlConfig;
+    // namespace driver
+    private final NamespaceDriver driver;
+    // resources
     private final OrderedScheduler scheduler;
-    private final OrderedScheduler readAheadExecutor;
-    private final ClientSocketChannelFactory channelFactory;
-    private final HashedWheelTimer requestTimer;
-    // zookeeper clients
-    // NOTE: The actual zookeeper client is initialized lazily when it is 
referenced by
-    //       {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it 
is safe to
-    //       keep builders and their client wrappers here, as they will be 
used when
-    //       instantiating readers or writers.
-    private final ZooKeeperClientBuilder sharedWriterZKCBuilderForDL;
-    private final ZooKeeperClient sharedWriterZKCForDL;
-    private final ZooKeeperClientBuilder sharedReaderZKCBuilderForDL;
-    private final ZooKeeperClient sharedReaderZKCForDL;
-    private ZooKeeperClientBuilder sharedWriterZKCBuilderForBK = null;
-    private ZooKeeperClient sharedWriterZKCForBK = null;
-    private ZooKeeperClientBuilder sharedReaderZKCBuilderForBK = null;
-    private ZooKeeperClient sharedReaderZKCForBK = null;
-    // NOTE: The actual bookkeeper client is initialized lazily when it is 
referenced by
-    //       {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it 
is safe to
-    //       keep builders and their client wrappers here, as they will be 
used when
-    //       instantiating readers or writers.
-    private final BookKeeperClientBuilder sharedWriterBKCBuilder;
-    private final BookKeeperClient writerBKC;
-    private final BookKeeperClientBuilder sharedReaderBKCBuilder;
-    private final BookKeeperClient readerBKC;
-    // ledger allocator
-    private final LedgerAllocator allocator;
-    // access control manager
-    private AccessControlManager accessControlManager;
-    // log metadata store
-    private final LogMetadataStore metadataStore;
+    private final PermitLimiter writeLimiter;
+    private final AsyncFailureInjector failureInjector;
     // log segment metadata store
     private final LogSegmentMetadataCache logSegmentMetadataCache;
-    private final LogStreamMetadataStore writerStreamMetadataStore;
-    private final LogStreamMetadataStore readerStreamMetadataStore;
-
     // feature provider
     private final FeatureProvider featureProvider;
-
     // Stats Loggers
     private final StatsLogger statsLogger;
     private final StatsLogger perLogStatsLogger;
 
-    protected AtomicBoolean closed = new AtomicBoolean(false);
-
-    private final PermitLimiter writeLimiter;
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
 
-    private BKDistributedLogNamespace(
+    public BKDistributedLogNamespace(
             DistributedLogConfiguration conf,
             URI uri,
+            NamespaceDriver driver,
+            OrderedScheduler scheduler,
             FeatureProvider featureProvider,
+            PermitLimiter writeLimiter,
+            AsyncFailureInjector failureInjector,
             StatsLogger statsLogger,
             StatsLogger perLogStatsLogger,
             String clientId,
-            int regionId,
-            ZooKeeperClientBuilder nsZkcBuilder,
-            ZooKeeperClient nsZkc,
-            BKDLConfig bkdlConfig)
-            throws IOException, IllegalArgumentException {
+            int regionId) {
         this.conf = conf;
         this.namespace = uri;
+        this.driver = driver;
+        this.scheduler = scheduler;
         this.featureProvider = featureProvider;
+        this.writeLimiter = writeLimiter;
+        this.failureInjector = failureInjector;
         this.statsLogger = statsLogger;
         this.perLogStatsLogger = perLogStatsLogger;
+        this.clientId = clientId;
         this.regionId = regionId;
-        this.bkdlConfig = bkdlConfig;
-        if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
-            this.clientId = getHostIpLockClientId();
-        } else {
-            this.clientId = clientId;
-        }
-
-        // Build resources
-        StatsLogger schedulerStatsLogger = 
statsLogger.scope("factory").scope("thread_pool");
-        this.scheduler = OrderedScheduler.newBuilder()
-                .name("DLM-" + uri.getPath())
-                .corePoolSize(conf.getNumWorkerThreads())
-                .statsLogger(schedulerStatsLogger)
-                .perExecutorStatsLogger(schedulerStatsLogger)
-                .traceTaskExecution(conf.getEnableTaskExecutionStats())
-                
.traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
-                .build();
-        if (conf.getNumReadAheadWorkerThreads() > 0) {
-            this.readAheadExecutor = OrderedScheduler.newBuilder()
-                    .name("DLM-" + uri.getPath() + "-readahead-executor")
-                    .corePoolSize(conf.getNumReadAheadWorkerThreads())
-                    
.statsLogger(statsLogger.scope("factory").scope("readahead_thread_pool"))
-                    
.traceTaskExecution(conf.getTraceReadAheadDeliveryLatency())
-                    
.traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
-                    .build();
-            LOG.info("Created dedicated readahead executor : threads = {}", 
conf.getNumReadAheadWorkerThreads());
-        } else {
-            this.readAheadExecutor = this.scheduler;
-            LOG.info("Used shared executor for readahead.");
-        }
-
-        this.channelFactory = new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
-            Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
-            conf.getBKClientNumberIOThreads());
-        this.requestTimer = new HashedWheelTimer(
-            new 
ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
-            conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
-            conf.getTimeoutTimerNumTicks());
 
-        // Build zookeeper client for writers
-        this.sharedWriterZKCBuilderForDL = nsZkcBuilder;
-        this.sharedWriterZKCForDL = nsZkc;
-
-        // Build zookeeper client for readers
-        if 
(bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader()))
 {
-            this.sharedReaderZKCBuilderForDL = 
this.sharedWriterZKCBuilderForDL;
-        } else {
-            this.sharedReaderZKCBuilderForDL = createDLZKClientBuilder(
-                    String.format("dlzk:%s:factory_reader_shared", namespace),
-                    conf,
-                    bkdlConfig.getDlZkServersForReader(),
-                    statsLogger.scope("dlzk_factory_reader_shared"));
-        }
-        this.sharedReaderZKCForDL = this.sharedReaderZKCBuilderForDL.build();
-
-        // Build bookkeeper client for writers
-        this.sharedWriterBKCBuilder = createBKCBuilder(
-                String.format("bk:%s:factory_writer_shared", namespace),
-                conf,
-                bkdlConfig.getBkZkServersForWriter(),
-                bkdlConfig.getBkLedgersPath(),
-                Optional.of(featureProvider.scope("bkc")));
-        this.writerBKC = this.sharedWriterBKCBuilder.build();
-
-        // Build bookkeeper client for readers
-        if 
(bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader()))
 {
-            this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
-        } else {
-            this.sharedReaderBKCBuilder = createBKCBuilder(
-                String.format("bk:%s:factory_reader_shared", namespace),
-                conf,
-                bkdlConfig.getBkZkServersForReader(),
-                bkdlConfig.getBkLedgersPath(),
-                Optional.<FeatureProvider>absent());
-        }
-        this.readerBKC = this.sharedReaderBKCBuilder.build();
-
-        if (conf.getGlobalOutstandingWriteLimit() < 0) {
-            this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
-        } else {
-            Feature disableWriteLimitFeature = featureProvider.getFeature(
-                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
-            this.writeLimiter = new SimplePermitLimiter(
-                conf.getOutstandingWriteLimitDarkmode(),
-                conf.getGlobalOutstandingWriteLimit(),
-                statsLogger.scope("writeLimiter"),
-                true /* singleton */,
-                disableWriteLimitFeature);
-        }
-
-        // propagate bkdlConfig to configuration
-        BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-
-        // Build the allocator
-        if (conf.getEnableLedgerAllocatorPool()) {
-            String allocatorPoolPath = 
validateAndGetFullLedgerAllocatorPoolPath(conf, uri);
-            allocator = 
LedgerAllocatorUtils.createLedgerAllocatorPool(allocatorPoolPath, 
conf.getLedgerAllocatorPoolCoreSize(),
-                    conf, sharedWriterZKCForDL, writerBKC, scheduler);
-            if (null != allocator) {
-                allocator.start();
-            }
-            LOG.info("Created ledger allocator pool under {} with size {}.", 
allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize());
-        } else {
-            allocator = null;
-        }
-
-        // log metadata store
-        if (bkdlConfig.isFederatedNamespace() || 
conf.isFederatedNamespaceEnabled()) {
-            this.metadataStore = new FederatedZKLogMetadataStore(conf, 
namespace, sharedReaderZKCForDL, scheduler);
-        } else {
-            this.metadataStore = new ZKLogMetadataStore(conf, namespace, 
sharedReaderZKCForDL, scheduler);
-        }
-
-        // create log stream metadata store
-        this.writerStreamMetadataStore =
-                new ZKLogStreamMetadataStore(
-                        clientId,
-                        conf,
-                        sharedWriterZKCForDL,
-                        scheduler,
-                        statsLogger);
-        this.readerStreamMetadataStore =
-                new ZKLogStreamMetadataStore(
-                        clientId,
-                        conf,
-                        sharedReaderZKCForDL,
-                        scheduler,
-                        statsLogger);
         // create a log segment metadata cache
         this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, 
Ticker.systemTicker());
+    }
 
-        LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, 
regionId = {}, federated = {}.",
-                new Object[] { clientId, regionId, 
bkdlConfig.isFederatedNamespace() });
+    @Override
+    public NamespaceDriver getNamespaceDriver() {
+        return driver;
     }
 
     //
@@ -468,8 +149,8 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, IOException {
         checkState();
         validateName(logName);
-        URI uri = FutureUtils.result(metadataStore.createLog(logName));
-        FutureUtils.result(writerStreamMetadataStore.getLog(uri, logName, 
true, true));
+        URI uri = 
FutureUtils.result(driver.getLogMetadataStore().createLog(logName));
+        
FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, 
logName, true, true));
     }
 
     @Override
@@ -477,17 +158,15 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, LogNotFoundException, 
IOException {
         checkState();
         validateName(logName);
-        Optional<URI> uri = 
FutureUtils.result(metadataStore.getLogLocation(logName));
+        Optional<URI> uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
         if (!uri.isPresent()) {
             throw new LogNotFoundException("Log " + logName + " isn't found.");
         }
-        DistributedLogManager dlm = createDistributedLogManager(
+        DistributedLogManager dlm = openLogInternal(
                 uri.get(),
                 logName,
-                ClientSharingOption.SharedClients,
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.<DynamicDistributedLogConfiguration>absent(),
-                Optional.<StatsLogger>absent());
+                Optional.<DynamicDistributedLogConfiguration>absent());
         dlm.delete();
     }
 
@@ -508,27 +187,26 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             throws InvalidStreamNameException, IOException {
         checkState();
         validateName(logName);
-        Optional<URI> uri = 
FutureUtils.result(metadataStore.getLogLocation(logName));
+        Optional<URI> uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
         if (!uri.isPresent()) {
             throw new LogNotFoundException("Log " + logName + " isn't found.");
         }
-        return createDistributedLogManager(
+        return openLogInternal(
                 uri.get(),
                 logName,
-                ClientSharingOption.SharedClients,
                 logConf,
-                dynamicLogConf,
-                perStreamStatsLogger);
+                dynamicLogConf);
     }
 
     @Override
     public boolean logExists(String logName)
         throws IOException, IllegalArgumentException {
         checkState();
-        Optional<URI> uri = 
FutureUtils.result(metadataStore.getLogLocation(logName));
+        Optional<URI> uri = 
FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
         if (uri.isPresent()) {
             try {
-                
FutureUtils.result(writerStreamMetadataStore.logExists(uri.get(), logName));
+                FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+                        .logExists(uri.get(), logName));
                 return true;
             } catch (LogNotFoundException lnfe) {
                 return false;
@@ -541,240 +219,18 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
     @Override
     public Iterator<String> getLogs() throws IOException {
         checkState();
-        return FutureUtils.result(metadataStore.getLogs());
+        return FutureUtils.result(driver.getLogMetadataStore().getLogs());
     }
 
     @Override
     public void registerNamespaceListener(NamespaceListener listener) {
-        metadataStore.registerNamespaceListener(listener);
+        driver.getLogMetadataStore().registerNamespaceListener(listener);
     }
 
     @Override
     public synchronized AccessControlManager createAccessControlManager() 
throws IOException {
         checkState();
-        if (null == accessControlManager) {
-            String aclRootPath = bkdlConfig.getACLRootPath();
-            // Build the access control manager
-            if (aclRootPath == null) {
-                accessControlManager = DefaultAccessControlManager.INSTANCE;
-                LOG.info("Created default access control manager for {}", 
namespace);
-            } else {
-                if (!isReservedStreamName(aclRootPath)) {
-                    throw new IOException("Invalid Access Control List Root 
Path : " + aclRootPath);
-                }
-                String zkRootPath = namespace.getPath() + "/" + aclRootPath;
-                LOG.info("Creating zk based access control manager @ {} for 
{}",
-                        zkRootPath, namespace);
-                accessControlManager = new ZKAccessControlManager(conf, 
sharedReaderZKCForDL,
-                        zkRootPath, scheduler);
-                LOG.info("Created zk based access control manager @ {} for {}",
-                        zkRootPath, namespace);
-            }
-        }
-        return accessControlManager;
-    }
-
-    //
-    // Legacy methods
-    //
-
-    static String 
validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI 
uri) throws IOException {
-        String poolPath = conf.getLedgerAllocatorPoolPath();
-        LOG.info("PoolPath is {}", poolPath);
-        if (null == poolPath || !poolPath.startsWith(".") || 
poolPath.endsWith("/")) {
-            LOG.error("Invalid ledger allocator pool path specified when 
enabling ledger allocator pool : {}", poolPath);
-            throw new IOException("Invalid ledger allocator pool path 
specified : " + poolPath);
-        }
-        String poolName = conf.getLedgerAllocatorPoolName();
-        if (null == poolName) {
-            LOG.error("No ledger allocator pool name specified when enabling 
ledger allocator pool.");
-            throw new IOException("No ledger allocator name specified when 
enabling ledger allocator pool.");
-        }
-        String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName;
-        try {
-            PathUtils.validatePath(rootPath);
-        } catch (IllegalArgumentException iae) {
-            LOG.error("Invalid ledger allocator pool path specified when 
enabling ledger allocator pool : {}", poolPath);
-            throw new IOException("Invalid ledger allocator pool path 
specified : " + poolPath);
-        }
-        return rootPath;
-    }
-
-    public static ZooKeeperClientBuilder createDLZKClientBuilder(String 
zkcName,
-                                                                 
DistributedLogConfiguration conf,
-                                                                 String 
zkServers,
-                                                                 StatsLogger 
statsLogger) {
-        RetryPolicy retryPolicy = null;
-        if (conf.getZKNumRetries() > 0) {
-            retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                conf.getZKRetryBackoffStartMillis(),
-                conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
-        }
-        ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
-            .name(zkcName)
-            .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-            .retryThreadCount(conf.getZKClientNumberRetryThreads())
-            .requestRateLimit(conf.getZKRequestRateLimit())
-            .zkServers(zkServers)
-            .retryPolicy(retryPolicy)
-            .statsLogger(statsLogger)
-            .zkAclId(conf.getZkAclId());
-        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, 
numRetries = {}, sessionTimeout = {}, retryBackoff = {},"
-                + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { 
zkcName, zkServers, conf.getZKNumRetries(),
-                conf.getZKSessionTimeoutMilliseconds(), 
conf.getZKRetryBackoffStartMillis(),
-                conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() });
-        return builder;
-    }
-
-    private static ZooKeeperClientBuilder createBKZKClientBuilder(String 
zkcName,
-                                                                  
DistributedLogConfiguration conf,
-                                                                  String 
zkServers,
-                                                                  StatsLogger 
statsLogger) {
-        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                    conf.getBKClientZKRetryBackoffStartMillis(),
-                    conf.getBKClientZKRetryBackoffMaxMillis(),
-                    conf.getBKClientZKNumRetries());
-        ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
-                .name(zkcName)
-                
.sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds())
-                .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                .requestRateLimit(conf.getBKClientZKRequestRateLimit())
-                .zkServers(zkServers)
-                .retryPolicy(retryPolicy)
-                .statsLogger(statsLogger)
-                .zkAclId(conf.getZkAclId());
-        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, 
numRetries = {}, sessionTimeout = {}, retryBackoff = {},"
-                + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { 
zkcName, zkServers, conf.getBKClientZKNumRetries(),
-                conf.getBKClientZKSessionTimeoutMilliSeconds(), 
conf.getBKClientZKRetryBackoffStartMillis(),
-                conf.getBKClientZKRetryBackoffMaxMillis(), conf.getZkAclId() 
});
-        return builder;
-    }
-
-    private BookKeeperClientBuilder createBKCBuilder(String bkcName,
-                                                     
DistributedLogConfiguration conf,
-                                                     String zkServers,
-                                                     String ledgersPath,
-                                                     Optional<FeatureProvider> 
featureProviderOptional) {
-        BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder()
-                .name(bkcName)
-                .dlConfig(conf)
-                .zkServers(zkServers)
-                .ledgersPath(ledgersPath)
-                .channelFactory(channelFactory)
-                .requestTimer(requestTimer)
-                .featureProvider(featureProviderOptional)
-                .statsLogger(statsLogger);
-        LOG.info("Created shared client builder {} : zkServers = {}, 
ledgersPath = {}, numIOThreads = {}",
-                new Object[] { bkcName, zkServers, ledgersPath, 
conf.getBKClientNumberIOThreads() });
-        return builder;
-    }
-
-    @VisibleForTesting
-    public ZooKeeperClient getSharedWriterZKCForDL() {
-        return sharedWriterZKCForDL;
-    }
-
-    @VisibleForTesting
-    public BookKeeperClient getReaderBKC() {
-        return readerBKC;
-    }
-
-    @VisibleForTesting
-    public LogStreamMetadataStore getWriterStreamMetadataStore() {
-        return writerStreamMetadataStore;
-    }
-
-    @VisibleForTesting
-    public LedgerAllocator getLedgerAllocator() {
-        return allocator;
-    }
-
-    /**
-     * Run given <i>handler</i> by providing an available zookeeper client.
-     *
-     * @param handler
-     *          Handler to process with provided zookeeper client.
-     * @return result processed by handler.
-     * @throws IOException
-     */
-    private <T> T withZooKeeperClient(ZooKeeperClientHandler<T> handler) 
throws IOException {
-        checkState();
-        return handler.handle(sharedWriterZKCForDL);
-    }
-
-    /**
-     * Create a DistributedLogManager for <i>nameOfLogStream</i>, with default 
shared clients.
-     *
-     * @param nameOfLogStream
-     *          name of log stream
-     * @return distributedlog manager
-     * @throws 
com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name 
is invalid
-     * @throws IOException
-     */
-    public DistributedLogManager 
createDistributedLogManagerWithSharedClients(String nameOfLogStream)
-        throws InvalidStreamNameException, IOException {
-        return createDistributedLogManager(nameOfLogStream, 
ClientSharingOption.SharedClients);
-    }
-
-    /**
-     * Create a DistributedLogManager for <i>nameOfLogStream</i>, with 
specified client sharing options.
-     *
-     * @param nameOfLogStream
-     *          name of log stream.
-     * @param clientSharingOption
-     *          specifies if the ZK/BK clients are shared
-     * @return distributedlog manager instance.
-     * @throws 
com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name 
is invalid
-     * @throws IOException
-     */
-    public DistributedLogManager createDistributedLogManager(
-            String nameOfLogStream,
-            ClientSharingOption clientSharingOption)
-        throws InvalidStreamNameException, IOException {
-        Optional<DistributedLogConfiguration> logConfiguration = 
Optional.absent();
-        Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration = 
Optional.absent();
-        return createDistributedLogManager(
-                nameOfLogStream,
-                clientSharingOption,
-                logConfiguration,
-                dynamicLogConfiguration);
-    }
-
-    /**
-     * Create a DistributedLogManager for <i>nameOfLogStream</i>, with 
specified client sharing options.
-     * Override whitelisted stream-level configuration settings with settings 
found in
-     * <i>logConfiguration</i>.
-     *
-     *
-     * @param nameOfLogStream
-     *          name of log stream.
-     * @param clientSharingOption
-     *          specifies if the ZK/BK clients are shared
-     * @param logConfiguration
-     *          stream configuration overrides.
-     * @param dynamicLogConfiguration
-     *          dynamic stream configuration overrides.
-     * @return distributedlog manager instance.
-     * @throws 
com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name 
is invalid
-     * @throws IOException
-     */
-    public DistributedLogManager createDistributedLogManager(
-            String nameOfLogStream,
-            ClientSharingOption clientSharingOption,
-            Optional<DistributedLogConfiguration> logConfiguration,
-            Optional<DynamicDistributedLogConfiguration> 
dynamicLogConfiguration)
-        throws InvalidStreamNameException, IOException {
-        if (bkdlConfig.isFederatedNamespace()) {
-            throw new UnsupportedOperationException("Use 
DistributedLogNamespace methods for federated namespace");
-        }
-        return createDistributedLogManager(
-                namespace,
-                nameOfLogStream,
-                clientSharingOption,
-                logConfiguration,
-                dynamicLogConfiguration,
-                Optional.<StatsLogger>absent()
-        );
+        return driver.getAccessControlManager();
     }
 
     /**
@@ -784,8 +240,6 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
      *          location to store the log
      * @param nameOfLogStream
      *          name of the log
-     * @param clientSharingOption
-     *          client sharing option
      * @param logConfiguration
      *          optional stream configuration
      * @param dynamicLogConfiguration
@@ -794,13 +248,11 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
      * @throws InvalidStreamNameException if the stream name is invalid
      * @throws IOException
      */
-    protected DistributedLogManager createDistributedLogManager(
+    protected DistributedLogManager openLogInternal(
             URI uri,
             String nameOfLogStream,
-            ClientSharingOption clientSharingOption,
             Optional<DistributedLogConfiguration> logConfiguration,
-            Optional<DynamicDistributedLogConfiguration> 
dynamicLogConfiguration,
-            Optional<StatsLogger> perStreamStatsLogger)
+            Optional<DynamicDistributedLogConfiguration> 
dynamicLogConfiguration)
         throws InvalidStreamNameException, IOException {
         // Make sure the name is well formed
         checkState();
@@ -817,172 +269,34 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
             dynConf = ConfUtils.getConstDynConf(mergedConfiguration);
         }
 
-        ZooKeeperClientBuilder writerZKCBuilderForDL = null;
-        ZooKeeperClientBuilder readerZKCBuilderForDL = null;
-        ZooKeeperClient writerZKCForBK = null;
-        ZooKeeperClient readerZKCForBK = null;
-        BookKeeperClientBuilder writerBKCBuilder = null;
-        BookKeeperClientBuilder readerBKCBuilder = null;
-
-        switch(clientSharingOption) {
-            case SharedClients:
-                writerZKCBuilderForDL = sharedWriterZKCBuilderForDL;
-                readerZKCBuilderForDL = sharedReaderZKCBuilderForDL;
-                writerBKCBuilder = sharedWriterBKCBuilder;
-                readerBKCBuilder = sharedReaderBKCBuilder;
-                break;
-            case SharedZKClientPerStreamBKClient:
-                writerZKCBuilderForDL = sharedWriterZKCBuilderForDL;
-                readerZKCBuilderForDL = sharedReaderZKCBuilderForDL;
-                synchronized (this) {
-                    if (null == this.sharedWriterZKCForBK) {
-                        this.sharedWriterZKCBuilderForBK = 
createBKZKClientBuilder(
-                            String.format("bkzk:%s:factory_writer_shared", 
uri),
-                            mergedConfiguration,
-                            bkdlConfig.getBkZkServersForWriter(),
-                            statsLogger.scope("bkzk_factory_writer_shared"));
-                        this.sharedWriterZKCForBK = 
this.sharedWriterZKCBuilderForBK.build();
-                    }
-                    if (null == this.sharedReaderZKCForBK) {
-                        if 
(bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader()))
 {
-                            this.sharedReaderZKCBuilderForBK = 
this.sharedWriterZKCBuilderForBK;
-                        } else {
-                            this.sharedReaderZKCBuilderForBK = 
createBKZKClientBuilder(
-                                String.format("bkzk:%s:factory_reader_shared", 
uri),
-                                mergedConfiguration,
-                                bkdlConfig.getBkZkServersForReader(),
-                                
statsLogger.scope("bkzk_factory_reader_shared"));
-                        }
-                        this.sharedReaderZKCForBK = 
this.sharedReaderZKCBuilderForBK.build();
-                    }
-                    writerZKCForBK = this.sharedWriterZKCForBK;
-                    readerZKCForBK = this.sharedReaderZKCForBK;
-                }
-                break;
-        }
-
-        LedgerAllocator dlmLedgerAlloctor = null;
-        if (ClientSharingOption.SharedClients == clientSharingOption) {
-            dlmLedgerAlloctor = this.allocator;
-        }
-        // if there's a specified perStreamStatsLogger, user it, otherwise use 
the default one.
-        StatsLogger perLogStatsLogger = 
perStreamStatsLogger.or(this.perLogStatsLogger);
-
         return new BKDistributedLogManager(
                 nameOfLogStream,                    /* Log Name */
                 mergedConfiguration,                /* Configuration */
                 dynConf,                            /* Dynamic Configuration */
-                uri,                                /* Namespace */
-                writerZKCBuilderForDL,              /* ZKC Builder for DL 
Writer */
-                readerZKCBuilderForDL,              /* ZKC Builder for DL 
Reader */
-                writerZKCForBK,                     /* ZKC for BookKeeper for 
DL Writers */
-                readerZKCForBK,                     /* ZKC for BookKeeper for 
DL Readers */
-                writerBKCBuilder,                   /* BookKeeper Builder for 
DL Writers */
-                readerBKCBuilder,                   /* BookKeeper Builder for 
DL Readers */
-                writerStreamMetadataStore,         /* Log Segment Metadata 
Store for DL Writers */
-                readerStreamMetadataStore,         /* Log Segment Metadata 
Store for DL Readers */
+                uri,                                /* Namespace URI */
+                driver,                             /* Namespace Driver */
                 logSegmentMetadataCache,            /* Log Segment Metadata 
Cache */
                 scheduler,                          /* DL scheduler */
-                readAheadExecutor,                  /* Read Aheader Executor */
-                channelFactory,                     /* Netty Channel Factory */
-                requestTimer,                       /* Request Timer */
                 clientId,                           /* Client Id */
                 regionId,                           /* Region Id */
-                dlmLedgerAlloctor,                  /* Ledger Allocator */
                 writeLimiter,                       /* Write Limiter */
                 featureProvider.scope("dl"),        /* Feature Provider */
+                failureInjector,                    /* Failure Injector */
                 statsLogger,                        /* Stats Logger */
-                perLogStatsLogger                   /* Per Log Stats Logger */
+                perLogStatsLogger,                  /* Per Log Stats Logger */
+                Optional.<AsyncCloseable>absent()   /* shared resources, we 
don't need to close any resources in dlm */
         );
     }
 
-    public MetadataAccessor createMetadataAccessor(String nameOfMetadataNode)
-            throws InvalidStreamNameException, IOException {
-        if (bkdlConfig.isFederatedNamespace()) {
-            throw new UnsupportedOperationException("Use 
DistributedLogNamespace methods for federated namespace");
-        }
-        checkState();
-        validateName(nameOfMetadataNode);
-        return new ZKMetadataAccessor(nameOfMetadataNode, conf, namespace,
-                sharedWriterZKCBuilderForDL, sharedReaderZKCBuilderForDL, 
statsLogger);
-    }
-
-    public Collection<String> enumerateAllLogsInNamespace()
-        throws IOException, IllegalArgumentException {
-        if (bkdlConfig.isFederatedNamespace()) {
-            throw new UnsupportedOperationException("Use 
DistributedLogNamespace methods for federated namespace");
-        }
-        return Sets.newHashSet(getLogs());
-    }
-
-    public Map<String, byte[]> enumerateLogsWithMetadataInNamespace()
-        throws IOException, IllegalArgumentException {
-        if (bkdlConfig.isFederatedNamespace()) {
-            throw new UnsupportedOperationException("Use 
DistributedLogNamespace methods for federated namespace");
-        }
-        return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, 
byte[]>>() {
-            @Override
-            public Map<String, byte[]> handle(ZooKeeperClient zkc) throws 
IOException {
-                return enumerateLogsWithMetadataInternal(zkc, conf, namespace);
-            }
-        });
-    }
-
-    private static void validateInput(DistributedLogConfiguration conf, URI 
uri, String nameOfStream)
-        throws IllegalArgumentException, InvalidStreamNameException {
-        validateConfAndURI(conf, uri);
-        validateName(nameOfStream);
-    }
-
-    public static Map<String, byte[]> 
enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration conf, 
final URI uri)
-        throws IOException, IllegalArgumentException {
-        return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, 
byte[]>>() {
-            @Override
-            public Map<String, byte[]> handle(ZooKeeperClient zkc) throws 
IOException {
-                return enumerateLogsWithMetadataInternal(zkc, conf, uri);
-            }
-        }, conf, uri);
-    }
-
-    private static Map<String, byte[]> 
enumerateLogsWithMetadataInternal(ZooKeeperClient zkc,
-                                                                         
DistributedLogConfiguration conf, URI uri)
-        throws IOException, IllegalArgumentException {
-        validateConfAndURI(conf, uri);
-        String namespaceRootPath = uri.getPath();
-        HashMap<String, byte[]> result = new HashMap<String, byte[]>();
-        try {
-            ZooKeeper zk = Utils.sync(zkc, namespaceRootPath);
-            Stat currentStat = zk.exists(namespaceRootPath, false);
-            if (currentStat == null) {
-                return result;
-            }
-            List<String> children = zk.getChildren(namespaceRootPath, false);
-            for(String child: children) {
-                if (isReservedStreamName(child)) {
-                    continue;
-                }
-                String zkPath = String.format("%s/%s", namespaceRootPath, 
child);
-                currentStat = zk.exists(zkPath, false);
-                if (currentStat == null) {
-                    result.put(child, new byte[0]);
-                } else {
-                    result.put(child, zk.getData(zkPath, false, currentStat));
-                }
-            }
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while deleting " + namespaceRootPath, ie);
-            throw new IOException("Interrupted while reading " + 
namespaceRootPath, ie);
-        } catch (KeeperException ke) {
-            LOG.error("Error reading" + namespaceRootPath + "entry in 
zookeeper", ke);
-            throw new IOException("Error reading" + namespaceRootPath + "entry 
in zookeeper", ke);
-        }
-        return result;
-    }
-
+    /**
+     * Check the namespace state.
+     *
+     * @throws IOException
+     */
     private void checkState() throws IOException {
         if (closed.get()) {
-            LOG.error("BKDistributedLogNamespace {} is already closed", 
namespace);
-            throw new AlreadyClosedException("Namespace " + namespace + " is 
already closed");
+            LOG.error("BK namespace {} is already closed", namespace);
+            throw new AlreadyClosedException("BK namespace " + namespace + " 
is already closed");
         }
     }
 
@@ -991,58 +305,16 @@ public class BKDistributedLogNamespace implements 
DistributedLogNamespace {
      */
     @Override
     public void close() {
-        ZooKeeperClient writerZKC;
-        ZooKeeperClient readerZKC;
-        AccessControlManager acm;
-        if (closed.compareAndSet(false, true)) {
-            writerZKC = sharedWriterZKCForBK;
-            readerZKC = sharedReaderZKCForBK;
-            acm = accessControlManager;
-        } else {
+        if (!closed.compareAndSet(false, true)) {
             return;
         }
-        if (null != acm) {
-            acm.close();
-            LOG.info("Access Control Manager Stopped.");
-        }
-
-        // Close the allocator
-        if (null != allocator) {
-            Utils.closeQuietly(allocator);
-            LOG.info("Ledger Allocator stopped.");
-        }
-
+        // shutdown the driver
+        Utils.close(driver);
+        // close the write limiter
         this.writeLimiter.close();
-
-        // Shutdown log segment metadata stores
-        Utils.close(writerStreamMetadataStore);
-        Utils.close(readerStreamMetadataStore);
-
         // Shutdown the schedulers
         SchedulerUtils.shutdownScheduler(scheduler, 
conf.getSchedulerShutdownTimeoutMs(),
-            TimeUnit.MILLISECONDS);
-        LOG.info("Executor Service Stopped.");
-        if (scheduler != readAheadExecutor) {
-            SchedulerUtils.shutdownScheduler(readAheadExecutor, 
conf.getSchedulerShutdownTimeoutMs(),
                 TimeUnit.MILLISECONDS);
-            LOG.info("ReadAhead Executor Service Stopped.");
-        }
-
-        writerBKC.close();
-        readerBKC.close();
-        sharedWriterZKCForDL.close();
-        sharedReaderZKCForDL.close();
-
-        // Close shared zookeeper clients for bk
-        if (null != writerZKC) {
-            writerZKC.close();
-        }
-        if (null != readerZKC) {
-            readerZKC.close();
-        }
-        channelFactory.releaseExternalResources();
-        LOG.info("Release external resources used by channel factory.");
-        requestTimer.stop();
-        LOG.info("Stopped request timer");
+        LOG.info("Executor Service Stopped.");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 25b25e2..2486297 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -20,7 +20,6 @@ package com.twitter.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
-import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLIllegalStateException;
 import com.twitter.distributedlog.exceptions.EndOfStreamException;
@@ -29,8 +28,8 @@ import 
com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
 import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.GetLastTxIdFunction;
-import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
 import com.twitter.distributedlog.metadata.LogMetadataForWriter;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
@@ -41,6 +40,7 @@ import 
com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
 import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
 import com.twitter.distributedlog.metadata.MetadataUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import com.twitter.distributedlog.util.Allocator;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FailpointUtils;
 import com.twitter.distributedlog.util.FutureUtils;
@@ -53,7 +53,6 @@ import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -88,9 +87,22 @@ import static 
com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_F
 class BKLogWriteHandler extends BKLogHandler {
     static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
 
+    private static Transaction.OpListener<LogSegmentEntryWriter> 
NULL_OP_LISTENER =
+            new Transaction.OpListener<LogSegmentEntryWriter>() {
+        @Override
+        public void onCommit(LogSegmentEntryWriter r) {
+            // no-op
+        }
+
+        @Override
+        public void onAbort(Throwable t) {
+            // no-op
+        }
+    };
+
     protected final LogMetadataForWriter logMetadataForWriter;
+    protected final Allocator<LogSegmentEntryWriter, Object> 
logSegmentAllocator;
     protected final DistributedLock lock;
-    protected final LedgerAllocator ledgerAllocator;
     protected final MaxTxId maxTxId;
     protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
     protected final boolean validateLogSegmentSequenceNumber;
@@ -154,7 +166,7 @@ class BKLogWriteHandler extends BKLogHandler {
                       LogSegmentMetadataCache metadataCache,
                       LogSegmentEntryStore entryStore,
                       OrderedScheduler scheduler,
-                      LedgerAllocator allocator,
+                      Allocator<LogSegmentEntryWriter, Object> 
segmentAllocator,
                       StatsLogger statsLogger,
                       StatsLogger perLogStatsLogger,
                       AlertStatsLogger alertStatsLogger,
@@ -174,11 +186,11 @@ class BKLogWriteHandler extends BKLogHandler {
                 alertStatsLogger,
                 clientId);
         this.logMetadataForWriter = logMetadata;
+        this.logSegmentAllocator = segmentAllocator;
         this.perLogStatsLogger = perLogStatsLogger;
         this.writeLimiter = writeLimiter;
         this.featureProvider = featureProvider;
         this.dynConf = dynConf;
-        this.ledgerAllocator = allocator;
         this.lock = lock;
         this.metadataUpdater = 
LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
 
@@ -523,7 +535,7 @@ class BKLogWriteHandler extends BKLogHandler {
         }
 
         try {
-            ledgerAllocator.allocate();
+            logSegmentAllocator.allocate();
         } catch (IOException e) {
             // failed to issue an allocation request
             failStartLogSegment(promise, bestEffort, e);
@@ -541,25 +553,16 @@ class BKLogWriteHandler extends BKLogHandler {
             return;
         }
 
-        ledgerAllocator.tryObtain(txn, new 
Transaction.OpListener<LedgerHandle>() {
-            @Override
-            public void onCommit(LedgerHandle lh) {
-                // no-op
-            }
-
-            @Override
-            public void onAbort(Throwable t) {
-                // no-op
-            }
-        }).addEventListener(new FutureEventListener<LedgerHandle>() {
+        logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER)
+                .addEventListener(new 
FutureEventListener<LogSegmentEntryWriter>() {
 
             @Override
-            public void onSuccess(LedgerHandle lh) {
+            public void onSuccess(LogSegmentEntryWriter entryWriter) {
                 // try-obtain succeed
                 createInprogressLogSegment(
                         txn,
                         txId,
-                        lh,
+                        entryWriter,
                         bestEffort,
                         promise);
             }
@@ -586,7 +589,7 @@ class BKLogWriteHandler extends BKLogHandler {
     // just leak from the allocation pool - hence cause "No Ledger Allocator"
     private void createInprogressLogSegment(Transaction<Object> txn,
                                             final long txId,
-                                            final LedgerHandle lh,
+                                            final LogSegmentEntryWriter 
entryWriter,
                                             boolean bestEffort,
                                             final Promise<BKLogSegmentWriter> 
promise) {
         final long logSegmentSeqNo;
@@ -601,13 +604,15 @@ class BKLogWriteHandler extends BKLogHandler {
             return;
         }
 
-        final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, 
logSegmentSeqNo);
+        final String inprogressZnodePath = inprogressZNode(
+                entryWriter.getLogSegmentId(), txId, logSegmentSeqNo);
         final LogSegmentMetadata l =
             new 
LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
-                conf.getDLLedgerMetadataLayoutVersion(), lh.getId(), txId)
+                conf.getDLLedgerMetadataLayoutVersion(), 
entryWriter.getLogSegmentId(), txId)
                     .setLogSegmentSequenceNo(logSegmentSeqNo)
                     .setRegionId(regionId)
-                    
.setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion()))
+                    .setEnvelopeEntries(
+                            
LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion()))
                     .build();
 
         // Create an inprogress segment
@@ -631,7 +636,7 @@ class BKLogWriteHandler extends BKLogHandler {
                             l.getSegmentName(),
                             conf,
                             conf.getDLLedgerMetadataLayoutVersion(),
-                            new BKLogSegmentEntryWriter(lh),
+                            entryWriter,
                             lock,
                             txId,
                             logSegmentSeqNo,
@@ -1268,8 +1273,7 @@ class BKLogWriteHandler extends BKLogHandler {
     public Future<Void> asyncClose() {
         return Utils.closeSequence(scheduler,
                 lock,
-                ledgerAllocator
-        );
+                logSegmentAllocator);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
index c39ae4c..8d3c418 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
@@ -250,15 +250,22 @@ public class BookKeeperClient {
         return promise;
     }
 
-    public synchronized void close() {
-        if (closed) {
-            return;
+    public void close() {
+        BookKeeper bkcToClose;
+        ZooKeeperClient zkcToClose;
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            bkcToClose = bkc;
+            zkcToClose = zkc;
         }
 
         LOG.info("BookKeeper Client closed {}", name);
-        if (null != bkc) {
+        if (null != bkcToClose) {
             try {
-                bkc.close();
+                bkcToClose.close();
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted on closing bookkeeper client {} : ", 
name, e);
                 Thread.currentThread().interrupt();
@@ -266,12 +273,11 @@ public class BookKeeperClient {
                 LOG.warn("Error on closing bookkeeper client {} : ", name, e);
             }
         }
-        if (null != zkc) {
+        if (null != zkcToClose) {
             if (ownZK) {
-                zkc.close();
+                zkcToClose.close();
             }
         }
-        closed = true;
     }
 
     public synchronized void checkClosedOrInError() throws 
AlreadyClosedException {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index 6c6017e..6da4b8d 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -1301,6 +1301,7 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
      * @return number of dedicated readahead worker threads.
      * @see #getNumWorkerThreads()
      */
+    @Deprecated
     public int getNumReadAheadWorkerThreads() {
         return getInt(BKDL_NUM_READAHEAD_WORKER_THREADS, 0);
     }
@@ -1313,6 +1314,7 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
      * @return configuration
      * @see #getNumReadAheadWorkerThreads()
      */
+    @Deprecated
     public DistributedLogConfiguration setNumReadAheadWorkerThreads(int 
numWorkerThreads) {
         setProperty(BKDL_NUM_READAHEAD_WORKER_THREADS, numWorkerThreads);
         return this;
@@ -3515,8 +3517,11 @@ public class DistributedLogConfiguration extends 
CompositeConfiguration {
         Preconditions.checkArgument(getBKClientReadTimeout() * 1000 >= 
getReadLACLongPollTimeout(),
             "Invalid timeout configuration: bkcReadTimeoutSeconds 
("+getBKClientReadTimeout()+
                 ") should be longer than readLACLongPollTimeout 
("+getReadLACLongPollTimeout()+")");
-        Preconditions.checkArgument(getReaderIdleWarnThresholdMillis() > 2 * 
getReadLACLongPollTimeout(),
-            "Invalid configuration: ReaderIdleWarnThreshold should be 2x 
larget than readLACLongPollTimeout");
+        long readerIdleWarnThresholdMs = getReaderIdleWarnThresholdMillis();
+        if (readerIdleWarnThresholdMs > 0) { // NOTE: some test cases set the 
idle warn threshold to 0
+            Preconditions.checkArgument(readerIdleWarnThresholdMs > 2 * 
getReadLACLongPollTimeout(),
+                    "Invalid configuration: ReaderIdleWarnThreshold should be 
2x larget than readLACLongPollTimeout");
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
index ccb0778..34cfb65 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
@@ -18,9 +18,13 @@
 package com.twitter.distributedlog;
 
 import com.twitter.distributedlog.callback.LogSegmentListener;
+import com.twitter.distributedlog.io.AsyncCloseable;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.subscription.SubscriptionStateStore;
 import com.twitter.distributedlog.subscription.SubscriptionsStore;
 import com.twitter.util.Future;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
@@ -31,7 +35,20 @@ import java.util.List;
  * each conceptual place of storage corresponds to exactly one instance of
  * this class, which is created when the EditLog is first opened.
  */
-public interface DistributedLogManager extends MetadataAccessor {
+public interface DistributedLogManager extends AsyncCloseable, Closeable {
+
+    /**
+     * Get the name of the stream managed by this log manager
+     * @return streamName
+     */
+    public String getStreamName();
+
+    /**
+     * Get the namespace driver used by this manager.
+     *
+     * @return the namespace driver
+     */
+    public NamespaceDriver getNamespaceDriver();
 
     /**
      * Get log segments.
@@ -282,15 +299,6 @@ public interface DistributedLogManager extends 
MetadataAccessor {
     public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
 
     /**
-     * Get the subscription state storage provided by the distributed log 
manager
-     *
-     * @param subscriberId - Application specific Id associated with the 
subscriber
-     * @return Subscription state store
-     */
-    @Deprecated
-    public SubscriptionStateStore getSubscriptionStateStore(String 
subscriberId);
-
-    /**
      * Get the subscriptions store provided by the distributedlog manager.
      *
      * @return subscriptions store manages subscriptions for current stream.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java
deleted file mode 100644
index 4caeeba..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * This is the legacy way to access bookkeeper based distributedlog namespace.
- * Use {@link DistributedLogNamespace} to manage logs instead if you could.
- */
-@Deprecated
-public class DistributedLogManagerFactory {
-    static final Logger LOG = 
LoggerFactory.getLogger(DistributedLogManagerFactory.class);
-
-    public static enum ClientSharingOption {
-        PerStreamClients,
-        SharedZKClientPerStreamBKClient,
-        SharedClients
-    }
-
-    private final BKDistributedLogNamespace namespace;
-
-    public DistributedLogManagerFactory(DistributedLogConfiguration conf, URI 
uri)
-            throws IOException, IllegalArgumentException {
-        this(conf, uri, NullStatsLogger.INSTANCE);
-    }
-
-    public DistributedLogManagerFactory(DistributedLogConfiguration conf, URI 
uri,
-                                        StatsLogger statsLogger)
-            throws IOException, IllegalArgumentException {
-        this(conf,
-             uri,
-             statsLogger,
-             DistributedLogConstants.UNKNOWN_CLIENT_ID,
-             DistributedLogConstants.LOCAL_REGION_ID);
-    }
-
-    public DistributedLogManagerFactory(DistributedLogConfiguration conf,
-                                        URI uri,
-                                        StatsLogger statsLogger,
-                                        String clientId,
-                                        int regionId)
-            throws IOException, IllegalArgumentException {
-        this.namespace = BKDistributedLogNamespace.newBuilder()
-                .conf(conf)
-                .uri(uri)
-                .statsLogger(statsLogger)
-                .clientId(clientId)
-                .regionId(regionId)
-                .build();
-    }
-
-    public DistributedLogNamespace getNamespace() {
-        return namespace;
-    }
-
-    public void registerNamespaceListener(NamespaceListener listener) {
-        namespace.registerNamespaceListener(listener);
-    }
-
-    /**
-     * Create a DistributedLogManager for <i>nameOfLogStream</i>, with default 
shared clients.
-     *
-     * @param nameOfLogStream
-     *          name of log stream
-     * @return distributedlog manager
-     * @throws 
com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name 
is invalid
-     * @throws IOException
-     */
-    public DistributedLogManager 
createDistributedLogManagerWithSharedClients(String nameOfLogStream)
-        throws InvalidStreamNameException, IOException {
-        return createDistributedLogManager(nameOfLogStream, 
ClientSharingOption.SharedClients);
-    }
-
-    /**
-     * Create a DistributedLogManager for <i>nameOfLogStream</i>, with 
specified client sharing options.
-     *
-     * @param nameOfLogStream
-     *          name of log stream.
-     * @param clientSharingOption
-     *          specifies if the ZK/BK clients are shared
-     * @return distributedlog manager instance.
-     * @throws 
com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name 
is invalid
-     * @throws IOException
-     */
-    public DistributedLogManager createDistributedLogManager(
-            String nameOfLogStream,
-            ClientSharingOption clientSharingOption)
-        throws InvalidStreamNameException, IOException {
-        Optional<DistributedLogConfiguration> streamConfiguration = 
Optional.absent();
-        Optional<DynamicDistributedLogConfiguration> 
dynamicStreamConfiguration = Optional.absent();
-        return createDistributedLogManager(nameOfLogStream,
-            clientSharingOption,
-            streamConfiguration,
-            dynamicStreamConfiguration);
-    }
-
-    /**
-     * Create a DistributedLogManager for <i>nameOfLogStream</i>, with 
specified client sharing options.
-     * This method allows the caller to override global configuration options 
by supplying stream
-     * configuration overrides. Stream config overrides come in two flavors, 
static and dynamic. Static
-     * config never changes, and DynamicDistributedLogConfiguration is a) 
reloaded periodically and
-     * b) safe to access from any context.
-     *
-     * @param nameOfLogStream
-     *          name of log stream.
-     * @param clientSharingOption
-     *          specifies if the ZK/BK clients are shared
-     * @param streamConfiguration
-     *          stream configuration overrides.
-     * @param dynamicStreamConfiguration
-     *          dynamic stream configuration overrides.
-     * @return distributedlog manager instance.
-     * @throws 
com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name 
is invalid
-     * @throws IOException
-     */
-    public DistributedLogManager createDistributedLogManager(
-            String nameOfLogStream,
-            ClientSharingOption clientSharingOption,
-            Optional<DistributedLogConfiguration> streamConfiguration,
-            Optional<DynamicDistributedLogConfiguration> 
dynamicStreamConfiguration)
-        throws InvalidStreamNameException, IOException {
-        return namespace.createDistributedLogManager(
-            nameOfLogStream,
-            clientSharingOption,
-            streamConfiguration,
-            dynamicStreamConfiguration);
-    }
-
-    public MetadataAccessor createMetadataAccessor(String nameOfMetadataNode)
-            throws InvalidStreamNameException, IOException {
-        return namespace.createMetadataAccessor(nameOfMetadataNode);
-    }
-
-    public synchronized AccessControlManager createAccessControlManager() 
throws IOException {
-        return namespace.createAccessControlManager();
-    }
-
-    public boolean checkIfLogExists(String nameOfLogStream)
-        throws IOException, IllegalArgumentException {
-        return namespace.logExists(nameOfLogStream);
-    }
-
-    public Collection<String> enumerateAllLogsInNamespace()
-        throws IOException, IllegalArgumentException {
-        return namespace.enumerateAllLogsInNamespace();
-    }
-
-    public Map<String, byte[]> enumerateLogsWithMetadataInNamespace()
-        throws IOException, IllegalArgumentException {
-        return namespace.enumerateLogsWithMetadataInNamespace();
-    }
-
-    /**
-     * This method is to initialize the metadata for a unpartitioned stream 
with name <i>streamName</i>.
-     *
-     * TODO: after 0.2 is upgraded to 0.3, remove this.
-     *
-     * @param streamName
-     *          stream name.
-     * @throws IOException
-     */
-    public void createUnpartitionedStream(final String streamName) throws 
IOException {
-        namespace.createLog(streamName);
-    }
-
-    /**
-     * Close the distributed log manager factory, freeing any resources it may 
hold.
-     */
-    public void close() {
-        namespace.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
index 85a370f..f4a1e41 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
@@ -18,7 +18,7 @@
 package com.twitter.distributedlog;
 
 import com.google.common.base.Optional;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
index 94e618a..19f4497 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class ReadAheadEntryReader implements
         AsyncCloseable,
         LogSegmentListener,
+        LogSegmentEntryReader.StateChangeListener,
         FutureEventListener<List<Entry.Reader>> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ReadAheadEntryReader.class);
@@ -169,6 +170,9 @@ public class ReadAheadEntryReader implements
         @Override
         synchronized public void onSuccess(LogSegmentEntryReader reader) {
             this.reader = reader;
+            if (reader.getSegment().isInProgress()) {
+                reader.registerListener(ReadAheadEntryReader.this);
+            }
         }
 
         @Override
@@ -271,7 +275,7 @@ public class ReadAheadEntryReader implements
     // State of the reader
     //
 
-    private boolean isInitialized;
+    private boolean isInitialized = false;
     private boolean readAheadPaused = false;
     private Promise<Void> closePromise = null;
     // segment readers
@@ -549,10 +553,22 @@ public class ReadAheadEntryReader implements
         }
     }
 
+    void markCaughtup() {
+        if (isCatchingUp) {
+            isCatchingUp = false;
+            logger.info("ReadAhead for {} is caught up", 
readHandler.getFullyQualifiedName());
+        }
+    }
+
     public boolean isReadAheadCaughtUp() {
         return !isCatchingUp;
     }
 
+    @Override
+    public void onCaughtupOnInprogress() {
+        markCaughtup();
+    }
+
     //
     // ReadAhead State Machine
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java
deleted file mode 100644
index 4d7a0e1..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZKMetadataAccessor implements MetadataAccessor {
-    static final Logger LOG = 
LoggerFactory.getLogger(ZKMetadataAccessor.class);
-    protected final String name;
-    protected Promise<Void> closePromise;
-    protected final URI uri;
-    // zookeeper clients
-    // NOTE: The actual zookeeper client is initialized lazily when it is 
referenced by
-    //       {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it 
is safe to
-    //       keep builders and their client wrappers here, as they will be 
used when
-    //       instantiating readers or writers.
-    protected final ZooKeeperClientBuilder writerZKCBuilder;
-    protected final ZooKeeperClient writerZKC;
-    protected final boolean ownWriterZKC;
-    protected final ZooKeeperClientBuilder readerZKCBuilder;
-    protected final ZooKeeperClient readerZKC;
-    protected final boolean ownReaderZKC;
-
-    ZKMetadataAccessor(String name,
-                       DistributedLogConfiguration conf,
-                       URI uri,
-                       ZooKeeperClientBuilder writerZKCBuilder,
-                       ZooKeeperClientBuilder readerZKCBuilder,
-                       StatsLogger statsLogger) {
-        this.name = name;
-        this.uri = uri;
-
-        if (null == writerZKCBuilder) {
-            RetryPolicy retryPolicy = null;
-            if (conf.getZKNumRetries() > 0) {
-                retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                    conf.getZKRetryBackoffStartMillis(),
-                    conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
-            }
-            this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
-                    .name(String.format("dlzk:%s:dlm_writer_shared", name))
-                    .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                    .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                    .requestRateLimit(conf.getZKRequestRateLimit())
-                    .zkAclId(conf.getZkAclId())
-                    .uri(uri)
-                    .retryPolicy(retryPolicy)
-                    .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared"));
-            this.ownWriterZKC = true;
-        } else {
-            this.writerZKCBuilder = writerZKCBuilder;
-            this.ownWriterZKC = false;
-        }
-        this.writerZKC = this.writerZKCBuilder.build();
-
-        if (null == readerZKCBuilder) {
-            String zkServersForWriter = DLUtils.getZKServersFromDLUri(uri);
-            String zkServersForReader;
-            try {
-                BKDLConfig bkdlConfig = 
BKDLConfig.resolveDLConfig(this.writerZKC, uri);
-                zkServersForReader = bkdlConfig.getDlZkServersForReader();
-            } catch (IOException e) {
-                LOG.warn("Error on resolving dl metadata bindings for {} : ", 
uri, e);
-                zkServersForReader = zkServersForWriter;
-            }
-            if (zkServersForReader.equals(zkServersForWriter)) {
-                LOG.info("Used same zookeeper servers '{}' for both writers 
and readers for {}.",
-                         zkServersForWriter, name);
-                this.readerZKCBuilder = this.writerZKCBuilder;
-                this.ownReaderZKC = false;
-            } else {
-                RetryPolicy retryPolicy = null;
-                if (conf.getZKNumRetries() > 0) {
-                    retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                        conf.getZKRetryBackoffStartMillis(),
-                        conf.getZKRetryBackoffMaxMillis(), 
conf.getZKNumRetries());
-                }
-                this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
-                        .name(String.format("dlzk:%s:dlm_reader_shared", name))
-                        
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                        .retryThreadCount(conf.getZKClientNumberRetryThreads())
-                        .requestRateLimit(conf.getZKRequestRateLimit())
-                        .zkServers(zkServersForReader)
-                        .retryPolicy(retryPolicy)
-                        .zkAclId(conf.getZkAclId())
-                        
.statsLogger(statsLogger.scope("dlzk_dlm_reader_shared"));
-                this.ownReaderZKC = true;
-            }
-        } else {
-            this.readerZKCBuilder = readerZKCBuilder;
-            this.ownReaderZKC = false;
-        }
-        this.readerZKC = this.readerZKCBuilder.build();
-    }
-
-    /**
-     * Get the name of the stream managed by this log manager
-     *
-     * @return streamName
-     */
-    @Override
-    public String getStreamName() {
-        return name;
-    }
-
-    /**
-     * Creates or update the metadata stored at the node associated with the
-     * name and URI
-     * @param metadata opaque metadata to be stored for the node
-     * @throws IOException
-     */
-    @Override
-    public void createOrUpdateMetadata(byte[] metadata) throws IOException {
-        checkClosedOrInError("createOrUpdateMetadata");
-
-        String zkPath = getZKPath();
-        LOG.debug("Setting application specific metadata on {}", zkPath);
-        try {
-            Stat currentStat = writerZKC.get().exists(zkPath, false);
-            if (currentStat == null) {
-                if (metadata.length > 0) {
-                    Utils.zkCreateFullPathOptimistic(writerZKC,
-                            zkPath,
-                            metadata,
-                            writerZKC.getDefaultACL(),
-                            CreateMode.PERSISTENT);
-                }
-            } else {
-                writerZKC.get().setData(zkPath, metadata, 
currentStat.getVersion());
-            }
-        } catch (InterruptedException ie) {
-            throw new DLInterruptedException("Interrupted on creating or 
updating container metadata", ie);
-        } catch (Exception exc) {
-            throw new IOException("Exception creating or updating container 
metadata", exc);
-        }
-    }
-
-    /**
-     * Delete the metadata stored at the associated node. This only deletes 
the metadata
-     * and not the node itself
-     * @throws IOException
-     */
-    @Override
-    public void deleteMetadata() throws IOException {
-        checkClosedOrInError("createOrUpdateMetadata");
-        createOrUpdateMetadata(null);
-    }
-
-    /**
-     * Retrieve the metadata stored at the node
-     * @return byte array containing the metadata
-     * @throws IOException
-     */
-    @Override
-    public byte[] getMetadata() throws IOException {
-        checkClosedOrInError("createOrUpdateMetadata");
-        String zkPath = getZKPath();
-        LOG.debug("Getting application specific metadata from {}", zkPath);
-        try {
-            Stat currentStat = readerZKC.get().exists(zkPath, false);
-            if (currentStat == null) {
-                return null;
-            } else {
-                return readerZKC.get().getData(zkPath, false, currentStat);
-            }
-        } catch (InterruptedException ie) {
-            throw new DLInterruptedException("Error reading the max tx id from 
zk", ie);
-        } catch (Exception e) {
-            throw new IOException("Error reading the max tx id from zk", e);
-        }
-    }
-
-    /**
-     * Close the metadata accessor, freeing any resources it may hold.
-     * @return future represents the close result.
-     */
-    @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closeFuture;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests
-        //       the managers created by the namespace - whose zkc will be 
closed by namespace
-        try {
-            if (ownWriterZKC) {
-                writerZKC.close();
-            }
-            if (ownReaderZKC) {
-                readerZKC.close();
-            }
-        } catch (Exception e) {
-            LOG.warn("Exception while closing distributed log manager", e);
-        }
-        FutureUtils.setValue(closeFuture, null);
-        return closeFuture;
-    }
-
-    @Override
-    public void close() throws IOException {
-        FutureUtils.result(asyncClose());
-    }
-
-    public synchronized void checkClosedOrInError(String operation) throws 
AlreadyClosedException {
-        if (null != closePromise) {
-            throw new AlreadyClosedException("Executing " + operation + " on 
already closed ZKMetadataAccessor");
-        }
-    }
-
-    protected String getZKPath() {
-        return String.format("%s/%s", uri.getPath(), name);
-    }
-
-    @VisibleForTesting
-    protected ZooKeeperClient getReaderZKC() {
-        return readerZKC;
-    }
-
-    @VisibleForTesting
-    protected ZooKeeperClient getWriterZKC() {
-        return writerZKC;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
index 90807b0..15f1805 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog;
 import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.ZooKeeperClient.Credentials;
 import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
-import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
@@ -139,7 +139,7 @@ public class ZooKeeperClientBuilder {
      * @return builder.
      */
     public synchronized ZooKeeperClientBuilder uri(URI uri) {
-        this.zkServers = DLUtils.getZKServersFromDLUri(uri);
+        this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
         return this;
     }
 


Reply via email to