http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
deleted file mode 100644
index 0c90a50..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.acl;
-
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager}
- */
-public class ZKAccessControlManager implements AccessControlManager, Watcher {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ZKAccessControlManager.class);
-
-    private static final int ZK_RETRY_BACKOFF_MS = 500;
-
-    protected final DistributedLogConfiguration conf;
-    protected final ZooKeeperClient zkc;
-    protected final String zkRootPath;
-    protected final ScheduledExecutorService scheduledExecutorService;
-
-    protected final ConcurrentMap<String, ZKAccessControl> streamEntries;
-    protected ZKAccessControl defaultAccessControl;
-    protected volatile boolean closed = false;
-
-    public ZKAccessControlManager(DistributedLogConfiguration conf,
-                                  ZooKeeperClient zkc,
-                                  String zkRootPath,
-                                  ScheduledExecutorService 
scheduledExecutorService) throws IOException {
-        this.conf = conf;
-        this.zkc = zkc;
-        this.zkRootPath = zkRootPath;
-        this.scheduledExecutorService = scheduledExecutorService;
-        this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
-        try {
-            Await.result(fetchDefaultAccessControlEntry());
-        } catch (Throwable t) {
-            if (t instanceof InterruptedException) {
-                throw new DLInterruptedException("Interrupted on getting 
default access control entry for " + zkRootPath, t);
-            } else if (t instanceof KeeperException) {
-                throw new IOException("Encountered zookeeper exception on 
getting default access control entry for " + zkRootPath, t);
-            } else if (t instanceof IOException) {
-                throw (IOException) t;
-            } else {
-                throw new IOException("Encountered unknown exception on 
getting access control entries for " + zkRootPath, t);
-            }
-        }
-
-        try {
-            Await.result(fetchAccessControlEntries());
-        } catch (Throwable t) {
-            if (t instanceof InterruptedException) {
-                throw new DLInterruptedException("Interrupted on getting 
access control entries for " + zkRootPath, t);
-            } else if (t instanceof KeeperException) {
-                throw new IOException("Encountered zookeeper exception on 
getting access control entries for " + zkRootPath, t);
-            } else if (t instanceof IOException) {
-                throw (IOException) t;
-            } else {
-                throw new IOException("Encountered unknown exception on 
getting access control entries for " + zkRootPath, t);
-            }
-        }
-    }
-
-    protected AccessControlEntry getAccessControlEntry(String stream) {
-        ZKAccessControl entry = streamEntries.get(stream);
-        entry = null == entry ? defaultAccessControl : entry;
-        return entry.getAccessControlEntry();
-    }
-
-    @Override
-    public boolean allowWrite(String stream) {
-        return !getAccessControlEntry(stream).isDenyWrite();
-    }
-
-    @Override
-    public boolean allowTruncate(String stream) {
-        return !getAccessControlEntry(stream).isDenyTruncate();
-    }
-
-    @Override
-    public boolean allowDelete(String stream) {
-        return !getAccessControlEntry(stream).isDenyDelete();
-    }
-
-    @Override
-    public boolean allowAcquire(String stream) {
-        return !getAccessControlEntry(stream).isDenyAcquire();
-    }
-
-    @Override
-    public boolean allowRelease(String stream) {
-        return !getAccessControlEntry(stream).isDenyRelease();
-    }
-
-    @Override
-    public void close() {
-        closed = true;
-    }
-
-    private Future<Void> fetchAccessControlEntries() {
-        final Promise<Void> promise = new Promise<Void>();
-        fetchAccessControlEntries(promise);
-        return promise;
-    }
-
-    private void fetchAccessControlEntries(final Promise<Void> promise) {
-        try {
-            zkc.get().getChildren(zkRootPath, this, new 
AsyncCallback.Children2Callback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
-                    if (KeeperException.Code.OK.intValue() != rc) {
-                        
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                        return;
-                    }
-                    Set<String> streamsReceived = new HashSet<String>();
-                    streamsReceived.addAll(children);
-                    Set<String> streamsCached = streamEntries.keySet();
-                    Set<String> streamsRemoved = 
Sets.difference(streamsCached, streamsReceived).immutableCopy();
-                    for (String s : streamsRemoved) {
-                        ZKAccessControl accessControl = 
streamEntries.remove(s);
-                        if (null != accessControl) {
-                            logger.info("Removed Access Control Entry for 
stream {} : {}", s, accessControl.getAccessControlEntry());
-                        }
-                    }
-                    if (streamsReceived.isEmpty()) {
-                        promise.setValue(null);
-                        return;
-                    }
-                    final AtomicInteger numPendings = new 
AtomicInteger(streamsReceived.size());
-                    final AtomicInteger numFailures = new AtomicInteger(0);
-                    for (String s : streamsReceived) {
-                        final String streamName = s;
-                        ZKAccessControl.read(zkc, zkRootPath + "/" + 
streamName, null)
-                                .addEventListener(new 
FutureEventListener<ZKAccessControl>() {
-
-                                    @Override
-                                    public void onSuccess(ZKAccessControl 
accessControl) {
-                                        streamEntries.put(streamName, 
accessControl);
-                                        logger.info("Added overrided access 
control for stream {} : {}", streamName, accessControl.getAccessControlEntry());
-                                        complete();
-                                    }
-
-                                    @Override
-                                    public void onFailure(Throwable cause) {
-                                        if (cause instanceof 
KeeperException.NoNodeException) {
-                                            streamEntries.remove(streamName);
-                                        } else if (cause instanceof 
ZKAccessControl.CorruptedAccessControlException) {
-                                            logger.warn("Access control is 
corrupted for stream {} @ {}, skipped it ...",
-                                                        new Object[] { 
streamName, zkRootPath, cause });
-                                            streamEntries.remove(streamName);
-                                        } else {
-                                            if (1 == 
numFailures.incrementAndGet()) {
-                                                promise.setException(cause);
-                                            }
-                                        }
-                                        complete();
-                                    }
-
-                                    private void complete() {
-                                        if (0 == numPendings.decrementAndGet() 
&& numFailures.get() == 0) {
-                                            promise.setValue(null);
-                                        }
-                                    }
-                                });
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-    }
-
-    private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
-        final Promise<ZKAccessControl> promise = new 
Promise<ZKAccessControl>();
-        fetchDefaultAccessControlEntry(promise);
-        return promise;
-    }
-
-    private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> 
promise) {
-        ZKAccessControl.read(zkc, zkRootPath, this)
-            .addEventListener(new FutureEventListener<ZKAccessControl>() {
-                @Override
-                public void onSuccess(ZKAccessControl accessControl) {
-                    logger.info("Default Access Control will be changed from 
{} to {}",
-                                
ZKAccessControlManager.this.defaultAccessControl,
-                                accessControl);
-                    ZKAccessControlManager.this.defaultAccessControl = 
accessControl;
-                    promise.setValue(accessControl);
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    if (cause instanceof KeeperException.NoNodeException) {
-                        logger.info("Default Access Control is missing, 
creating one for {} ...", zkRootPath);
-                        createDefaultAccessControlEntryIfNeeded(promise);
-                    } else {
-                        promise.setException(cause);
-                    }
-                }
-            });
-    }
-
-    private void createDefaultAccessControlEntryIfNeeded(final 
Promise<ZKAccessControl> promise) {
-        ZooKeeper zk;
-        try {
-            zk = zkc.get();
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-            return;
-        } catch (InterruptedException e) {
-            promise.setException(e);
-            return;
-        }
-        ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], 
zkc.getDefaultACL(),
-                CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, String 
name) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    logger.info("Created zk path {} for default ACL.", 
zkRootPath);
-                    fetchDefaultAccessControlEntry(promise);
-                } else {
-                    
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                }
-            }
-        }, null);
-    }
-
-    private void refetchDefaultAccessControlEntry(final int delayMs) {
-        if (closed) {
-            return;
-        }
-        scheduledExecutorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                fetchDefaultAccessControlEntry().addEventListener(new 
FutureEventListener<ZKAccessControl>() {
-                    @Override
-                    public void onSuccess(ZKAccessControl value) {
-                        // no-op
-                    }
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        if (cause instanceof 
ZKAccessControl.CorruptedAccessControlException) {
-                            logger.warn("Default access control entry is 
corrupted, ignore this update : ", cause);
-                            return;
-                        }
-
-                        logger.warn("Encountered an error on refetching 
default access control entry, retrying in {} ms : ",
-                                    ZK_RETRY_BACKOFF_MS, cause);
-                        refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS);
-                    }
-                });
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-
-    private void refetchAccessControlEntries(final int delayMs) {
-        if (closed) {
-            return;
-        }
-        scheduledExecutorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                fetchAccessControlEntries().addEventListener(new 
FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        // no-op
-                    }
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        logger.warn("Encountered an error on refetching access 
control entries, retrying in {} ms : ",
-                                    ZK_RETRY_BACKOFF_MS, cause);
-                        refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
-                    }
-                });
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-
-    private void refetchAllAccessControlEntries(final int delayMs) {
-        if (closed) {
-            return;
-        }
-        scheduledExecutorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                fetchDefaultAccessControlEntry().addEventListener(new 
FutureEventListener<ZKAccessControl>() {
-                    @Override
-                    public void onSuccess(ZKAccessControl value) {
-                        fetchAccessControlEntries().addEventListener(new 
FutureEventListener<Void>() {
-                            @Override
-                            public void onSuccess(Void value) {
-                                // no-op
-                            }
-
-                            @Override
-                            public void onFailure(Throwable cause) {
-                                logger.warn("Encountered an error on fetching 
all access control entries, retrying in {} ms : ",
-                                            ZK_RETRY_BACKOFF_MS, cause);
-                                
refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        logger.warn("Encountered an error on refetching all 
access control entries, retrying in {} ms : ",
-                                    ZK_RETRY_BACKOFF_MS, cause);
-                        refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS);
-                    }
-                });
-            }
-        }, delayMs, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (Event.EventType.None.equals(event.getType())) {
-            if (event.getState() == Event.KeeperState.Expired) {
-                refetchAllAccessControlEntries(0);
-            }
-        } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {
-            logger.info("Default ACL for {} is changed, refetching ...", 
zkRootPath);
-            refetchDefaultAccessControlEntry(0);
-        } else if 
(Event.EventType.NodeChildrenChanged.equals(event.getType())) {
-            logger.info("List of ACLs for {} are changed, refetching ...", 
zkRootPath);
-            refetchAccessControlEntries(0);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
deleted file mode 100644
index 0a8f28b..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ /dev/null
@@ -1,760 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.federated;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.exceptions.LogExistsException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.ZKNamespaceWatcher;
-import com.twitter.distributedlog.metadata.LogMetadataStore;
-import com.twitter.distributedlog.namespace.NamespaceWatcher;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * A Federated ZooKeeper Based Log Metadata Store.
- *
- * To Upgrade a simple ZKLogMetadataStore to FederatedZKLogMetadataStore, 
following steps should be taken in sequence:
- * a) deploy the new code with disabling createStreamsIfNotExists in all 
writer.
- * b) once all proxies disable the flag, update namespace binding to enable 
federated namespace.
- * c) restart writers to take federated namespace in place.
- *
- * NOTE: current federated namespace isn't optimized for deletion/creation. so 
don't use it in the workloads
- *       that have lots of creations or deletions.
- */
-public class FederatedZKLogMetadataStore extends NamespaceWatcher implements 
LogMetadataStore, Watcher, Runnable,
-        FutureEventListener<Set<URI>> {
-
-    static final Logger logger = 
LoggerFactory.getLogger(FederatedZKLogMetadataStore.class);
-
-    private final static String ZNODE_SUB_NAMESPACES = ".subnamespaces";
-    private final static String SUB_NAMESPACE_PREFIX = "NS_";
-
-    /**
-     * Create the federated namespace.
-     *
-     * @param namespace
-     *          namespace to create
-     * @param zkc
-     *          zookeeper client
-     * @throws InterruptedException
-     * @throws ZooKeeperClient.ZooKeeperConnectionException
-     * @throws KeeperException
-     */
-    public static void createFederatedNamespace(URI namespace, ZooKeeperClient 
zkc)
-            throws InterruptedException, 
ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
-        String zkSubNamespacesPath = namespace.getPath() + "/" + 
ZNODE_SUB_NAMESPACES;
-        Utils.zkCreateFullPathOptimistic(zkc, zkSubNamespacesPath, new byte[0],
-                zkc.getDefaultACL(), CreateMode.PERSISTENT);
-    }
-
-    /**
-     * Represent a sub namespace inside the federated namespace.
-     */
-    class SubNamespace implements NamespaceListener {
-        final URI uri;
-        final ZKNamespaceWatcher watcher;
-        Promise<Set<String>> logsFuture = new Promise<Set<String>>();
-
-        SubNamespace(URI uri) {
-            this.uri = uri;
-            this.watcher = new ZKNamespaceWatcher(conf, uri, zkc, scheduler);
-            this.watcher.registerListener(this);
-        }
-
-        void watch() {
-            this.watcher.watchNamespaceChanges();
-        }
-
-        synchronized Future<Set<String>> getLogs() {
-            return logsFuture;
-        }
-
-        @Override
-        public void onStreamsChanged(Iterator<String> newLogsIter) {
-            Set<String> newLogs = Sets.newHashSet(newLogsIter);
-            Set<String> oldLogs = Sets.newHashSet();
-
-            // update the sub namespace cache
-            Promise<Set<String>> newLogsPromise;
-            synchronized (this) {
-                if (logsFuture.isDefined()) { // the promise is already 
satisfied
-                    try {
-                        oldLogs = FutureUtils.result(logsFuture);
-                    } catch (IOException e) {
-                        logger.error("Unexpected exception when getting logs 
from a satisified future of {} : ",
-                                uri, e);
-                    }
-                    logsFuture = new Promise<Set<String>>();
-                }
-
-                // update the reverse cache
-                for (String logName : newLogs) {
-                    URI oldURI = log2Locations.putIfAbsent(logName, uri);
-                    if (null != oldURI && !Objects.equal(uri, oldURI)) {
-                        logger.error("Log {} is found duplicated in multiple 
locations : old location = {}," +
-                                " new location = {}", new Object[] { logName, 
oldURI, uri });
-                        duplicatedLogFound.set(true);
-                    }
-                }
-
-                // remove the gone streams
-                Set<String> deletedLogs = Sets.difference(oldLogs, newLogs);
-                for (String logName : deletedLogs) {
-                    log2Locations.remove(logName, uri);
-                }
-                newLogsPromise = logsFuture;
-            }
-            newLogsPromise.setValue(newLogs);
-
-            // notify namespace changes
-            notifyOnNamespaceChanges();
-        }
-    }
-
-    final DistributedLogConfiguration conf;
-    final URI namespace;
-    final ZooKeeperClient zkc;
-    final OrderedScheduler scheduler;
-    final String zkSubnamespacesPath;
-    final AtomicBoolean duplicatedLogFound = new AtomicBoolean(false);
-    final AtomicReference<String> duplicatedLogName = new 
AtomicReference<String>(null);
-    final AtomicReference<Integer> zkSubnamespacesVersion = new 
AtomicReference<Integer>(null);
-
-    final int maxLogsPerSubnamespace;
-    // sub namespaces
-    final ConcurrentSkipListMap<URI, SubNamespace> subNamespaces;
-    // map between log name and its location
-    final ConcurrentMap<String, URI> log2Locations;
-    // final
-    final boolean forceCheckLogExistence;
-
-    public FederatedZKLogMetadataStore(
-            DistributedLogConfiguration conf,
-            URI namespace,
-            ZooKeeperClient zkc,
-            OrderedScheduler scheduler) throws IOException {
-        this.conf = conf;
-        this.namespace = namespace;
-        this.zkc = zkc;
-        this.scheduler = scheduler;
-        this.forceCheckLogExistence = 
conf.getFederatedCheckExistenceWhenCacheMiss();
-        this.subNamespaces = new ConcurrentSkipListMap<URI, SubNamespace>();
-        this.log2Locations = new ConcurrentHashMap<String, URI>();
-        this.zkSubnamespacesPath = namespace.getPath() + "/" + 
ZNODE_SUB_NAMESPACES;
-        this.maxLogsPerSubnamespace = 
conf.getFederatedMaxLogsPerSubnamespace();
-
-        // fetch the sub namespace
-        Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this));
-        for (URI uri : uris) {
-            SubNamespace subNs = new SubNamespace(uri);
-            if (null == subNamespaces.putIfAbsent(uri, subNs)) {
-                subNs.watch();
-                logger.info("Watched sub namespace {}", uri);
-            }
-        }
-
-        logger.info("Federated ZK LogMetadataStore is initialized for {}", 
namespace);
-    }
-
-    private void scheduleTask(Runnable r, long ms) {
-        if (duplicatedLogFound.get()) {
-            logger.error("Scheduler is halted for federated namespace {} as 
duplicated log found",
-                    namespace);
-            return;
-        }
-        try {
-            scheduler.schedule(r, ms, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            logger.error("Task {} scheduled in {} ms is rejected : ", new 
Object[]{r, ms, ree});
-        }
-    }
-
-    private <T> Future<T> postStateCheck(Future<T> future) {
-        final Promise<T> postCheckedPromise = new Promise<T>();
-        future.addEventListener(new FutureEventListener<T>() {
-            @Override
-            public void onSuccess(T value) {
-                if (duplicatedLogFound.get()) {
-                    postCheckedPromise.setException(new 
UnexpectedException("Duplicate log found under " + namespace));
-                } else {
-                    postCheckedPromise.setValue(value);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                postCheckedPromise.setException(cause);
-            }
-        });
-        return postCheckedPromise;
-    }
-
-    //
-    // SubNamespace Related Methods
-    //
-
-    @VisibleForTesting
-    Set<URI> getSubnamespaces() {
-        return subNamespaces.keySet();
-    }
-
-    @VisibleForTesting
-    void removeLogFromCache(String logName) {
-        log2Locations.remove(logName);
-    }
-
-    private URI getSubNamespaceURI(String ns) throws URISyntaxException {
-        return new URI(
-                namespace.getScheme(),
-                namespace.getUserInfo(),
-                namespace.getHost(),
-                namespace.getPort(),
-                namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + ns,
-                namespace.getQuery(),
-                namespace.getFragment());
-    }
-
-    Future<Set<URI>> getCachedSubNamespaces() {
-        Set<URI> nsSet = subNamespaces.keySet();
-        return Future.value(nsSet);
-    }
-
-    Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
-        final Promise<Set<URI>> promise = new Promise<Set<URI>>();
-        try {
-            zkc.get().sync(this.zkSubnamespacesPath, new 
AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (Code.OK.intValue() == rc) {
-                        fetchSubNamespaces(watcher, promise);
-                    } else {
-                        
promise.setException(KeeperException.create(Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-        return promise;
-    }
-
-    private void fetchSubNamespaces(Watcher watcher,
-                                    final Promise<Set<URI>> promise) {
-        try {
-            zkc.get().getChildren(this.zkSubnamespacesPath, watcher,
-                    new AsyncCallback.Children2Callback() {
-                        @Override
-                        public void processResult(int rc, String path, Object 
ctx, List<String> children, Stat stat) {
-                            if (Code.NONODE.intValue() == rc) {
-                                promise.setException(new UnexpectedException(
-                                        "The subnamespaces don't exist for the 
federated namespace " + namespace));
-                            } else if (Code.OK.intValue() == rc) {
-                                Set<URI> subnamespaces = Sets.newHashSet();
-                                subnamespaces.add(namespace);
-                                try {
-                                    for (String ns : children) {
-                                        
subnamespaces.add(getSubNamespaceURI(ns));
-                                    }
-                                } catch (URISyntaxException use) {
-                                    logger.error("Invalid sub namespace uri 
found : ", use);
-                                    promise.setException(new 
UnexpectedException(
-                                            "Invalid sub namespace uri found 
in " + namespace, use));
-                                    return;
-                                }
-                                // update the sub namespaces set before update 
version
-                                setZkSubnamespacesVersion(stat.getVersion());
-                                promise.setValue(subnamespaces);
-                            }
-                        }
-                    }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-    }
-
-    @Override
-    public void run() {
-        fetchSubNamespaces(this).addEventListener(this);
-    }
-
-    @Override
-    public void onSuccess(Set<URI> uris) {
-        for (URI uri : uris) {
-            if (subNamespaces.containsKey(uri)) {
-                continue;
-            }
-            SubNamespace subNs = new SubNamespace(uri);
-            if (null == subNamespaces.putIfAbsent(uri, subNs)) {
-                subNs.watch();
-                logger.info("Watched new sub namespace {}.", uri);
-                notifyOnNamespaceChanges();
-            }
-        }
-    }
-
-    @Override
-    public void onFailure(Throwable cause) {
-        // failed to fetch namespaces, retry later
-        scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
-    }
-
-    @Override
-    public void process(WatchedEvent watchedEvent) {
-        if (Event.EventType.None == watchedEvent.getType() &&
-                Event.KeeperState.Expired == watchedEvent.getState()) {
-            scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
-            return;
-        }
-        if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) {
-            // fetch the namespace
-            fetchSubNamespaces(this).addEventListener(this);
-        }
-    }
-
-    //
-    // Log Related Methods
-    //
-
-    private <A> Future<A> duplicatedLogException(String logName) {
-        return Future.exception(new UnexpectedException("Duplicated log " + 
logName
-                + " found in namespace " + namespace));
-    }
-
-    @Override
-    public Future<URI> createLog(final String logName) {
-        if (duplicatedLogFound.get()) {
-            return duplicatedLogException(duplicatedLogName.get());
-        }
-        Promise<URI> createPromise = new Promise<URI>();
-        doCreateLog(logName, createPromise);
-        return postStateCheck(createPromise);
-    }
-
-    void doCreateLog(final String logName, final Promise<URI> createPromise) {
-        getLogLocation(logName).addEventListener(new 
FutureEventListener<Optional<URI>>() {
-            @Override
-            public void onSuccess(Optional<URI> uriOptional) {
-                if (uriOptional.isPresent()) {
-                    createPromise.setException(new LogExistsException("Log " + 
logName + " already exists in " + uriOptional.get()));
-                } else {
-                    getCachedSubNamespacesAndCreateLog(logName, createPromise);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
-            }
-        });
-    }
-
-    private void getCachedSubNamespacesAndCreateLog(final String logName,
-                                                    final Promise<URI> 
createPromise) {
-        getCachedSubNamespaces().addEventListener(new 
FutureEventListener<Set<URI>>() {
-            @Override
-            public void onSuccess(Set<URI> uris) {
-                findSubNamespaceToCreateLog(logName, uris, createPromise);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
-            }
-        });
-    }
-
-    private void fetchSubNamespacesAndCreateLog(final String logName,
-                                                final Promise<URI> 
createPromise) {
-        fetchSubNamespaces(null).addEventListener(new 
FutureEventListener<Set<URI>>() {
-            @Override
-            public void onSuccess(Set<URI> uris) {
-                findSubNamespaceToCreateLog(logName, uris, createPromise);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
-            }
-        });
-    }
-
-    private void findSubNamespaceToCreateLog(final String logName,
-                                             final Set<URI> uris,
-                                             final Promise<URI> createPromise) 
{
-        final List<URI> uriList = 
Lists.newArrayListWithExpectedSize(uris.size());
-        List<Future<Set<String>>> futureList = 
Lists.newArrayListWithExpectedSize(uris.size());
-        for (URI uri : uris) {
-            SubNamespace subNs = subNamespaces.get(uri);
-            if (null == subNs) {
-                createPromise.setException(new UnexpectedException("No sub 
namespace " + uri + " found"));
-                return;
-            }
-            futureList.add(subNs.getLogs());
-            uriList.add(uri);
-        }
-        Future.collect(futureList).addEventListener(new 
FutureEventListener<List<Set<String>>>() {
-            @Override
-            public void onSuccess(List<Set<String>> resultList) {
-                for (int i = resultList.size() - 1; i >= 0; i--) {
-                    Set<String> logs = resultList.get(i);
-                    if (logs.size() < maxLogsPerSubnamespace) {
-                        URI uri = uriList.get(i);
-                        createLogInNamespace(uri, logName, createPromise);
-                        return;
-                    }
-                }
-                // All sub namespaces are full
-                createSubNamespace().addEventListener(new 
FutureEventListener<URI>() {
-                    @Override
-                    public void onSuccess(URI uri) {
-                        // the new namespace will be propagated to the 
namespace cache by the namespace listener
-                        // so we don't need to cache it here. we could go 
ahead to create the stream under this
-                        // namespace, as we are using sequential znode. we are 
mostly the first guy who create
-                        // the log under this namespace.
-                        createLogInNamespace(uri, logName, createPromise);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        createPromise.setException(cause);
-                    }
-                });
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                createPromise.setException(cause);
-            }
-        });
-    }
-
-    private String getNamespaceFromZkPath(String zkPath) throws 
UnexpectedException {
-        String parts[] = zkPath.split(SUB_NAMESPACE_PREFIX);
-        if (parts.length <= 0) {
-            throw new UnexpectedException("Invalid namespace @ " + zkPath);
-        }
-        return SUB_NAMESPACE_PREFIX + parts[parts.length - 1];
-    }
-
-    Future<URI> createSubNamespace() {
-        final Promise<URI> promise = new Promise<URI>();
-
-        final String nsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES 
+ "/" + SUB_NAMESPACE_PREFIX;
-        try {
-            zkc.get().create(nsPath, new byte[0], zkc.getDefaultACL(), 
CreateMode.PERSISTENT_SEQUENTIAL,
-                    new AsyncCallback.StringCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object 
ctx, String name) {
-                            if (Code.OK.intValue() == rc) {
-                                try {
-                                    URI newUri = 
getSubNamespaceURI(getNamespaceFromZkPath(name));
-                                    logger.info("Created sub namespace {}", 
newUri);
-                                    promise.setValue(newUri);
-                                } catch (UnexpectedException ue) {
-                                    promise.setException(ue);
-                                } catch (URISyntaxException e) {
-                                    promise.setException(new 
UnexpectedException("Invalid namespace " + name + " is created."));
-                                }
-                            } else {
-                                
promise.setException(KeeperException.create(Code.get(rc)));
-                            }
-                        }
-                    }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            promise.setException(e);
-        } catch (InterruptedException e) {
-            promise.setException(e);
-        }
-
-        return promise;
-    }
-
-    /**
-     * Create a log under the namespace. To guarantee there is only one 
creation happens at time
-     * in a federated namespace, we use CAS operation in zookeeper.
-     *
-     * @param uri
-     *          namespace
-     * @param logName
-     *          name of the log
-     * @param createPromise
-     *          the promise representing the creation result.
-     */
-    private void createLogInNamespace(final URI uri,
-                                      final String logName,
-                                      final Promise<URI> createPromise) {
-        // TODO: rewrite this after we bump to zk 3.5, where we will have 
asynchronous version of multi
-        scheduler.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    createLogInNamespaceSync(uri, logName);
-                    createPromise.setValue(uri);
-                } catch (InterruptedException e) {
-                    createPromise.setException(e);
-                } catch (IOException e) {
-                    createPromise.setException(e);
-                } catch (KeeperException.BadVersionException bve) {
-                    fetchSubNamespacesAndCreateLog(logName, createPromise);
-                } catch (KeeperException e) {
-                    createPromise.setException(e);
-                }
-            }
-        });
-    }
-
-    void createLogInNamespaceSync(URI uri, String logName)
-            throws InterruptedException, IOException, KeeperException {
-        Transaction txn = zkc.get().transaction();
-        // we don't have the zk version yet. set it to 0 instead of -1, to 
prevent non CAS operation.
-        int zkVersion = null == zkSubnamespacesVersion.get() ? 0 : 
zkSubnamespacesVersion.get();
-        txn.setData(zkSubnamespacesPath, uri.getPath().getBytes(UTF_8), 
zkVersion);
-        String logPath = uri.getPath() + "/" + logName;
-        txn.create(logPath, new byte[0], zkc.getDefaultACL(), 
CreateMode.PERSISTENT);
-        try {
-            txn.commit();
-            // if the transaction succeed, the zk version is advanced
-            setZkSubnamespacesVersion(zkVersion + 1);
-        } catch (KeeperException ke) {
-            List<OpResult> opResults = ke.getResults();
-            OpResult createResult = opResults.get(1);
-            if (createResult instanceof OpResult.ErrorResult) {
-                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) 
createResult;
-                if (Code.NODEEXISTS.intValue() == errorResult.getErr()) {
-                    throw new LogExistsException("Log " + logName + " already 
exists");
-                }
-            }
-            OpResult setResult = opResults.get(0);
-            if (setResult instanceof OpResult.ErrorResult) {
-                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) 
setResult;
-                if (Code.BADVERSION.intValue() == errorResult.getErr()) {
-                    throw KeeperException.create(Code.BADVERSION);
-                }
-            }
-            throw new ZKException("ZK exception in creating log " + logName + 
" in " + uri, ke);
-        }
-    }
-
-    void setZkSubnamespacesVersion(int zkVersion) {
-        Integer oldVersion;
-        boolean done = false;
-        while (!done) {
-            oldVersion = zkSubnamespacesVersion.get();
-            if (null == oldVersion) {
-                done = zkSubnamespacesVersion.compareAndSet(null, zkVersion);
-                continue;
-            }
-            if (oldVersion < zkVersion) {
-                done = zkSubnamespacesVersion.compareAndSet(oldVersion, 
zkVersion);
-                continue;
-            } else {
-                done = true;
-            }
-        }
-    }
-
-    @Override
-    public Future<Optional<URI>> getLogLocation(final String logName) {
-        if (duplicatedLogFound.get()) {
-            return duplicatedLogException(duplicatedLogName.get());
-        }
-        URI location = log2Locations.get(logName);
-        if (null != location) {
-            return postStateCheck(Future.value(Optional.of(location)));
-        }
-        if (!forceCheckLogExistence) {
-            Optional<URI> result = Optional.absent();
-            return Future.value(result);
-        }
-        return postStateCheck(fetchLogLocation(logName).onSuccess(
-                new AbstractFunction1<Optional<URI>, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(Optional<URI> uriOptional) {
-                        if (uriOptional.isPresent()) {
-                            log2Locations.putIfAbsent(logName, 
uriOptional.get());
-                        }
-                        return BoxedUnit.UNIT;
-                    }
-                }));
-    }
-
-    private Future<Optional<URI>> fetchLogLocation(final String logName) {
-        final Promise<Optional<URI>> fetchPromise = new 
Promise<Optional<URI>>();
-
-        Set<URI> uris = subNamespaces.keySet();
-        List<Future<Optional<URI>>> fetchFutures = 
Lists.newArrayListWithExpectedSize(uris.size());
-        for (URI uri : uris) {
-            fetchFutures.add(fetchLogLocation(uri, logName));
-        }
-        Future.collect(fetchFutures).addEventListener(new 
FutureEventListener<List<Optional<URI>>>() {
-            @Override
-            public void onSuccess(List<Optional<URI>> fetchResults) {
-                Optional<URI> result = Optional.absent();
-                for (Optional<URI> fetchResult : fetchResults) {
-                    if (result.isPresent()) {
-                        if (fetchResult.isPresent()) {
-                            logger.error("Log {} is found in multiple sub 
namespaces : {} & {}.",
-                                    new Object[] { logName, result.get(), 
fetchResult.get() });
-                            duplicatedLogName.compareAndSet(null, logName);
-                            duplicatedLogFound.set(true);
-                            fetchPromise.setException(new 
UnexpectedException("Log " + logName
-                                    + " is found in multiple sub namespaces : "
-                                    + result.get() + " & " + 
fetchResult.get()));
-                            return;
-                        }
-                    } else {
-                        result = fetchResult;
-                    }
-                }
-                fetchPromise.setValue(result);
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                fetchPromise.setException(cause);
-            }
-        });
-        return fetchPromise;
-    }
-
-    private Future<Optional<URI>> fetchLogLocation(final URI uri, String 
logName) {
-        final Promise<Optional<URI>> fetchPromise = new 
Promise<Optional<URI>>();
-        final String logRootPath = uri.getPath() + "/" + logName;
-        try {
-            zkc.get().exists(logRootPath, false, new 
AsyncCallback.StatCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, 
Stat stat) {
-                    if (Code.OK.intValue() == rc) {
-                        fetchPromise.setValue(Optional.of(uri));
-                    } else if (Code.NONODE.intValue() == rc) {
-                        fetchPromise.setValue(Optional.<URI>absent());
-                    } else {
-                        
fetchPromise.setException(KeeperException.create(Code.get(rc)));
-                    }
-                }
-            }, null);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            fetchPromise.setException(e);
-        } catch (InterruptedException e) {
-            fetchPromise.setException(e);
-        }
-        return fetchPromise;
-    }
-
-    @Override
-    public Future<Iterator<String>> getLogs() {
-        if (duplicatedLogFound.get()) {
-            return duplicatedLogException(duplicatedLogName.get());
-        }
-        return postStateCheck(retrieveLogs().map(
-                new AbstractFunction1<List<Set<String>>, Iterator<String>>() {
-                    @Override
-                    public Iterator<String> apply(List<Set<String>> 
resultList) {
-                        return getIterator(resultList);
-                    }
-                }));
-    }
-
-    private Future<List<Set<String>>> retrieveLogs() {
-        Collection<SubNamespace> subNss = subNamespaces.values();
-        List<Future<Set<String>>> logsList = 
Lists.newArrayListWithExpectedSize(subNss.size());
-        for (SubNamespace subNs : subNss) {
-            logsList.add(subNs.getLogs());
-        }
-        return Future.collect(logsList);
-    }
-
-    private Iterator<String> getIterator(List<Set<String>> resultList) {
-        List<Iterator<String>> iterList = 
Lists.newArrayListWithExpectedSize(resultList.size());
-        for (Set<String> result : resultList) {
-            iterList.add(result.iterator());
-        }
-        return Iterators.concat(iterList.iterator());
-    }
-
-    @Override
-    public void registerNamespaceListener(NamespaceListener listener) {
-        registerListener(listener);
-    }
-
-    @Override
-    protected void watchNamespaceChanges() {
-        // as the federated namespace already started watching namespace 
changes,
-        // we don't need to do any actions here
-    }
-
-    private void notifyOnNamespaceChanges() {
-        retrieveLogs().onSuccess(new AbstractFunction1<List<Set<String>>, 
BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(List<Set<String>> resultList) {
-                for (NamespaceListener listener : listeners) {
-                    listener.onStreamsChanged(getIterator(resultList));
-                }
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
deleted file mode 100644
index d7ff4fb..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import com.twitter.distributedlog.util.Allocator;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.client.LedgerHandle;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
-
-/**
- * Allocate log segments
- */
-class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, 
Object> {
-
-    private static class NewLogSegmentEntryWriterFn extends 
AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> {
-
-        static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE =
-                new NewLogSegmentEntryWriterFn();
-
-        private NewLogSegmentEntryWriterFn() {}
-
-        @Override
-        public LogSegmentEntryWriter apply(LedgerHandle lh) {
-            return new BKLogSegmentEntryWriter(lh);
-        }
-    }
-
-    LedgerAllocator allocator;
-
-    BKLogSegmentAllocator(LedgerAllocator allocator) {
-        this.allocator = allocator;
-    }
-
-    @Override
-    public void allocate() throws IOException {
-        allocator.allocate();
-    }
-
-    @Override
-    public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn,
-                                                   final 
Transaction.OpListener<LogSegmentEntryWriter> listener) {
-        return allocator.tryObtain(txn, new 
Transaction.OpListener<LedgerHandle>() {
-            @Override
-            public void onCommit(LedgerHandle lh) {
-                listener.onCommit(new BKLogSegmentEntryWriter(lh));
-            }
-
-            @Override
-            public void onAbort(Throwable t) {
-                listener.onAbort(t);
-            }
-        }).map(NewLogSegmentEntryWriterFn.INSTANCE);
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        return allocator.asyncClose();
-    }
-
-    @Override
-    public Future<Void> delete() {
-        return allocator.delete();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
deleted file mode 100644
index f85760d..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ /dev/null
@@ -1,837 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.logsegment;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.EndOfLogSegmentException;
-import com.twitter.distributedlog.exceptions.ReadCancelledException;
-import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryReader;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * BookKeeper ledger based log segment entry reader.
- */
-public class BKLogSegmentEntryReader implements Runnable, 
LogSegmentEntryReader, AsyncCallback.OpenCallback {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(BKLogSegmentEntryReader.class);
-
-    private class CacheEntry implements Runnable, AsyncCallback.ReadCallback,
-            AsyncCallback.ReadLastConfirmedAndEntryCallback {
-
-        protected final long entryId;
-        private boolean done;
-        private LedgerEntry entry;
-        private int rc;
-
-        private CacheEntry(long entryId) {
-            this.entryId = entryId;
-            this.entry = null;
-            this.rc = BKException.Code.UnexpectedConditionException;
-            this.done = false;
-        }
-
-        long getEntryId() {
-            return entryId;
-        }
-
-        synchronized boolean isDone() {
-            return done;
-        }
-
-        void setValue(LedgerEntry entry) {
-            synchronized (this) {
-                if (done) {
-                    return;
-                }
-                this.rc = BKException.Code.OK;
-                this.entry = entry;
-            }
-            setDone(true);
-        }
-
-        void setException(int rc) {
-            synchronized (this) {
-                if (done) {
-                    return;
-                }
-                this.rc = rc;
-            }
-            setDone(false);
-        }
-
-        void setDone(boolean success) {
-            synchronized (this) {
-                this.done = true;
-            }
-            onReadEntryDone(success);
-        }
-
-        synchronized boolean isSuccess() {
-            return BKException.Code.OK == rc;
-        }
-
-        synchronized LedgerEntry getEntry() {
-            return this.entry;
-        }
-
-        synchronized int getRc() {
-            return rc;
-        }
-
-        @Override
-        public void readComplete(int rc,
-                                 LedgerHandle lh,
-                                 Enumeration<LedgerEntry> entries,
-                                 Object ctx) {
-            if (failureInjector.shouldInjectCorruption(entryId, entryId)) {
-                rc = BKException.Code.DigestMatchException;
-            }
-            processReadEntries(rc, lh, entries, ctx);
-        }
-
-        void processReadEntries(int rc,
-                                LedgerHandle lh,
-                                Enumeration<LedgerEntry> entries,
-                                Object ctx) {
-            if (isDone()) {
-                return;
-            }
-            if (!checkReturnCodeAndHandleFailure(rc, false)) {
-                return;
-            }
-            LedgerEntry entry = null;
-            while (entries.hasMoreElements()) {
-                // more entries are returned
-                if (null != entry) {
-                    
setException(BKException.Code.UnexpectedConditionException);
-                    return;
-                }
-                entry = entries.nextElement();
-            }
-            if (null == entry || entry.getEntryId() != entryId) {
-                setException(BKException.Code.UnexpectedConditionException);
-                return;
-            }
-            setValue(entry);
-        }
-
-        @Override
-        public void readLastConfirmedAndEntryComplete(int rc,
-                                                      long entryId,
-                                                      LedgerEntry entry,
-                                                      Object ctx) {
-            if (failureInjector.shouldInjectCorruption(this.entryId, 
this.entryId)) {
-                rc = BKException.Code.DigestMatchException;
-            }
-            processReadEntry(rc, entryId, entry, ctx);
-        }
-
-        void processReadEntry(int rc,
-                              long entryId,
-                              LedgerEntry entry,
-                              Object ctx) {
-            if (isDone()) {
-                return;
-            }
-            if (!checkReturnCodeAndHandleFailure(rc, true)) {
-                return;
-            }
-            if (null != entry && this.entryId == entryId) {
-                setValue(entry);
-                return;
-            }
-            // the long poll is timeout or interrupted; we will retry it again.
-            issueRead(this);
-        }
-
-        /**
-         * Check return code and retry if needed.
-         *
-         * @param rc the return code
-         * @param isLongPoll is it a long poll request
-         * @return is the request successful or not
-         */
-        boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) {
-            if (BKException.Code.OK == rc) {
-                numReadErrors.set(0);
-                return true;
-            }
-            if (BKException.Code.BookieHandleNotAvailableException == rc ||
-                    (isLongPoll && 
BKException.Code.NoSuchLedgerExistsException == rc)) {
-                int numErrors = Math.max(1, numReadErrors.incrementAndGet());
-                int nextReadBackoffTime = Math.min(numErrors * 
readAheadWaitTime, maxReadBackoffTime);
-                scheduler.schedule(
-                        getSegment().getLogSegmentId(),
-                        this,
-                        nextReadBackoffTime,
-                        TimeUnit.MILLISECONDS);
-            } else {
-                setException(rc);
-            }
-            return false;
-        }
-
-        @Override
-        public void run() {
-            issueRead(this);
-        }
-    }
-
-    private class PendingReadRequest {
-        private final int numEntries;
-        private final List<Entry.Reader> entries;
-        private final Promise<List<Entry.Reader>> promise;
-
-        PendingReadRequest(int numEntries) {
-            this.numEntries = numEntries;
-            if (numEntries == 1) {
-                this.entries = new ArrayList<Entry.Reader>(1);
-            } else {
-                this.entries = new ArrayList<Entry.Reader>();
-            }
-            this.promise = new Promise<List<Entry.Reader>>();
-        }
-
-        Promise<List<Entry.Reader>> getPromise() {
-            return promise;
-        }
-
-        void setException(Throwable throwable) {
-            FutureUtils.setException(promise, throwable);
-        }
-
-        void addEntry(Entry.Reader entry) {
-            entries.add(entry);
-        }
-
-        void complete() {
-            FutureUtils.setValue(promise, entries);
-            onEntriesConsumed(entries.size());
-        }
-
-        boolean hasReadEntries() {
-            return entries.size() > 0;
-        }
-
-        boolean hasReadEnoughEntries() {
-            return entries.size() >= numEntries;
-        }
-    }
-
-    private final BookKeeper bk;
-    private final DistributedLogConfiguration conf;
-    private final OrderedScheduler scheduler;
-    private final long lssn;
-    private final long startSequenceId;
-    private final boolean envelopeEntries;
-    private final boolean deserializeRecordSet;
-    private final int numPrefetchEntries;
-    private final int maxPrefetchEntries;
-    // state
-    private Promise<Void> closePromise = null;
-    private LogSegmentMetadata metadata;
-    private LedgerHandle lh;
-    private final List<LedgerHandle> openLedgerHandles;
-    private CacheEntry outstandingLongPoll;
-    private long nextEntryId;
-    private final AtomicReference<Throwable> lastException = new 
AtomicReference<Throwable>(null);
-    private final AtomicLong scheduleCount = new AtomicLong(0);
-    private volatile boolean hasCaughtupOnInprogress = false;
-    private final CopyOnWriteArraySet<StateChangeListener> 
stateChangeListeners =
-            new CopyOnWriteArraySet<StateChangeListener>();
-    // read retries
-    private int readAheadWaitTime;
-    private final int maxReadBackoffTime;
-    private final AtomicInteger numReadErrors = new AtomicInteger(0);
-    private final boolean skipBrokenEntries;
-    // readahead cache
-    int cachedEntries = 0;
-    int numOutstandingEntries = 0;
-    final LinkedBlockingQueue<CacheEntry> readAheadEntries;
-    // request queue
-    final LinkedList<PendingReadRequest> readQueue;
-
-    // failure injector
-    private final AsyncFailureInjector failureInjector;
-    // Stats
-    private final Counter skippedBrokenEntriesCounter;
-
-    BKLogSegmentEntryReader(LogSegmentMetadata metadata,
-                            LedgerHandle lh,
-                            long startEntryId,
-                            BookKeeper bk,
-                            OrderedScheduler scheduler,
-                            DistributedLogConfiguration conf,
-                            StatsLogger statsLogger,
-                            AsyncFailureInjector failureInjector) {
-        this.metadata = metadata;
-        this.lssn = metadata.getLogSegmentSequenceNumber();
-        this.startSequenceId = metadata.getStartSequenceId();
-        this.envelopeEntries = metadata.getEnvelopeEntries();
-        this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads();
-        this.lh = lh;
-        this.nextEntryId = Math.max(startEntryId, 0);
-        this.bk = bk;
-        this.conf = conf;
-        this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment();
-        this.maxPrefetchEntries = conf.getMaxPrefetchEntriesPerLogSegment();
-        this.scheduler = scheduler;
-        this.openLedgerHandles = Lists.newArrayList();
-        this.openLedgerHandles.add(lh);
-        this.outstandingLongPoll = null;
-        // create the readahead queue
-        this.readAheadEntries = new LinkedBlockingQueue<CacheEntry>();
-        // create the read request queue
-        this.readQueue = new LinkedList<PendingReadRequest>();
-        // read backoff settings
-        this.readAheadWaitTime = conf.getReadAheadWaitTime();
-        this.maxReadBackoffTime = 4 * conf.getReadAheadWaitTime();
-        // other read settings
-        this.skipBrokenEntries = conf.getReadAheadSkipBrokenEntries();
-
-        // Failure Injection
-        this.failureInjector = failureInjector;
-        // Stats
-        this.skippedBrokenEntriesCounter = 
statsLogger.getCounter("skipped_broken_entries");
-    }
-
-    @VisibleForTesting
-    public synchronized CacheEntry getOutstandingLongPoll() {
-        return outstandingLongPoll;
-    }
-
-    @VisibleForTesting
-    LinkedBlockingQueue<CacheEntry> getReadAheadEntries() {
-        return this.readAheadEntries;
-    }
-
-    synchronized LedgerHandle getLh() {
-        return lh;
-    }
-
-    @Override
-    public synchronized LogSegmentMetadata getSegment() {
-        return metadata;
-    }
-
-    @VisibleForTesting
-    synchronized long getNextEntryId() {
-        return nextEntryId;
-    }
-
-    @Override
-    public void start() {
-        prefetchIfNecessary();
-    }
-
-    @Override
-    public boolean hasCaughtUpOnInprogress() {
-        return hasCaughtupOnInprogress;
-    }
-
-    @Override
-    public LogSegmentEntryReader registerListener(StateChangeListener 
listener) {
-        stateChangeListeners.add(listener);
-        return this;
-    }
-
-    @Override
-    public LogSegmentEntryReader unregisterListener(StateChangeListener 
listener) {
-        stateChangeListeners.remove(listener);
-        return this;
-    }
-
-    private void notifyCaughtupOnInprogress() {
-        for (StateChangeListener listener : stateChangeListeners) {
-            listener.onCaughtupOnInprogress();
-        }
-    }
-
-    //
-    // Process on Log Segment Metadata Updates
-    //
-
-    @Override
-    public synchronized void onLogSegmentMetadataUpdated(LogSegmentMetadata 
segment) {
-        if (metadata == segment ||
-                LogSegmentMetadata.COMPARATOR.compare(metadata, segment) == 0 
||
-                !(metadata.isInProgress() && !segment.isInProgress())) {
-            return;
-        }
-        // segment is closed from inprogress, then re-open the log segment
-        bk.asyncOpenLedger(
-                segment.getLogSegmentId(),
-                BookKeeper.DigestType.CRC32,
-                conf.getBKDigestPW().getBytes(UTF_8),
-                this,
-                segment);
-    }
-
-    @Override
-    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-        LogSegmentMetadata segment = (LogSegmentMetadata) ctx;
-        if (BKException.Code.OK != rc) {
-            // fail current reader or retry opening the reader
-            failOrRetryOpenLedger(rc, segment);
-            return;
-        }
-        // switch to new ledger handle if the log segment is moved to 
completed.
-        CacheEntry longPollRead = null;
-        synchronized (this) {
-            if (isClosed()) {
-                lh.asyncClose(new AsyncCallback.CloseCallback() {
-                    @Override
-                    public void closeComplete(int rc, LedgerHandle lh, Object 
ctx) {
-                        logger.debug("Close the open ledger {} since the log 
segment reader is already closed",
-                                lh.getId());
-                    }
-                }, null);
-                return;
-            }
-            this.metadata = segment;
-            this.lh = lh;
-            this.openLedgerHandles.add(lh);
-            longPollRead = outstandingLongPoll;
-        }
-        if (null != longPollRead) {
-            // reissue the long poll read when the log segment state is changed
-            issueRead(longPollRead);
-        }
-        // notify readers
-        notifyReaders();
-    }
-
-    private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata 
segment) {
-        if (isClosed()) {
-            return;
-        }
-        if (isBeyondLastAddConfirmed()) {
-            // if the reader is already caught up, let's fail the reader 
immediately
-            // as we need to pull the latest metadata of this log segment.
-            setException(new BKTransmitException("Failed to open ledger for 
reading log segment " + getSegment(), rc),
-                    true);
-            return;
-        }
-        // the reader is still catching up, retry opening the log segment later
-        scheduler.schedule(segment.getLogSegmentId(), new Runnable() {
-            @Override
-            public void run() {
-                onLogSegmentMetadataUpdated(segment);
-            }
-        }, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
-    }
-
-    //
-    // Change the state of this reader
-    //
-
-    private boolean checkClosedOrInError() {
-        if (null != lastException.get()) {
-            cancelAllPendingReads(lastException.get());
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Set the reader into error state with return code <i>rc</i>.
-     *
-     * @param throwable exception indicating the error
-     * @param isBackground is the reader set exception by background reads or 
foreground reads
-     */
-    private void setException(Throwable throwable, boolean isBackground) {
-        lastException.compareAndSet(null, throwable);
-        if (isBackground) {
-            notifyReaders();
-        }
-    }
-
-    /**
-     * Notify the readers with the state change.
-     */
-    private void notifyReaders() {
-        processReadRequests();
-    }
-
-    private void cancelAllPendingReads(Throwable throwExc) {
-        List<PendingReadRequest> requestsToCancel;
-        synchronized (readQueue) {
-            requestsToCancel = 
Lists.newArrayListWithExpectedSize(readQueue.size());
-            requestsToCancel.addAll(readQueue);
-            readQueue.clear();
-        }
-        for (PendingReadRequest request : requestsToCancel) {
-            request.setException(throwExc);
-        }
-    }
-
-    //
-    // Background Read Operations
-    //
-
-    private void onReadEntryDone(boolean success) {
-        // we successfully read an entry
-        synchronized (this) {
-            --numOutstandingEntries;
-        }
-        // notify reader that there is entry ready
-        notifyReaders();
-        // stop prefetch if we already encountered exceptions
-        if (success) {
-            prefetchIfNecessary();
-        }
-    }
-
-    private void onEntriesConsumed(int numEntries) {
-        synchronized (this) {
-            cachedEntries -= numEntries;
-        }
-        prefetchIfNecessary();
-    }
-
-    private void prefetchIfNecessary() {
-        List<CacheEntry> entriesToFetch;
-        synchronized (this) {
-            if (cachedEntries >= maxPrefetchEntries) {
-                return;
-            }
-            // we don't have enough entries, do prefetch
-            int numEntriesToFetch = numPrefetchEntries - numOutstandingEntries;
-            if (numEntriesToFetch <= 0) {
-                return;
-            }
-            entriesToFetch = new ArrayList<CacheEntry>(numEntriesToFetch);
-            for (int i = 0; i < numEntriesToFetch; i++) {
-                if (cachedEntries >= maxPrefetchEntries) {
-                    break;
-                }
-                if ((isLedgerClosed() && nextEntryId > getLastAddConfirmed()) 
||
-                        (!isLedgerClosed() && nextEntryId > 
getLastAddConfirmed() + 1)) {
-                    break;
-                }
-                CacheEntry entry = new CacheEntry(nextEntryId);
-                entriesToFetch.add(entry);
-                readAheadEntries.add(entry);
-                ++numOutstandingEntries;
-                ++cachedEntries;
-                ++nextEntryId;
-            }
-        }
-        for (CacheEntry entry : entriesToFetch) {
-            issueRead(entry);
-        }
-    }
-
-
-    private void issueRead(CacheEntry cacheEntry) {
-        if (isClosed()) {
-            return;
-        }
-        if (isLedgerClosed()) {
-            if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) {
-                issueSimpleRead(cacheEntry);
-                return;
-            } else {
-                // Reach the end of stream
-                notifyReaders();
-            }
-        } else { // the ledger is still in progress
-            if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) {
-                issueSimpleRead(cacheEntry);
-            } else {
-                issueLongPollRead(cacheEntry);
-            }
-        }
-    }
-
-    private void issueSimpleRead(CacheEntry cacheEntry) {
-        getLh().asyncReadEntries(cacheEntry.entryId, cacheEntry.entryId, 
cacheEntry, null);
-    }
-
-    private void issueLongPollRead(CacheEntry cacheEntry) {
-        // register the read as outstanding reads
-        synchronized (this) {
-            this.outstandingLongPoll = cacheEntry;
-        }
-
-        if (!hasCaughtupOnInprogress) {
-            hasCaughtupOnInprogress = true;
-            notifyCaughtupOnInprogress();
-        }
-        getLh().asyncReadLastConfirmedAndEntry(
-                cacheEntry.entryId,
-                conf.getReadLACLongPollTimeout(),
-                false,
-                cacheEntry,
-                null);
-    }
-
-    //
-    // Foreground Read Operations
-    //
-
-    Entry.Reader processReadEntry(LedgerEntry entry) throws IOException {
-        return Entry.newBuilder()
-                .setLogSegmentInfo(lssn, startSequenceId)
-                .setEntryId(entry.getEntryId())
-                .setEnvelopeEntry(envelopeEntries)
-                .deserializeRecordSet(deserializeRecordSet)
-                .setInputStream(entry.getEntryInputStream())
-                .buildReader();
-    }
-
-    @Override
-    public Future<List<Entry.Reader>> readNext(int numEntries) {
-        final PendingReadRequest readRequest = new 
PendingReadRequest(numEntries);
-
-        if (checkClosedOrInError()) {
-            readRequest.setException(lastException.get());
-        } else {
-            boolean wasQueueEmpty;
-            synchronized (readQueue) {
-                wasQueueEmpty = readQueue.isEmpty();
-                readQueue.add(readRequest);
-            }
-            if (wasQueueEmpty) {
-                processReadRequests();
-            }
-        }
-        return readRequest.getPromise();
-    }
-
-    private void processReadRequests() {
-        if (isClosed()) {
-            // the reader is already closed.
-            return;
-        }
-
-        long prevCount = scheduleCount.getAndIncrement();
-        if (0 == prevCount) {
-            scheduler.submit(getSegment().getLogSegmentId(), this);
-        }
-    }
-
-    /**
-     * The core function to propagate fetched entries to read requests
-     */
-    @Override
-    public void run() {
-        long scheduleCountLocal = scheduleCount.get();
-        while (true) {
-            PendingReadRequest nextRequest = null;
-            synchronized (readQueue) {
-                nextRequest = readQueue.peek();
-            }
-
-            // if read queue is empty, nothing to read, return
-            if (null == nextRequest) {
-                scheduleCount.set(0L);
-                return;
-            }
-
-            // if the oldest pending promise is interrupted then we must
-            // mark the reader in error and abort all pending reads since
-            // we don't know the last consumed read
-            if (null == lastException.get()) {
-                if (nextRequest.getPromise().isInterrupted().isDefined()) {
-                    setException(new DLInterruptedException("Interrupted on 
reading log segment "
-                            + getSegment() + " : " + 
nextRequest.getPromise().isInterrupted().get()), false);
-                }
-            }
-
-            // if the reader is in error state, stop read
-            if (checkClosedOrInError()) {
-                return;
-            }
-
-            // read entries from readahead cache to satisfy next read request
-            readEntriesFromReadAheadCache(nextRequest);
-
-            // check if we can satisfy the read request
-            if (nextRequest.hasReadEntries()) {
-                PendingReadRequest request;
-                synchronized (readQueue) {
-                    request = readQueue.poll();
-                }
-                if (null != request && nextRequest == request) {
-                    request.complete();
-                } else {
-                    DLIllegalStateException ise = new 
DLIllegalStateException("Unexpected condition at reading from "
-                            + getSegment());
-                    nextRequest.setException(ise);
-                    if (null != request) {
-                        request.setException(ise);
-                    }
-                    setException(ise, false);
-                }
-            } else {
-                if (0 == scheduleCountLocal) {
-                    return;
-                }
-                scheduleCountLocal = scheduleCount.decrementAndGet();
-            }
-        }
-    }
-
-    private void readEntriesFromReadAheadCache(PendingReadRequest nextRequest) 
{
-        while (!nextRequest.hasReadEnoughEntries()) {
-            CacheEntry entry;
-            boolean hitEndOfLogSegment;
-            synchronized (this) {
-                entry = readAheadEntries.peek();
-                hitEndOfLogSegment = (null == entry) && isEndOfLogSegment();
-            }
-            // reach end of log segment
-            if (hitEndOfLogSegment) {
-                setException(new 
EndOfLogSegmentException(getSegment().getZNodeName()), false);
-                return;
-            }
-            if (null == entry) {
-                return;
-            }
-            // entry is not complete yet.
-            if (!entry.isDone()) {
-                // we already reached end of the log segment
-                if (isEndOfLogSegment(entry.getEntryId())) {
-                    setException(new 
EndOfLogSegmentException(getSegment().getZNodeName()), false);
-                }
-                return;
-            }
-            if (entry.isSuccess()) {
-                CacheEntry removedEntry = readAheadEntries.poll();
-                if (entry != removedEntry) {
-                    DLIllegalStateException ise = new 
DLIllegalStateException("Unexpected condition at reading from "
-                            + getSegment());
-                    setException(ise, false);
-                    return;
-                }
-                try {
-                    nextRequest.addEntry(processReadEntry(entry.getEntry()));
-                } catch (IOException e) {
-                    setException(e, false);
-                    return;
-                }
-            } else if (skipBrokenEntries && 
BKException.Code.DigestMatchException == entry.getRc()) {
-                // skip this entry and move forward
-                skippedBrokenEntriesCounter.inc();
-                readAheadEntries.poll();
-                continue;
-            } else {
-                setException(new BKTransmitException("Encountered issue on 
reading entry " + entry.getEntryId()
-                        + " @ log segment " + getSegment(), entry.getRc()), 
false);
-                return;
-            }
-        }
-    }
-
-    //
-    // State Management
-    //
-
-    private synchronized boolean isEndOfLogSegment() {
-        return isEndOfLogSegment(nextEntryId);
-    }
-
-    private boolean isEndOfLogSegment(long entryId) {
-        return isLedgerClosed() && entryId > getLastAddConfirmed();
-    }
-
-    @Override
-    public synchronized boolean isBeyondLastAddConfirmed() {
-        return isBeyondLastAddConfirmed(nextEntryId);
-    }
-
-    private boolean isBeyondLastAddConfirmed(long entryId) {
-        return entryId > getLastAddConfirmed();
-    }
-
-    private boolean isNotBeyondLastAddConfirmed(long entryId) {
-        return entryId <= getLastAddConfirmed();
-    }
-
-    private boolean isLedgerClosed() {
-        return getLh().isClosed();
-    }
-
-    @Override
-    public long getLastAddConfirmed() {
-        return getLh().getLastAddConfirmed();
-    }
-
-    synchronized boolean isClosed() {
-        return null != closePromise;
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        final Promise<Void> closeFuture;
-        ReadCancelledException exception;
-        LedgerHandle[] lhsToClose;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-            lhsToClose = openLedgerHandles.toArray(new 
LedgerHandle[openLedgerHandles.size()]);
-            // set the exception to cancel pending and subsequent reads
-            exception = new 
ReadCancelledException(getSegment().getZNodeName(), "Reader was closed");
-            setException(exception, false);
-        }
-
-        // cancel all pending reads
-        cancelAllPendingReads(exception);
-
-        // close all the open ledger
-        BKUtils.closeLedgers(lhsToClose).proxyTo(closeFuture);
-        return closeFuture;
-    }
-}

Reply via email to