http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
new file mode 100644
index 0000000..eeda804
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.MetadataAccessor;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClientBuilder;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
com.twitter.distributedlog.impl.BKNamespaceDriver.getZKServersFromDLUri;
+
+/**
+ * {@link MetadataAccessor} that keeps an opaque, application specific metadata blob in the
+ * ZooKeeper node <code>&lt;uri path&gt;/&lt;name&gt;</code>.
+ *
+ * <p>Mutations go through the writer ZooKeeper client and reads go through the reader
+ * ZooKeeper client. A client is only closed by {@link #asyncClose()} when its builder was
+ * created by this accessor (see {@link #ownWriterZKC} and {@link #ownReaderZKC}).
+ */
+public class ZKMetadataAccessor implements MetadataAccessor {
+    static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
+    protected final String name;
+    // Non-null once close has been requested; guarded by "this".
+    protected Promise<Void> closePromise;
+    protected final URI uri;
+    // zookeeper clients
+    // NOTE: The actual zookeeper client is initialized lazily when it is referenced by
+    //       {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to
+    //       keep builders and their client wrappers here, as they will be used when
+    //       instantiating readers or writers.
+    protected final ZooKeeperClientBuilder writerZKCBuilder;
+    protected final ZooKeeperClient writerZKC;
+    protected final boolean ownWriterZKC;
+    protected final ZooKeeperClientBuilder readerZKCBuilder;
+    protected final ZooKeeperClient readerZKC;
+    protected final boolean ownReaderZKC;
+
+    /**
+     * Build an accessor for the metadata of <code>name</code> under <code>uri</code>.
+     *
+     * @param name name of the node whose metadata is accessed
+     * @param conf configuration supplying the zookeeper client settings
+     * @param uri namespace uri; its path is the parent of the metadata node
+     * @param writerZKCBuilder builder of the writer client, or null to create (and own) one
+     * @param readerZKCBuilder builder of the reader client, or null to derive one
+     * @param statsLogger stats logger used to scope the stats of the created clients
+     */
+    ZKMetadataAccessor(String name,
+                       DistributedLogConfiguration conf,
+                       URI uri,
+                       ZooKeeperClientBuilder writerZKCBuilder,
+                       ZooKeeperClientBuilder readerZKCBuilder,
+                       StatsLogger statsLogger) {
+        this.name = name;
+        this.uri = uri;
+
+        if (null == writerZKCBuilder) {
+            this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
+                    .name(String.format("dlzk:%s:dlm_writer_shared", name))
+                    .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                    .retryThreadCount(conf.getZKClientNumberRetryThreads())
+                    .requestRateLimit(conf.getZKRequestRateLimit())
+                    .zkAclId(conf.getZkAclId())
+                    .uri(uri)
+                    .retryPolicy(newRetryPolicy(conf))
+                    .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared"));
+            this.ownWriterZKC = true;
+        } else {
+            this.writerZKCBuilder = writerZKCBuilder;
+            this.ownWriterZKC = false;
+        }
+        this.writerZKC = this.writerZKCBuilder.build();
+
+        if (null == readerZKCBuilder) {
+            String zkServersForWriter = getZKServersFromDLUri(uri);
+            String zkServersForReader;
+            try {
+                BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri);
+                zkServersForReader = bkdlConfig.getDlZkServersForReader();
+            } catch (IOException e) {
+                // Best effort: without the bindings, read from the servers we write to.
+                LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e);
+                zkServersForReader = zkServersForWriter;
+            }
+            if (zkServersForReader.equals(zkServersForWriter)) {
+                LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.",
+                         zkServersForWriter, name);
+                this.readerZKCBuilder = this.writerZKCBuilder;
+                this.ownReaderZKC = false;
+            } else {
+                this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
+                        .name(String.format("dlzk:%s:dlm_reader_shared", name))
+                        .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                        .retryThreadCount(conf.getZKClientNumberRetryThreads())
+                        .requestRateLimit(conf.getZKRequestRateLimit())
+                        .zkServers(zkServersForReader)
+                        .retryPolicy(newRetryPolicy(conf))
+                        .zkAclId(conf.getZkAclId())
+                        .statsLogger(statsLogger.scope("dlzk_dlm_reader_shared"));
+                this.ownReaderZKC = true;
+            }
+        } else {
+            this.readerZKCBuilder = readerZKCBuilder;
+            this.ownReaderZKC = false;
+        }
+        this.readerZKC = this.readerZKCBuilder.build();
+    }
+
+    /**
+     * Create the zookeeper retry policy described by <code>conf</code>.
+     *
+     * @param conf configuration to read the retry settings from
+     * @return a bound exponential backoff policy, or null when retries are disabled
+     */
+    private static RetryPolicy newRetryPolicy(DistributedLogConfiguration conf) {
+        if (conf.getZKNumRetries() <= 0) {
+            return null;
+        }
+        return new BoundExponentialBackoffRetryPolicy(
+            conf.getZKRetryBackoffStartMillis(),
+            conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
+    }
+
+    /**
+     * Get the name of the stream managed by this log manager
+     *
+     * @return streamName
+     */
+    @Override
+    public String getStreamName() {
+        return name;
+    }
+
+    /**
+     * Creates or updates the metadata stored at the node associated with the
+     * name and URI.
+     *
+     * <p>If the node exists its data is overwritten (conditioned on the version that was
+     * just read). If the node does not exist it is only created when <code>metadata</code>
+     * is non-empty; a null or empty value is then a no-op.
+     *
+     * @param metadata opaque metadata to be stored for the node
+     * @throws AlreadyClosedException if this accessor has been closed
+     * @throws DLInterruptedException if the calling thread is interrupted
+     * @throws IOException on any other failure talking to zookeeper
+     */
+    @Override
+    public void createOrUpdateMetadata(byte[] metadata) throws IOException {
+        checkClosedOrInError("createOrUpdateMetadata");
+
+        String zkPath = getZKPath();
+        LOG.debug("Setting application specific metadata on {}", zkPath);
+        try {
+            Stat currentStat = writerZKC.get().exists(zkPath, false);
+            if (null != currentStat) {
+                writerZKC.get().setData(zkPath, metadata, currentStat.getVersion());
+            } else if (null != metadata && metadata.length > 0) {
+                // null check matters: deleteMetadata() passes null and the node may not exist
+                Utils.zkCreateFullPathOptimistic(writerZKC,
+                        zkPath,
+                        metadata,
+                        writerZKC.getDefaultACL(),
+                        CreateMode.PERSISTENT);
+            }
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie);
+        } catch (Exception exc) {
+            throw new IOException("Exception creating or updating container metadata", exc);
+        }
+    }
+
+    /**
+     * Delete the metadata stored at the associated node. This only deletes the metadata
+     * and not the node itself. Nothing happens if the node does not exist.
+     *
+     * @throws AlreadyClosedException if this accessor has been closed
+     * @throws IOException on failure talking to zookeeper
+     */
+    @Override
+    public void deleteMetadata() throws IOException {
+        checkClosedOrInError("deleteMetadata");
+        createOrUpdateMetadata(null);
+    }
+
+    /**
+     * Retrieve the metadata stored at the node
+     *
+     * @return byte array containing the metadata, or null if the node does not exist
+     * @throws AlreadyClosedException if this accessor has been closed
+     * @throws DLInterruptedException if the calling thread is interrupted
+     * @throws IOException on any other failure talking to zookeeper
+     */
+    @Override
+    public byte[] getMetadata() throws IOException {
+        checkClosedOrInError("getMetadata");
+        String zkPath = getZKPath();
+        LOG.debug("Getting application specific metadata from {}", zkPath);
+        try {
+            Stat currentStat = readerZKC.get().exists(zkPath, false);
+            if (null == currentStat) {
+                return null;
+            }
+            return readerZKC.get().getData(zkPath, false, currentStat);
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted on reading container metadata", ie);
+        } catch (Exception e) {
+            throw new IOException("Exception reading container metadata", e);
+        }
+    }
+
+    /**
+     * Close the metadata accessor, freeing any resources it may hold.
+     *
+     * <p>Repeated calls return the promise created by the first call.
+     *
+     * @return future represents the close result.
+     */
+    @Override
+    public Future<Void> asyncClose() {
+        Promise<Void> closeFuture;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests
+        //       the managers created by the namespace - whose zkc will be closed by namespace
+        if (ownWriterZKC) {
+            closeQuietly(writerZKC);
+        }
+        if (ownReaderZKC) {
+            closeQuietly(readerZKC);
+        }
+        FutureUtils.setValue(closeFuture, null);
+        return closeFuture;
+    }
+
+    /**
+     * Close <code>zkc</code>, logging instead of propagating any failure so that a
+     * failure on one client does not prevent closing the other.
+     *
+     * @param zkc client to close
+     */
+    private static void closeQuietly(ZooKeeperClient zkc) {
+        try {
+            zkc.close();
+        } catch (Exception e) {
+            LOG.warn("Exception while closing distributed log manager", e);
+        }
+    }
+
+    /**
+     * Blocking variant of {@link #asyncClose()}.
+     *
+     * @throws IOException if waiting on the close result fails
+     */
+    @Override
+    public void close() throws IOException {
+        FutureUtils.result(asyncClose());
+    }
+
+    /**
+     * Fail if close has already been requested on this accessor.
+     *
+     * @param operation name of the operation being attempted, used in the error message
+     * @throws AlreadyClosedException if {@link #asyncClose()} has been called
+     */
+    public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
+        if (null != closePromise) {
+            throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor");
+        }
+    }
+
+    /**
+     * @return zookeeper path of the metadata node: the uri path joined with the name
+     */
+    protected String getZKPath() {
+        return String.format("%s/%s", uri.getPath(), name);
+    }
+
+    @VisibleForTesting
+    protected ZooKeeperClient getReaderZKC() {
+        return readerZKC;
+    }
+
+    @VisibleForTesting
+    protected ZooKeeperClient getWriterZKC() {
+        return writerZKC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
index 61c1760..06bc8fb 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java
@@ -37,7 +37,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static com.twitter.distributedlog.impl.BKDLUtils.*;
+import static com.twitter.distributedlog.util.DLUtils.*;
 
 /**
  * Watcher on watching a given namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
new file mode 100644
index 0000000..8126723
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.acl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.thrift.AccessControlEntry;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * An {@link AccessControlEntry} bound to the zookeeper node that stores it, together with
+ * the znode version last observed for it.
+ *
+ * <p>Entries are stored as thrift JSON. Equality and hash code consider the path and the
+ * entry only, not the znode version.
+ */
+public class ZKAccessControl {
+
+    private static final int BUFFER_SIZE = 4096;
+
+    /** Entry returned by {@link #deserialize(String, byte[])} for a node holding no data. */
+    public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry();
+
+    /**
+     * Raised when the bytes stored at a node cannot be decoded into an access control entry.
+     */
+    public static class CorruptedAccessControlException extends IOException {
+
+        private static final long serialVersionUID = 5391285182476211603L;
+
+        public CorruptedAccessControlException(String zkPath, Throwable t) {
+            super("Access Control @ " + zkPath + " is corrupted.", t);
+        }
+    }
+
+    protected final AccessControlEntry accessControlEntry;
+    protected final String zkPath;
+    // znode version used for conditional updates; -1 until created/read/updated
+    private int zkVersion;
+
+    /**
+     * Bind <code>ace</code> to <code>zkPath</code> with no known znode version (-1).
+     *
+     * @param ace access control entry
+     * @param zkPath zookeeper path of the entry
+     */
+    public ZKAccessControl(AccessControlEntry ace, String zkPath) {
+        this(ace, zkPath, -1);
+    }
+
+    private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) {
+        this.accessControlEntry = ace;
+        this.zkPath = zkPath;
+        this.zkVersion = zkVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(zkPath, accessControlEntry);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ZKAccessControl)) {
+            return false;
+        }
+        ZKAccessControl other = (ZKAccessControl) obj;
+        return Objects.equal(zkPath, other.zkPath) &&
+                Objects.equal(accessControlEntry, other.accessControlEntry);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("entry(path=").append(zkPath).append(", acl=")
+                .append(accessControlEntry).append(")");
+        return sb.toString();
+    }
+
+    @VisibleForTesting
+    public String getZKPath() {
+        return zkPath;
+    }
+
+    @VisibleForTesting
+    public AccessControlEntry getAccessControlEntry() {
+        return accessControlEntry;
+    }
+
+    /**
+     * Translate a zookeeper result code into the matching {@link KeeperException}.
+     *
+     * @param rc result code handed to an async callback
+     * @return exception describing <code>rc</code>
+     */
+    private static KeeperException keeperException(int rc) {
+        return KeeperException.create(KeeperException.Code.get(rc));
+    }
+
+    /**
+     * Asynchronously create the persistent node at the entry's path holding the serialized entry.
+     *
+     * @param zkc zookeeper client to use
+     * @return future satisfied with this object (version reset to 0) on success, or failed
+     *         with the zookeeper / serialization / connection error
+     */
+    public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
+        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+        try {
+            zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
+                    new AsyncCallback.StringCallback() {
+                        @Override
+                        public void processResult(int rc, String path, Object ctx, String name) {
+                            if (KeeperException.Code.OK.intValue() == rc) {
+                                ZKAccessControl.this.zkVersion = 0;
+                                promise.setValue(ZKAccessControl.this);
+                            } else {
+                                promise.setException(keeperException(rc));
+                            }
+                        }
+                    }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            promise.setException(e);
+        } catch (IOException e) {
+            promise.setException(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Asynchronously overwrite the node's data with the serialized entry, conditioned on the
+     * znode version currently held by this object.
+     *
+     * @param zkc zookeeper client to use
+     * @return future satisfied with this object (version refreshed from the returned stat) on
+     *         success, or failed with the zookeeper / serialization / connection error
+     */
+    public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
+        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+        try {
+            zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx, Stat stat) {
+                    if (KeeperException.Code.OK.intValue() == rc) {
+                        ZKAccessControl.this.zkVersion = stat.getVersion();
+                        promise.setValue(ZKAccessControl.this);
+                    } else {
+                        promise.setException(keeperException(rc));
+                    }
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            promise.setException(e);
+        } catch (IOException e) {
+            promise.setException(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Asynchronously read and decode the access control stored at <code>zkPath</code>.
+     *
+     * @param zkc zookeeper client to use
+     * @param zkPath path of the node to read
+     * @param watcher watcher passed to the zookeeper read; may be null
+     * @return future satisfied with the decoded entry and its znode version, or failed with
+     *         the zookeeper / decoding / connection error
+     */
+    public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
+        final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+
+        try {
+            zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                    if (KeeperException.Code.OK.intValue() == rc) {
+                        try {
+                            AccessControlEntry ace = deserialize(zkPath, data);
+                            promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
+                        } catch (IOException ioe) {
+                            promise.setException(ioe);
+                        }
+                    } else {
+                        promise.setException(keeperException(rc));
+                    }
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            promise.setException(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Asynchronously delete the node at <code>zkPath</code> regardless of its version.
+     * A missing node counts as success.
+     *
+     * @param zkc zookeeper client to use
+     * @param zkPath path of the node to delete
+     * @return future satisfied with null on success, or failed with the zookeeper / connection error
+     */
+    public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
+        final Promise<Void> promise = new Promise<Void>();
+
+        try {
+            zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (KeeperException.Code.OK.intValue() == rc ||
+                            KeeperException.Code.NONODE.intValue() == rc) {
+                        promise.setValue(null);
+                    } else {
+                        promise.setException(keeperException(rc));
+                    }
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            promise.setException(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Encode an entry as UTF-8 thrift JSON.
+     *
+     * @param ace entry to encode
+     * @return encoded bytes
+     * @throws IOException if thrift fails to write the entry
+     */
+    static byte[] serialize(AccessControlEntry ace) throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            ace.write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize access control entry : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize access control entry : ", uee);
+        }
+    }
+
+    /**
+     * Decode an entry from the thrift JSON stored at <code>zkPath</code>.
+     *
+     * @param zkPath path the data was read from; only used for error reporting
+     * @param data bytes read from the node; null or empty means no entry is stored
+     * @return the decoded entry, or {@link #DEFAULT_ACCESS_CONTROL_ENTRY} when there is no data
+     * @throws CorruptedAccessControlException if the bytes cannot be decoded
+     */
+    static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException {
+        // zookeeper reports a node without data as null; treat it like empty data instead of
+        // failing with a NullPointerException inside the zookeeper callback.
+        if (null == data || data.length == 0) {
+            return DEFAULT_ACCESS_CONTROL_ENTRY;
+        }
+
+        AccessControlEntry ace = new AccessControlEntry();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            ace.read(protocol);
+        } catch (TException e) {
+            throw new CorruptedAccessControlException(zkPath, e);
+        }
+        return ace;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
new file mode 100644
index 0000000..0c90a50
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.acl;
+
+import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.thrift.AccessControlEntry;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager}
+ */
+public class ZKAccessControlManager implements AccessControlManager, Watcher {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class);
+
+    // NOTE(review): not referenced in the visible part of this class; presumably the delay
+    //               before re-fetching after a zookeeper failure - confirm against its usage.
+    private static final int ZK_RETRY_BACKOFF_MS = 500;
+
+    // Collaborators handed in through the constructor.
+    protected final DistributedLogConfiguration conf;
+    protected final ZooKeeperClient zkc;
+    // Node holding the default entry; its children hold the per-stream entries.
+    protected final String zkRootPath;
+    protected final ScheduledExecutorService scheduledExecutorService;
+
+    // Per-stream overrides keyed by stream name (the child node name under zkRootPath).
+    protected final ConcurrentMap<String, ZKAccessControl> streamEntries;
+    // Fallback used by getAccessControlEntry when a stream has no entry of its own.
+    protected ZKAccessControl defaultAccessControl;
+    // Set to true by close().
+    protected volatile boolean closed = false;
+
+    /**
+     * Create the manager and synchronously load the default access control entry followed
+     * by the per-stream access control entries found under <code>zkRootPath</code>.
+     *
+     * @param conf distributedlog configuration
+     * @param zkc zookeeper client used to read the entries
+     * @param zkRootPath node holding the default entry; its children hold the stream entries
+     * @param scheduledExecutorService executor kept for use by this manager
+     * @throws DLInterruptedException if interrupted while waiting for either fetch
+     * @throws IOException if either fetch fails
+     */
+    public ZKAccessControlManager(DistributedLogConfiguration conf,
+                                  ZooKeeperClient zkc,
+                                  String zkRootPath,
+                                  ScheduledExecutorService scheduledExecutorService) throws IOException {
+        this.conf = conf;
+        this.zkc = zkc;
+        this.zkRootPath = zkRootPath;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
+        awaitFetch(fetchDefaultAccessControlEntry(), "default access control entry", zkRootPath);
+        awaitFetch(fetchAccessControlEntries(), "access control entries", zkRootPath);
+    }
+
+    /**
+     * Block until <code>fetch</code> completes, translating any failure into an IOException.
+     *
+     * @param fetch future of the fetch to wait for
+     * @param what description of what is being fetched, used in error messages
+     * @param zkRootPath root path being fetched from, used in error messages
+     * @throws DLInterruptedException if interrupted while waiting
+     * @throws IOException on any other failure; an IOException failure is rethrown as is
+     */
+    private static void awaitFetch(Future<?> fetch, String what, String zkRootPath) throws IOException {
+        try {
+            Await.result(fetch);
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                // keep the interrupt visible to the caller's thread
+                Thread.currentThread().interrupt();
+                throw new DLInterruptedException("Interrupted on getting " + what + " for " + zkRootPath, t);
+            } else if (t instanceof KeeperException) {
+                throw new IOException("Encountered zookeeper exception on getting " + what + " for " + zkRootPath, t);
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException("Encountered unknown exception on getting " + what + " for " + zkRootPath, t);
+            }
+        }
+    }
+
+    protected AccessControlEntry getAccessControlEntry(String stream) {
+        ZKAccessControl entry = streamEntries.get(stream);
+        entry = null == entry ? defaultAccessControl : entry;
+        return entry.getAccessControlEntry();
+    }
+
+    /** @return true unless the entry applying to <code>stream</code> denies write */
+    @Override
+    public boolean allowWrite(String stream) {
+        final AccessControlEntry effectiveEntry = getAccessControlEntry(stream);
+        return !effectiveEntry.isDenyWrite();
+    }
+
+    /** @return true unless the entry applying to <code>stream</code> denies truncate */
+    @Override
+    public boolean allowTruncate(String stream) {
+        final AccessControlEntry effectiveEntry = getAccessControlEntry(stream);
+        return !effectiveEntry.isDenyTruncate();
+    }
+
+    /** @return true unless the entry applying to <code>stream</code> denies delete */
+    @Override
+    public boolean allowDelete(String stream) {
+        final AccessControlEntry effectiveEntry = getAccessControlEntry(stream);
+        return !effectiveEntry.isDenyDelete();
+    }
+
+    /** @return true unless the entry applying to <code>stream</code> denies acquire */
+    @Override
+    public boolean allowAcquire(String stream) {
+        final AccessControlEntry effectiveEntry = getAccessControlEntry(stream);
+        return !effectiveEntry.isDenyAcquire();
+    }
+
+    /** @return true unless the entry applying to <code>stream</code> denies release */
+    @Override
+    public boolean allowRelease(String stream) {
+        final AccessControlEntry effectiveEntry = getAccessControlEntry(stream);
+        return !effectiveEntry.isDenyRelease();
+    }
+
+    /** Mark this manager as closed by raising the {@code closed} flag. */
+    @Override
+    public void close() {
+        this.closed = true;
+    }
+
+    /**
+     * Start fetching the per-stream access control entries.
+     *
+     * @return future completed by {@link #fetchAccessControlEntries(Promise)}
+     */
+    private Future<Void> fetchAccessControlEntries() {
+        final Promise<Void> fetchResult = new Promise<Void>();
+        fetchAccessControlEntries(fetchResult);
+        return fetchResult;
+    }
+
+    private void fetchAccessControlEntries(final Promise<Void> promise) {
+        try {
+            zkc.get().getChildren(zkRootPath, this, new 
AsyncCallback.Children2Callback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+                    if (KeeperException.Code.OK.intValue() != rc) {
+                        
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                        return;
+                    }
+                    Set<String> streamsReceived = new HashSet<String>();
+                    streamsReceived.addAll(children);
+                    Set<String> streamsCached = streamEntries.keySet();
+                    Set<String> streamsRemoved = 
Sets.difference(streamsCached, streamsReceived).immutableCopy();
+                    for (String s : streamsRemoved) {
+                        ZKAccessControl accessControl = 
streamEntries.remove(s);
+                        if (null != accessControl) {
+                            logger.info("Removed Access Control Entry for 
stream {} : {}", s, accessControl.getAccessControlEntry());
+                        }
+                    }
+                    if (streamsReceived.isEmpty()) {
+                        promise.setValue(null);
+                        return;
+                    }
+                    final AtomicInteger numPendings = new 
AtomicInteger(streamsReceived.size());
+                    final AtomicInteger numFailures = new AtomicInteger(0);
+                    for (String s : streamsReceived) {
+                        final String streamName = s;
+                        ZKAccessControl.read(zkc, zkRootPath + "/" + 
streamName, null)
+                                .addEventListener(new 
FutureEventListener<ZKAccessControl>() {
+
+                                    @Override
+                                    public void onSuccess(ZKAccessControl 
accessControl) {
+                                        streamEntries.put(streamName, 
accessControl);
+                                        logger.info("Added overrided access 
control for stream {} : {}", streamName, accessControl.getAccessControlEntry());
+                                        complete();
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable cause) {
+                                        if (cause instanceof 
KeeperException.NoNodeException) {
+                                            streamEntries.remove(streamName);
+                                        } else if (cause instanceof 
ZKAccessControl.CorruptedAccessControlException) {
+                                            logger.warn("Access control is 
corrupted for stream {} @ {}, skipped it ...",
+                                                        new Object[] { 
streamName, zkRootPath, cause });
+                                            streamEntries.remove(streamName);
+                                        } else {
+                                            if (1 == 
numFailures.incrementAndGet()) {
+                                                promise.setException(cause);
+                                            }
+                                        }
+                                        complete();
+                                    }
+
+                                    private void complete() {
+                                        if (0 == numPendings.decrementAndGet() 
&& numFailures.get() == 0) {
+                                            promise.setValue(null);
+                                        }
+                                    }
+                                });
+                    }
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+        } catch (InterruptedException e) {
+            promise.setException(e);
+        }
+    }
+
+    /**
+     * Kicks off an asynchronous fetch of the default access control entry.
+     *
+     * @return the promise handed to {@link #fetchDefaultAccessControlEntry(Promise)},
+     *         exposed as a future
+     */
+    private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
+        final Promise<ZKAccessControl> fetchResult = new Promise<ZKAccessControl>();
+        fetchDefaultAccessControlEntry(fetchResult);
+        return fetchResult;
+    }
+
+    private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> 
promise) {
+        ZKAccessControl.read(zkc, zkRootPath, this)
+            .addEventListener(new FutureEventListener<ZKAccessControl>() {
+                @Override
+                public void onSuccess(ZKAccessControl accessControl) {
+                    logger.info("Default Access Control will be changed from 
{} to {}",
+                                
ZKAccessControlManager.this.defaultAccessControl,
+                                accessControl);
+                    ZKAccessControlManager.this.defaultAccessControl = 
accessControl;
+                    promise.setValue(accessControl);
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    if (cause instanceof KeeperException.NoNodeException) {
+                        logger.info("Default Access Control is missing, 
creating one for {} ...", zkRootPath);
+                        createDefaultAccessControlEntryIfNeeded(promise);
+                    } else {
+                        promise.setException(cause);
+                    }
+                }
+            });
+    }
+
+    /**
+     * Creates the znode at {@code zkRootPath} with an empty payload and, once the
+     * creation succeeds, fetches the default access control entry again.
+     *
+     * <p>Every failure is reported through {@code promise}: a failure to obtain the
+     * ZooKeeper handle, an interruption while obtaining it, or a non-OK result code
+     * of the create call (converted to the matching {@link KeeperException}).
+     *
+     * @param promise promise satisfied with the fetched access control or the failure
+     */
+    private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) {
+        ZooKeeper zk;
+        try {
+            zk = zkc.get();
+        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            promise.setException(e);
+            return;
+        } catch (InterruptedException e) {
+            // Catching InterruptedException clears the interrupt flag; restore it so the
+            // code that owns this thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            promise.setException(e);
+            return;
+        }
+        ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(),
+                CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, String name) {
+                if (KeeperException.Code.OK.intValue() == rc) {
+                    logger.info("Created zk path {} for default ACL.", zkRootPath);
+                    fetchDefaultAccessControlEntry(promise);
+                } else {
+                    promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+                }
+            }
+        }, null);
+    }
+
+    private void refetchDefaultAccessControlEntry(final int delayMs) {
+        if (closed) {
+            return;
+        }
+        scheduledExecutorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                fetchDefaultAccessControlEntry().addEventListener(new 
FutureEventListener<ZKAccessControl>() {
+                    @Override
+                    public void onSuccess(ZKAccessControl value) {
+                        // no-op
+                    }
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        if (cause instanceof 
ZKAccessControl.CorruptedAccessControlException) {
+                            logger.warn("Default access control entry is 
corrupted, ignore this update : ", cause);
+                            return;
+                        }
+
+                        logger.warn("Encountered an error on refetching 
default access control entry, retrying in {} ms : ",
+                                    ZK_RETRY_BACKOFF_MS, cause);
+                        refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS);
+                    }
+                });
+            }
+        }, delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void refetchAccessControlEntries(final int delayMs) {
+        if (closed) {
+            return;
+        }
+        scheduledExecutorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                fetchAccessControlEntries().addEventListener(new 
FutureEventListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        // no-op
+                    }
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        logger.warn("Encountered an error on refetching access 
control entries, retrying in {} ms : ",
+                                    ZK_RETRY_BACKOFF_MS, cause);
+                        refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
+                    }
+                });
+            }
+        }, delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void refetchAllAccessControlEntries(final int delayMs) {
+        if (closed) {
+            return;
+        }
+        scheduledExecutorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                fetchDefaultAccessControlEntry().addEventListener(new 
FutureEventListener<ZKAccessControl>() {
+                    @Override
+                    public void onSuccess(ZKAccessControl value) {
+                        fetchAccessControlEntries().addEventListener(new 
FutureEventListener<Void>() {
+                            @Override
+                            public void onSuccess(Void value) {
+                                // no-op
+                            }
+
+                            @Override
+                            public void onFailure(Throwable cause) {
+                                logger.warn("Encountered an error on fetching 
all access control entries, retrying in {} ms : ",
+                                            ZK_RETRY_BACKOFF_MS, cause);
+                                
refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        logger.warn("Encountered an error on refetching all 
access control entries, retrying in {} ms : ",
+                                    ZK_RETRY_BACKOFF_MS, cause);
+                        refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS);
+                    }
+                });
+            }
+        }, delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Reacts to a watched event by scheduling the matching refetch with no delay:
+     * everything on session expiry, the default entry on a data change, and the
+     * access control entries on a children change. Other events are ignored.
+     *
+     * @param event the watched event
+     */
+    @Override
+    public void process(WatchedEvent event) {
+        final Event.EventType eventType = event.getType();
+        if (Event.EventType.None == eventType) {
+            if (Event.KeeperState.Expired == event.getState()) {
+                refetchAllAccessControlEntries(0);
+            }
+            return;
+        }
+        if (Event.EventType.NodeDataChanged == eventType) {
+            logger.info("Default ACL for {} is changed, refetching ...", zkRootPath);
+            refetchDefaultAccessControlEntry(0);
+            return;
+        }
+        if (Event.EventType.NodeChildrenChanged == eventType) {
+            logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath);
+            refetchAccessControlEntries(0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
new file mode 100644
index 0000000..d7ff4fb
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.logsegment;
+
+import com.twitter.distributedlog.bk.LedgerAllocator;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
+import com.twitter.distributedlog.util.Allocator;
+import com.twitter.distributedlog.util.Transaction;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.client.LedgerHandle;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+
+import java.io.IOException;
+
+/**
+ * Allocate log segments
+ */
+class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, 
Object> {
+
+    private static class NewLogSegmentEntryWriterFn extends 
AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> {
+
+        static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE =
+                new NewLogSegmentEntryWriterFn();
+
+        private NewLogSegmentEntryWriterFn() {}
+
+        @Override
+        public LogSegmentEntryWriter apply(LedgerHandle lh) {
+            return new BKLogSegmentEntryWriter(lh);
+        }
+    }
+
+    LedgerAllocator allocator;
+
+    BKLogSegmentAllocator(LedgerAllocator allocator) {
+        this.allocator = allocator;
+    }
+
+    @Override
+    public void allocate() throws IOException {
+        allocator.allocate();
+    }
+
+    @Override
+    public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn,
+                                                   final 
Transaction.OpListener<LogSegmentEntryWriter> listener) {
+        return allocator.tryObtain(txn, new 
Transaction.OpListener<LedgerHandle>() {
+            @Override
+            public void onCommit(LedgerHandle lh) {
+                listener.onCommit(new BKLogSegmentEntryWriter(lh));
+            }
+
+            @Override
+            public void onAbort(Throwable t) {
+                listener.onAbort(t);
+            }
+        }).map(NewLogSegmentEntryWriterFn.INSTANCE);
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return allocator.asyncClose();
+    }
+
+    @Override
+    public Future<Void> delete() {
+        return allocator.delete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index dc382d2..f85760d 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -209,6 +210,7 @@ public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader,
                 int numErrors = Math.max(1, numReadErrors.incrementAndGet());
                 int nextReadBackoffTime = Math.min(numErrors * 
readAheadWaitTime, maxReadBackoffTime);
                 scheduler.schedule(
+                        getSegment().getLogSegmentId(),
                         this,
                         nextReadBackoffTime,
                         TimeUnit.MILLISECONDS);
@@ -284,6 +286,8 @@ public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader,
     private final AtomicReference<Throwable> lastException = new 
AtomicReference<Throwable>(null);
     private final AtomicLong scheduleCount = new AtomicLong(0);
     private volatile boolean hasCaughtupOnInprogress = false;
+    private final CopyOnWriteArraySet<StateChangeListener> 
stateChangeListeners =
+            new CopyOnWriteArraySet<StateChangeListener>();
     // read retries
     private int readAheadWaitTime;
     private final int maxReadBackoffTime;
@@ -374,6 +378,24 @@ public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader,
         return hasCaughtupOnInprogress;
     }
 
+    /**
+     * Adds <i>listener</i> to the set of state change listeners. The listeners are
+     * kept in a set, so registering the same listener again has no effect.
+     *
+     * @param listener listener to register
+     * @return this reader
+     */
+    @Override
+    public LogSegmentEntryReader registerListener(StateChangeListener 
listener) {
+        stateChangeListeners.add(listener);
+        return this;
+    }
+
+    /**
+     * Removes <i>listener</i> from the set of state change listeners.
+     *
+     * @param listener listener to unregister
+     * @return this reader
+     */
+    @Override
+    public LogSegmentEntryReader unregisterListener(StateChangeListener 
listener) {
+        stateChangeListeners.remove(listener);
+        return this;
+    }
+
+    /**
+     * Calls {@code onCaughtupOnInprogress()} on every registered state change listener.
+     */
+    private void notifyCaughtupOnInprogress() {
+        for (StateChangeListener listener : stateChangeListeners) {
+            listener.onCaughtupOnInprogress();
+        }
+    }
+
     //
     // Process on Log Segment Metadata Updates
     //
@@ -440,7 +462,7 @@ public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader,
             return;
         }
         // the reader is still catching up, retry opening the log segment later
-        scheduler.schedule(new Runnable() {
+        scheduler.schedule(segment.getLogSegmentId(), new Runnable() {
             @Override
             public void run() {
                 onLogSegmentMetadataUpdated(segment);
@@ -583,6 +605,7 @@ public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader,
 
         if (!hasCaughtupOnInprogress) {
             hasCaughtupOnInprogress = true;
+            notifyCaughtupOnInprogress();
         }
         getLh().asyncReadLastConfirmedAndEntry(
                 cacheEntry.entryId,
@@ -633,7 +656,7 @@ public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader,
 
         long prevCount = scheduleCount.getAndIncrement();
         if (0 == prevCount) {
-            scheduler.submit(this);
+            scheduler.submit(getSegment().getLogSegmentId(), this);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index f7f4acf..91e6dec 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -20,12 +20,21 @@ package com.twitter.distributedlog.impl.logsegment;
 import com.twitter.distributedlog.BookKeeperClient;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
+import com.twitter.distributedlog.bk.LedgerAllocator;
+import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
+import com.twitter.distributedlog.bk.QuorumConfigProvider;
+import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
+import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
 import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
+import com.twitter.distributedlog.metadata.LogMetadataForWriter;
+import com.twitter.distributedlog.util.Allocator;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.util.Future;
@@ -80,21 +89,31 @@ public class BKLogSegmentEntryStore implements
     }
 
     private final byte[] passwd;
+    private final ZooKeeperClient zkc;
     private final BookKeeperClient bkc;
     private final OrderedScheduler scheduler;
     private final DistributedLogConfiguration conf;
+    private final DynamicDistributedLogConfiguration dynConf;
     private final StatsLogger statsLogger;
     private final AsyncFailureInjector failureInjector;
+    // ledger allocator
+    private final LedgerAllocator allocator;
 
+    /**
+     * Creates the entry store. All arguments are stored as-is; the ledger password is
+     * the UTF-8 encoding of {@code conf.getBKDigestPW()}.
+     *
+     * @param conf distributedlog configuration
+     * @param dynConf dynamic distributedlog configuration
+     * @param zkc zookeeper client
+     * @param bkc bookkeeper client
+     * @param scheduler ordered scheduler
+     * @param allocator ledger allocator; may be null, in which case
+     *          {@code createLedgerAllocator} builds a per-log allocator instead
+     * @param statsLogger stats logger
+     * @param failureInjector failure injector
+     */
     public BKLogSegmentEntryStore(DistributedLogConfiguration conf,
+                                  DynamicDistributedLogConfiguration dynConf,
+                                  ZooKeeperClient zkc,
                                   BookKeeperClient bkc,
                                   OrderedScheduler scheduler,
+                                  LedgerAllocator allocator,
                                   StatsLogger statsLogger,
                                   AsyncFailureInjector failureInjector) {
         this.conf = conf;
+        this.dynConf = dynConf;
+        this.zkc = zkc;
         this.bkc = bkc;
         this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
         this.scheduler = scheduler;
+        this.allocator = allocator;
         this.statsLogger = statsLogger;
         this.failureInjector = failureInjector;
     }
@@ -129,11 +148,43 @@ public class BKLogSegmentEntryStore implements
         FutureUtils.setValue(deleteRequest.deletePromise, 
deleteRequest.segment);
     }
 
+    //
+    // Writers
+    //
+
+    LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata,
+                                          DynamicDistributedLogConfiguration 
dynConf)
+            throws IOException {
+        LedgerAllocator ledgerAllocatorDelegator;
+        if (null == allocator || !dynConf.getEnableLedgerAllocatorPool()) {
+            QuorumConfigProvider quorumConfigProvider =
+                    new DynamicQuorumConfigProvider(dynConf);
+            LedgerAllocator allocator = new SimpleLedgerAllocator(
+                    logMetadata.getAllocationPath(),
+                    logMetadata.getAllocationData(),
+                    quorumConfigProvider,
+                    zkc,
+                    bkc);
+            ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, 
true);
+        } else {
+            ledgerAllocatorDelegator = allocator;
+        }
+        return ledgerAllocatorDelegator;
+    }
+
+    /**
+     * Creates the log segment allocator for a log: a {@link BKLogSegmentAllocator}
+     * around the ledger allocator chosen by {@code createLedgerAllocator}.
+     */
     @Override
-    public Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata 
segment) {
-        throw new UnsupportedOperationException("Not supported yet");
+    public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator(
+            LogMetadataForWriter logMetadata,
+            DynamicDistributedLogConfiguration dynConf) throws IOException {
+        // Build the ledger allocator (this local shadows the field of the same name)
+        LedgerAllocator allocator = createLedgerAllocator(logMetadata, 
dynConf);
+        return new BKLogSegmentAllocator(allocator);
     }
 
+    //
+    // Readers
+    //
+
     @Override
     public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
                                                     long startEntryId) {
@@ -220,15 +271,15 @@ public class BKLogSegmentEntryStore implements
                     segment.getLogSegmentId(),
                     BookKeeper.DigestType.CRC32,
                     passwd,
-                    this,
-                    openCallback);
+                    openCallback,
+                    null);
         } else {
             bk.asyncOpenLedger(
                     segment.getLogSegmentId(),
                     BookKeeper.DigestType.CRC32,
                     passwd,
-                    this,
-                    openCallback);
+                    openCallback,
+                    null);
         }
         return openPromise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
new file mode 100644
index 0000000..3e859fb
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.metadata.DLConfig;
+import com.twitter.distributedlog.thrift.BKDLConfigFormat;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * Configurations for BookKeeper based DL.
+ */
+public class BKDLConfig implements DLConfig {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BKDLConfig.class);
+
+    private static final int BUFFER_SIZE = 4096;
+    private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs =
+            new ConcurrentHashMap<URI, DLConfig>();
+
+    /**
+     * Copies the settings of <i>bkdlConfig</i> into <i>dlConf</i>: the encode-region-id
+     * flag and the first log segment sequence number. For a federated namespace,
+     * stream creation on demand is also turned off in <i>dlConf</i>.
+     *
+     * @param bkdlConfig source of the settings
+     * @param dlConf configuration updated in place
+     */
+    public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
+        final boolean federated = bkdlConfig.isFederatedNamespace();
+        dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
+        dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
+        if (federated) {
+            dlConf.setCreateStreamIfNotExists(false);
+            LOG.info("Disabled createIfNotExists for federated namespace.");
+        }
+        final Object[] logArgs = new Object[] {
+                dlConf.getEncodeRegionIDInLogSegmentMetadata(),
+                dlConf.getFirstLogSegmentSequenceNumber(),
+                dlConf.getCreateStreamIfNotExists(),
+                federated };
+        LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {},"
+                + " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
+                logArgs);
+    }
+
+    /**
+     * Returns the config for <i>uri</i>, resolving it through a
+     * {@link ZkMetadataResolver} on a cache miss. When two callers race, the config
+     * that reached the cache first is returned.
+     *
+     * @param zkc zookeeper client used to resolve the metadata
+     * @param uri uri to resolve
+     * @return the cached or newly resolved config
+     * @throws IOException declared by this method; thrown by the metadata resolution
+     */
+    public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException {
+        DLConfig resolved = cachedDLConfigs.get(uri);
+        if (null == resolved) {
+            final DLConfig fresh = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig();
+            final DLConfig winner = cachedDLConfigs.putIfAbsent(uri, fresh);
+            resolved = (null == winner) ? fresh : winner;
+        }
+        assert (resolved instanceof BKDLConfig);
+        return (BKDLConfig) resolved;
+    }
+
+    /**
+     * Removes every entry from the static cache of resolved configs.
+     */
+    @VisibleForTesting
+    public static void clearCachedDLConfigs() {
+        cachedDLConfigs.clear();
+    }
+
+    private String bkZkServersForWriter;
+    private String bkZkServersForReader;
+    private String bkLedgersPath;
+    private boolean sanityCheckTxnID = true;
+    private boolean encodeRegionID = false;
+    private String dlZkServersForWriter;
+    private String dlZkServersForReader;
+    private String aclRootPath;
+    private Long firstLogSegmentSeqNo;
+    private boolean isFederatedNamespace = false;
+
+    /**
+     * Construct an empty config with given <i>uri</i>.
+     *
+     * <p>The zk servers extracted from the uri are used as the dl zk servers for both
+     * writers and readers; the bk zk servers and the ledgers path are left null.
+     *
+     * @param uri
+     *          uri to extract the zk servers from.
+     */
+    public BKDLConfig(URI uri) {
+        this(BKNamespaceDriver.getZKServersFromDLUri(uri),
+             BKNamespaceDriver.getZKServersFromDLUri(uri),
+             null, null, null);
+    }
+
+    /**
+     * Construct a config that uses the same <i>zkServers</i> for all four server
+     * settings (dl and bk, writers and readers).
+     * The caller should make sure both dl and bk use same zookeeper server.
+     *
+     * @param zkServers
+     *          zk servers used for both dl and bk.
+     * @param ledgersPath
+     *          ledgers path.
+     */
+    @VisibleForTesting
+    public BKDLConfig(String zkServers, String ledgersPath) {
+        this(zkServers, zkServers, zkServers, zkServers, ledgersPath);
+    }
+
+    /**
+     * Construct a config from explicit server lists and ledgers path. The values are
+     * stored as given; no validation is done here.
+     *
+     * @param dlZkServersForWriter
+     *          zk servers used for dl for writers.
+     * @param dlZkServersForReader
+     *          zk servers used for dl for readers.
+     * @param bkZkServersForWriter
+     *          zk servers used for bk for writers.
+     * @param bkZkServersForReader
+     *          zk servers used for bk for readers.
+     * @param bkLedgersPath
+     *          ledgers path for bk.
+     */
+    public BKDLConfig(String dlZkServersForWriter,
+                      String dlZkServersForReader,
+                      String bkZkServersForWriter,
+                      String bkZkServersForReader,
+                      String bkLedgersPath) {
+        this.dlZkServersForWriter = dlZkServersForWriter;
+        this.dlZkServersForReader = dlZkServersForReader;
+        this.bkZkServersForWriter = bkZkServersForWriter;
+        this.bkZkServersForReader = bkZkServersForReader;
+        this.bkLedgersPath = bkLedgersPath;
+    }
+
+    /**
+     * Get the zk servers used for bk for writers.
+     *
+     * @return zk servers used for bk for writers
+     */
+    public String getBkZkServersForWriter() {
+        return bkZkServersForWriter;
+    }
+
+    /**
+     * Get the zk servers used for bk for readers.
+     *
+     * @return zk servers used for bk for readers
+     */
+    public String getBkZkServersForReader() {
+        return bkZkServersForReader;
+    }
+
+    /**
+     * Get the zk servers used for dl for writers.
+     *
+     * @return zk servers used for dl for writers
+     */
+    public String getDlZkServersForWriter() {
+        return dlZkServersForWriter;
+    }
+
+    /**
+     * Get the zk servers used for dl for readers.
+     *
+     * @return zk servers used for dl for readers
+     */
+    public String getDlZkServersForReader() {
+        return dlZkServersForReader;
+    }
+
+    /**
+     * Get the ledgers path for bk.
+     *
+     * @return ledgers path for bk
+     */
+    public String getBkLedgersPath() {
+        return bkLedgersPath;
+    }
+
+    /**
+     * Enable/Disable sanity check txn id. The flag is enabled by default.
+     *
+     * @param enabled
+     *          flag to enable/disable sanity check txn id.
+     * @return this bk dl config, to allow chaining calls.
+     */
+    public BKDLConfig setSanityCheckTxnID(boolean enabled) {
+        this.sanityCheckTxnID = enabled;
+        return this;
+    }
+
+    /**
+     * Whether sanity check of txn id is enabled.
+     *
+     * @return flag to sanity check highest txn id.
+     */
+    public boolean getSanityCheckTxnID() {
+        return sanityCheckTxnID;
+    }
+
+    /**
+     * Enable/Disable encode region id. The flag is disabled by default.
+     *
+     * @param enabled
+     *          flag to enable/disable encoding region id.
+     * @return this bk dl config, to allow chaining calls.
+     */
+    public BKDLConfig setEncodeRegionID(boolean enabled) {
+        this.encodeRegionID = enabled;
+        return this;
+    }
+
+    /**
+     * Whether encoding region id is enabled.
+     *
+     * @return flag to encode region id.
+     */
+    public boolean getEncodeRegionID() {
+        return encodeRegionID;
+    }
+
+    /**
+     * Set the root path of zk based ACL manager.
+     *
+     * @param aclRootPath
+     *          root path of zk based ACL manager.
+     * @return this bk dl config, to allow chaining calls.
+     */
+    public BKDLConfig setACLRootPath(String aclRootPath) {
+        this.aclRootPath = aclRootPath;
+        return this;
+    }
+
+    /**
+     * Get the root path of zk based ACL manager.
+     *
+     * @return root path of zk based ACL manager; null when it was never set.
+     */
+    public String getACLRootPath() {
+        return aclRootPath;
+    }
+
+    /**
+     * Set the value at which ledger sequence number should start for streams
+     * that are being upgraded and did not have ledger sequence number to start
+     * with or for newly created streams.
+     *
+     * @param firstLogSegmentSeqNo first ledger sequence number
+     * @return this bk dl config, to allow chaining calls.
+     */
+    public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) {
+        this.firstLogSegmentSeqNo = firstLogSegmentSeqNo;
+        return this;
+    }
+
+    /**
+     * Get the value at which ledger sequence number should start for streams
+     * that are being upgraded and did not have ledger sequence number to start
+     * with or for newly created streams.
+     *
+     * @return the configured first ledger sequence number, or
+     *         {@code DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO} when none was set
+     */
+    public Long getFirstLogSegmentSeqNo() {
+        if (null != firstLogSegmentSeqNo) {
+            return firstLogSegmentSeqNo;
+        }
+        return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
+    }
+
+    /**
+     * Mark the namespace as federated or not, according to
+     * <i>isFederatedNamespace</i>. The flag is false by default.
+     *
+     * @param isFederatedNamespace
+     *          is the namespace federated?
+     * @return this bk dl config, to allow chaining calls.
+     */
+    public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) {
+        this.isFederatedNamespace = isFederatedNamespace;
+        return this;
+    }
+
+    /**
+     * Whether the namespace is a federated namespace.
+     *
+     * @return true if the namespace is a federated namespace, otherwise false.
+     */
+    public boolean isFederatedNamespace() {
+        return this.isFederatedNamespace;
+    }
+
+    /**
+     * Hash this config on its cluster location settings only: the bookkeeper and
+     * distributedlog zookeeper servers (writer and reader) plus the ledgers path.
+     *
+     * <p>{@link #equals(Object)} compares more fields than are hashed here. That is
+     * still consistent with the hashCode contract, because two configs that are equal
+     * necessarily agree on these five fields.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader,
+                                dlZkServersForWriter, dlZkServersForReader,
+                                bkLedgersPath);
+    }
+
+    /**
+     * Two configs are equal when every setting matches: the zookeeper servers
+     * (writer and reader, for both bookkeeper and distributedlog), the ledgers path,
+     * the txn id sanity check flag, the region id encoding flag, the ACL root path,
+     * the first log segment sequence number and the federated namespace flag.
+     *
+     * @param o object to compare against
+     * @return true only if <i>o</i> is a {@code BKDLConfig} carrying identical settings.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BKDLConfig)) {
+            return false;
+        }
+        final BKDLConfig that = (BKDLConfig) o;
+        // bookkeeper cluster settings
+        if (!Objects.equal(bkZkServersForWriter, that.bkZkServersForWriter)) {
+            return false;
+        }
+        if (!Objects.equal(bkZkServersForReader, that.bkZkServersForReader)) {
+            return false;
+        }
+        // dl zookeeper cluster settings
+        if (!Objects.equal(dlZkServersForWriter, that.dlZkServersForWriter)) {
+            return false;
+        }
+        if (!Objects.equal(dlZkServersForReader, that.dlZkServersForReader)) {
+            return false;
+        }
+        if (!Objects.equal(bkLedgersPath, that.bkLedgersPath)) {
+            return false;
+        }
+        // dl settings
+        if (sanityCheckTxnID != that.sanityCheckTxnID) {
+            return false;
+        }
+        if (encodeRegionID != that.encodeRegionID) {
+            return false;
+        }
+        if (!Objects.equal(aclRootPath, that.aclRootPath)) {
+            return false;
+        }
+        if (!Objects.equal(firstLogSegmentSeqNo, that.firstLogSegmentSeqNo)) {
+            return false;
+        }
+        return Objects.equal(isFederatedNamespace, that.isFederatedNamespace);
+    }
+
+    /**
+     * The string form of this config is its serialized form, see {@link #serialize()}.
+     */
+    @Override
+    public String toString() {
+        return serialize();
+    }
+
+    /**
+     * Serialize this config.
+     *
+     * <p>Settings are copied into a fresh {@code BKDLConfigFormat}. String and
+     * sequence number settings that are {@code null} are left unset, the federated
+     * namespace flag is only written when it is true, and the two remaining boolean
+     * flags are always written.
+     *
+     * @return the serialized config
+     */
+    @Override
+    public String serialize() {
+        final BKDLConfigFormat format = new BKDLConfigFormat();
+        // bookkeeper zookeeper servers
+        if (bkZkServersForWriter != null) {
+            format.setBkZkServers(bkZkServersForWriter);
+        }
+        if (bkZkServersForReader != null) {
+            format.setBkZkServersForReader(bkZkServersForReader);
+        }
+        // dl zookeeper servers
+        if (dlZkServersForWriter != null) {
+            format.setDlZkServersForWriter(dlZkServersForWriter);
+        }
+        if (dlZkServersForReader != null) {
+            format.setDlZkServersForReader(dlZkServersForReader);
+        }
+        if (bkLedgersPath != null) {
+            format.setBkLedgersPath(bkLedgersPath);
+        }
+        // dl settings: these two flags are written unconditionally
+        format.setSanityCheckTxnID(sanityCheckTxnID);
+        format.setEncodeRegionID(encodeRegionID);
+        if (aclRootPath != null) {
+            format.setAclRootPath(aclRootPath);
+        }
+        if (firstLogSegmentSeqNo != null) {
+            format.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo);
+        }
+        // a non-federated namespace is represented by leaving the field unset
+        if (isFederatedNamespace) {
+            format.setFederatedNamespace(true);
+        }
+        return serialize(format);
+    }
+
+    String serialize(BKDLConfigFormat configFormat) {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            configFormat.write(protocol);
+            transport.flush();
+            return transport.toString("UTF-8");
+        } catch (TException e) {
+            throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
+        }
+    }
+
+    /**
+     * Populate this config from serialized <i>data</i>, read through a
+     * {@code TJSONProtocol} into a {@code BKDLConfigFormat}.
+     *
+     * <p>Settings absent from <i>data</i> are handled as follows:
+     * <ul>
+     * <li>writer zookeeper servers, ledgers path, ACL root path and first log segment
+     * sequence number keep whatever value this object already holds;</li>
+     * <li>reader zookeeper servers fall back to the corresponding writer servers;</li>
+     * <li>the txn id sanity check defaults to true, while region id encoding and the
+     * federated namespace flag default to false.</li>
+     * </ul>
+     *
+     * <p>Fields are assigned before the final validation runs, so when validation
+     * fails this object may already have been partially updated.
+     *
+     * @param data serialized config
+     * @throws IOException if <i>data</i> cannot be parsed, or if any of the zookeeper
+     *          servers or the ledgers path is still missing after parsing.
+     */
+    @Override
+    public void deserialize(byte[] data) throws IOException {
+        BKDLConfigFormat configFormat = new BKDLConfigFormat();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            configFormat.read(protocol);
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize data '" +
+                    new String(data, UTF_8) + "' : ", e);
+        }
+        // bookkeeper cluster settings
+        if (configFormat.isSetBkZkServers()) {
+            bkZkServersForWriter = configFormat.getBkZkServers();
+        }
+        // readers use the writer's servers unless a dedicated reader setting is present;
+        // this relies on the writer value having been assigned just above
+        if (configFormat.isSetBkZkServersForReader()) {
+            bkZkServersForReader = configFormat.getBkZkServersForReader();
+        } else {
+            bkZkServersForReader = bkZkServersForWriter;
+        }
+        if (configFormat.isSetBkLedgersPath()) {
+            bkLedgersPath = configFormat.getBkLedgersPath();
+        }
+        // dl zookeeper cluster settings
+        if (configFormat.isSetDlZkServersForWriter()) {
+            dlZkServersForWriter = configFormat.getDlZkServersForWriter();
+        }
+        // same writer-to-reader fallback as for the bookkeeper servers
+        if (configFormat.isSetDlZkServersForReader()) {
+            dlZkServersForReader = configFormat.getDlZkServersForReader();
+        } else {
+            dlZkServersForReader = dlZkServersForWriter;
+        }
+        // dl settings
+        // an unset sanity check flag means "enabled"; an unset region id flag means "disabled"
+        sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || 
configFormat.isSanityCheckTxnID();
+        encodeRegionID = configFormat.isSetEncodeRegionID() && 
configFormat.isEncodeRegionID();
+        if (configFormat.isSetAclRootPath()) {
+            aclRootPath = configFormat.getAclRootPath();
+        }
+
+        if (configFormat.isSetFirstLogSegmentSeqNo()) {
+            firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo();
+        }
+        // an unset federated namespace flag means "not federated"
+        isFederatedNamespace = configFormat.isSetFederatedNamespace() && 
configFormat.isFederatedNamespace();
+
+        // Validate the settings
+        // all four zookeeper server settings and the ledgers path are mandatory
+        if (null == bkZkServersForWriter || null == bkZkServersForReader || 
null == bkLedgersPath ||
+                null == dlZkServersForWriter || null == dlZkServersForReader) {
+            throw new IOException("Missing zk/bk settings in BKDL Config : " + 
new String(data, UTF_8));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
new file mode 100644
index 0000000..6b7a231
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.metadata.DLMetadata;
+import com.twitter.distributedlog.metadata.MetadataResolver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class ZkMetadataResolver implements MetadataResolver {
+
+    private final ZooKeeperClient zkc;
+
+    public ZkMetadataResolver(ZooKeeperClient zkc) {
+        this.zkc = zkc;
+    }
+
+    @Override
+    public DLMetadata resolve(URI uri) throws IOException {
+        String dlPath = uri.getPath();
+        PathUtils.validatePath(dlPath);
+        // Normal case the dl metadata is stored in the last segment
+        // so lookup last segment first.
+        String[] parts = StringUtils.split(dlPath, '/');
+        if (null == parts || 0 == parts.length) {
+            throw new IOException("Invalid dlPath to resolve dl metadata : " + 
dlPath);
+        }
+        for (int i = parts.length; i >= 0; i--) {
+            String pathToResolve = String.format("/%s", 
StringUtils.join(parts, '/', 0, i));
+            byte[] data;
+            try {
+                data = zkc.get().getData(pathToResolve, false, new Stat());
+            } catch (KeeperException.NoNodeException nne) {
+                continue;
+            } catch (KeeperException ke) {
+                throw new IOException("Fail to resolve dl path : " + 
pathToResolve);
+            } catch (InterruptedException ie) {
+                throw new IOException("Interrupted when resolving dl path : " 
+ pathToResolve);
+            }
+            if (null == data || data.length == 0) {
+                continue;
+            }
+            try {
+                return DLMetadata.deserialize(uri, data);
+            } catch (IOException ie) {
+                throw new IOException("Failed to deserialize uri : " + uri);
+            }
+        }
+        throw new IOException("No bkdl config bound under dl path : " + 
dlPath);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
new file mode 100644
index 0000000..b067ee9
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.subscription;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.twitter.distributedlog.subscription.SubscriptionStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import com.google.common.base.Charsets;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import com.twitter.distributedlog.DLSN;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+
+/**
+ * {@link SubscriptionStateStore} that keeps the commit position of a single
+ * subscriber in the data of one znode.
+ *
+ * <p>The position most recently passed to {@link #advanceCommitPosition(DLSN)} is
+ * also remembered in memory and is served by {@link #getLastCommitPosition()}
+ * without going to zookeeper.
+ */
+public class ZKSubscriptionStateStore implements SubscriptionStateStore {
+
+    static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class);
+
+    private final ZooKeeperClient zooKeeperClient;
+    private final String zkPath;
+    // position most recently advanced through this instance; null until the first advance
+    private final AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null);
+
+    /**
+     * @param zooKeeperClient zookeeper client used to read and write the position
+     * @param zkPath path of the znode holding the position
+     */
+    public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) {
+        this.zooKeeperClient = zooKeeperClient;
+        this.zkPath = zkPath;
+    }
+
+    /**
+     * Nothing to release: this store does not own the zookeeper client.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Get the last committed position stored for this subscription.
+     *
+     * @return the position remembered in memory when there is one, otherwise the
+     *         position read from zookeeper.
+     */
+    @Override
+    public Future<DLSN> getLastCommitPosition() {
+        final DLSN cached = lastCommittedPosition.get();
+        if (null == cached) {
+            return getLastCommitPositionFromZK();
+        }
+        return Future.value(cached);
+    }
+
+    /**
+     * Read the committed position from zookeeper, bypassing the in-memory copy.
+     *
+     * @return a future satisfied with the stored position; with
+     *         {@link DLSN#NonInclusiveLowerBound} when the znode does not exist or its
+     *         content cannot be parsed; or failed when zookeeper reports any other
+     *         error, the client cannot connect, or the calling thread is interrupted.
+     */
+    Future<DLSN> getLastCommitPositionFromZK() {
+        final Promise<DLSN> promise = new Promise<DLSN>();
+        final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
+                // no position has been committed yet
+                if (KeeperException.Code.NONODE.intValue() == rc) {
+                    promise.setValue(DLSN.NonInclusiveLowerBound);
+                    return;
+                }
+                if (KeeperException.Code.OK.intValue() != rc) {
+                    promise.setException(KeeperException.create(KeeperException.Code.get(rc), path));
+                    return;
+                }
+                try {
+                    DLSN position = DLSN.deserialize(new String(data, Charsets.UTF_8));
+                    promise.setValue(position);
+                } catch (Exception t) {
+                    logger.warn("Invalid last commit position found from path {}", zkPath, t);
+                    // invalid dlsn recorded in subscription state store
+                    promise.setValue(DLSN.NonInclusiveLowerBound);
+                }
+            }
+        };
+        try {
+            logger.debug("Reading last commit position from path {}", zkPath);
+            zooKeeperClient.get().getData(zkPath, false, callback, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
+            promise.setException(zkce);
+        } catch (InterruptedException ie) {
+            promise.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
+        }
+        return promise;
+    }
+
+    /**
+     * Advances the position associated with the subscriber.
+     *
+     * <p>The position only ever moves forward: a <i>newPosition</i> that is not
+     * greater than the one remembered in memory is ignored.
+     *
+     * @param newPosition - new commit position
+     * @return a future tracking the zookeeper write, or an already completed future
+     *         when the position was ignored.
+     */
+    @Override
+    public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
+        final DLSN current = lastCommittedPosition.get();
+        final boolean notAhead = null != current && newPosition.compareTo(current) <= 0;
+        if (notAhead) {
+            return Future.Done();
+        }
+        lastCommittedPosition.set(newPosition);
+        final byte[] payload = newPosition.serialize().getBytes(Charsets.UTF_8);
+        return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient,
+            zkPath, payload,
+            zooKeeperClient.getDefaultACL(),
+            CreateMode.PERSISTENT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
new file mode 100644
index 0000000..17ba943
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.subscription;
+
+import com.twitter.distributedlog.DLSN;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.exceptions.DLInterruptedException;
+import com.twitter.distributedlog.subscription.SubscriptionStateStore;
+import com.twitter.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * ZooKeeper Based Subscriptions Store.
+ *
+ * <p>Each subscriber is stored in its own child znode under the store's path and is
+ * served by a {@link ZKSubscriptionStateStore} that is created on first use and then
+ * cached per subscriber id.
+ */
+public class ZKSubscriptionsStore implements SubscriptionsStore {
+
+    private final ZooKeeperClient zkc;
+    private final String zkPath;
+    private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers =
+            new ConcurrentHashMap<String, ZKSubscriptionStateStore>();
+
+    /**
+     * @param zkc zookeeper client used to access the subscriptions
+     * @param zkPath parent path under which one znode per subscriber is kept
+     */
+    public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) {
+        this.zkc = zkc;
+        this.zkPath = zkPath;
+    }
+
+    /**
+     * Get the state store of <i>subscriberId</i>, creating and caching it if needed.
+     * When two callers race to create it, the instance that made it into the cache is
+     * returned to both and the other one is closed.
+     */
+    private ZKSubscriptionStateStore getSubscriber(String subscriberId) {
+        ZKSubscriptionStateStore ss = subscribers.get(subscriberId);
+        if (ss == null) {
+            ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc,
+                getSubscriberZKPath(subscriberId));
+            ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS);
+            if (oldSS == null) {
+                ss = newSS;
+            } else {
+                try {
+                    newSS.close();
+                } catch (IOException e) {
+                    // ignore the exception
+                }
+                ss = oldSS;
+            }
+        }
+        return ss;
+    }
+
+    /**
+     * Build the path of the znode that holds the state of <i>subscriberId</i>.
+     */
+    private String getSubscriberZKPath(String subscriberId) {
+        return String.format("%s/%s", zkPath, subscriberId);
+    }
+
+    @Override
+    public Future<DLSN> getLastCommitPosition(String subscriberId) {
+        return getSubscriber(subscriberId).getLastCommitPosition();
+    }
+
+    /**
+     * Get the last commit position of every subscriber found under the store's path.
+     *
+     * @return a future satisfied with a map from subscriber id to its position read
+     *         from zookeeper (empty when the store's path does not exist), or failed
+     *         when listing the subscribers or reading any of their positions fails.
+     */
+    @Override
+    public Future<Map<String, DLSN>> getLastCommitPositions() {
+        final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>();
+        try {
+            this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+                    if (KeeperException.Code.NONODE.intValue() == rc) {
+                        result.setValue(new HashMap<String, DLSN>());
+                    } else if (KeeperException.Code.OK.intValue() != rc) {
+                        result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
+                    } else {
+                        getLastCommitPositions(result, children);
+                    }
+                }
+            }, null);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
+            result.setException(zkce);
+        } catch (InterruptedException ie) {
+            result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
+        }
+        return result;
+    }
+
+    /**
+     * Read the position of each of <i>subscriberIds</i> from zookeeper and complete
+     * <i>result</i> once all reads are done: with the collected map when every read
+     * succeeded, or with the failure as soon as the combined read fails.
+     */
+    private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
+                                        List<String> subscriberIds) {
+        List<Future<Pair<String, DLSN>>> futures =
+                new ArrayList<Future<Pair<String, DLSN>>>(subscriberIds.size());
+        for (String s : subscriberIds) {
+            final String subscriber = s;
+            Future<Pair<String, DLSN>> future =
+                // Get the last commit position from zookeeper
+                getSubscriber(subscriber).getLastCommitPositionFromZK().map(
+                        new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
+                            @Override
+                            public Pair<String, DLSN> apply(DLSN dlsn) {
+                                return Pair.of(subscriber, dlsn);
+                            }
+                        });
+            futures.add(future);
+        }
+        Future.collect(futures).onSuccess(
+            new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
+                @Override
+                public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
+                    Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
+                    for (Pair<String, DLSN> pair : subscriptions) {
+                        subscriptionMap.put(pair.getLeft(), pair.getRight());
+                    }
+                    result.setValue(subscriptionMap);
+                    return BoxedUnit.UNIT;
+                }
+            }).onFailure(
+            // without this handler a single failed read would leave the result
+            // promise unsatisfied forever and the caller would never hear back
+            new AbstractFunction1<Throwable, BoxedUnit>() {
+                @Override
+                public BoxedUnit apply(Throwable cause) {
+                    result.setException(cause);
+                    return BoxedUnit.UNIT;
+                }
+            });
+    }
+
+    @Override
+    public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) {
+        return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
+    }
+
+    /**
+     * Forget the cached state store of <i>subscriberId</i> and delete its znode,
+     * whatever its version.
+     */
+    @Override
+    public Future<Boolean> deleteSubscriber(String subscriberId) {
+        subscribers.remove(subscriberId);
+        String path = getSubscriberZKPath(subscriberId);
+        return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
+    }
+
+    /**
+     * Close every cached subscriber state store.
+     *
+     * @throws IOException if closing one of the state stores fails
+     */
+    @Override
+    public void close() throws IOException {
+        for (SubscriptionStateStore store : subscribers.values()) {
+            store.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
index 07387cb..81eb5ed 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java
@@ -32,6 +32,15 @@ import java.util.List;
 @Beta
 public interface LogSegmentEntryReader extends AsyncCloseable {
 
+    /**
+     * Listener that is notified about state changes of an entry reader.
+     */
+    interface StateChangeListener {
+
+        /**
+         * Notify when caught up on inprogress.
+         */
+        void onCaughtupOnInprogress();
+
+    }
+
     /**
      * Start the reader. The method to signal the implementation
      * to start preparing the data for consumption {@link #readNext(int)}
@@ -39,6 +48,22 @@ public interface LogSegmentEntryReader extends 
AsyncCloseable {
     void start();
 
     /**
+     * Register the state change listener
+     *
+     * @param listener the state change listener to register
+     * @return entry reader
+     */
+    LogSegmentEntryReader registerListener(StateChangeListener listener);
+
+    /**
+     * Unregister the state change listener
+     *
+     * @param listener the state change listener to unregister
+     * @return entry reader
+     */
+    LogSegmentEntryReader unregisterListener(StateChangeListener listener);
+
+    /**
      * Return the log segment metadata for this reader.
      *
      * @return the log segment metadata


Reply via email to