http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
deleted file mode 100644
index a081606..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.auditor;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.BookKeeperClientBuilder;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.util.DLUtils;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeperAccessor;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk 
usage by streams.
- */
-public class DLAuditor {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(DLAuditor.class);
-
-    private final DistributedLogConfiguration conf;
-
-    public DLAuditor(DistributedLogConfiguration conf) {
-        this.conf = conf;
-    }
-
-    private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace 
namespace) {
-        NamespaceDriver driver = namespace.getNamespaceDriver();
-        assert(driver instanceof BKNamespaceDriver);
-        return ((BKNamespaceDriver) driver).getWriterZKC();
-    }
-
-    private BookKeeperClient getBookKeeperClient(DistributedLogNamespace 
namespace) {
-        NamespaceDriver driver = namespace.getNamespaceDriver();
-        assert(driver instanceof BKNamespaceDriver);
-        return ((BKNamespaceDriver) driver).getReaderBKC();
-    }
-
-    private String validateAndGetZKServers(List<URI> uris) {
-        URI firstURI = uris.get(0);
-        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI);
-        for (URI uri : uris) {
-            if 
(!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) {
-                throw new IllegalArgumentException("Uris don't belong to same 
zookeeper cluster");
-            }
-        }
-        return zkServers;
-    }
-
-    private BKDLConfig resolveBKDLConfig(ZooKeeperClient zkc, List<URI> uris) 
throws IOException {
-        URI firstURI = uris.get(0);
-        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, firstURI);
-        for (URI uri : uris) {
-            BKDLConfig anotherConfig = BKDLConfig.resolveDLConfig(zkc, uri);
-            if (!(Objects.equal(bkdlConfig.getBkLedgersPath(), 
anotherConfig.getBkLedgersPath())
-                    && Objects.equal(bkdlConfig.getBkZkServersForWriter(), 
anotherConfig.getBkZkServersForWriter()))) {
-                throw new IllegalArgumentException("Uris don't use same 
bookkeeper cluster");
-            }
-        }
-        return bkdlConfig;
-    }
-
-    public Pair<Set<Long>, Set<Long>> collectLedgers(List<URI> uris, 
List<List<String>> allocationPaths)
-            throws IOException {
-        Preconditions.checkArgument(uris.size() > 0, "No uri provided to 
audit");
-
-        String zkServers = validateAndGetZKServers(uris);
-        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                conf.getZKRetryBackoffStartMillis(),
-                conf.getZKRetryBackoffMaxMillis(),
-                Integer.MAX_VALUE);
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .name("DLAuditor-ZK")
-                .zkServers(zkServers)
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .retryPolicy(retryPolicy)
-                .zkAclId(conf.getZkAclId())
-                .build();
-        ExecutorService executorService = Executors.newCachedThreadPool();
-        try {
-            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
-            logger.info("Resolved bookkeeper config : {}", bkdlConfig);
-
-            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
-                    .name("DLAuditor-BK")
-                    .dlConfig(conf)
-                    .zkServers(bkdlConfig.getBkZkServersForWriter())
-                    .ledgersPath(bkdlConfig.getBkLedgersPath())
-                    .build();
-            try {
-                Set<Long> bkLedgers = collectLedgersFromBK(bkc, 
executorService);
-                Set<Long> dlLedgers = collectLedgersFromDL(uris, 
allocationPaths);
-                return Pair.of(bkLedgers, dlLedgers);
-            } finally {
-                bkc.close();
-            }
-        } finally {
-            zkc.close();
-            executorService.shutdown();
-        }
-    }
-
-    /**
-     * Find leak ledgers phase 1: collect ledgers set.
-     */
-    private Set<Long> collectLedgersFromBK(BookKeeperClient bkc,
-                                           final ExecutorService 
executorService)
-            throws IOException {
-        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
-
-        final Set<Long> ledgers = new HashSet<Long>();
-        final SettableFuture<Void> doneFuture = SettableFuture.create();
-
-        BookkeeperInternalCallbacks.Processor<Long> collector =
-                new BookkeeperInternalCallbacks.Processor<Long>() {
-            @Override
-            public void process(Long lid,
-                                final AsyncCallback.VoidCallback cb) {
-                synchronized (ledgers) {
-                    ledgers.add(lid);
-                    if (0 == ledgers.size() % 1000) {
-                        logger.info("Collected {} ledgers", ledgers.size());
-                    }
-                }
-                executorService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        cb.processResult(BKException.Code.OK, null, null);
-                    }
-                });
-
-            }
-        };
-        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                if (BKException.Code.OK == rc) {
-                    doneFuture.set(null);
-                } else {
-                    doneFuture.setException(BKException.create(rc));
-                }
-            }
-        };
-        lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK,
-                BKException.Code.ZKException);
-        try {
-            doneFuture.get();
-            logger.info("Collected total {} ledgers", ledgers.size());
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new DLInterruptedException("Interrupted on collecting 
ledgers : ", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof IOException) {
-                throw (IOException)(e.getCause());
-            } else {
-                throw new IOException("Failed to collect ledgers : ", 
e.getCause());
-            }
-        }
-        return ledgers;
-    }
-
-    /**
-     * Find leak ledgers phase 2: collect ledgers from uris.
-     */
-    private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> 
allocationPaths)
-            throws IOException {
-        final Set<Long> ledgers = new TreeSet<Long>();
-        List<DistributedLogNamespace> namespaces =
-                new ArrayList<DistributedLogNamespace>(uris.size());
-        try {
-            for (URI uri : uris) {
-                namespaces.add(
-                        DistributedLogNamespaceBuilder.newBuilder()
-                                .conf(conf)
-                                .uri(uri)
-                                .build());
-            }
-            final CountDownLatch doneLatch = new CountDownLatch(uris.size());
-            final AtomicInteger numFailures = new AtomicInteger(0);
-            ExecutorService executor = 
Executors.newFixedThreadPool(uris.size());
-            try {
-                int i = 0;
-                for (final DistributedLogNamespace namespace : namespaces) {
-                    final DistributedLogNamespace dlNamespace = namespace;
-                    final URI uri = uris.get(i);
-                    final List<String> aps = allocationPaths.get(i);
-                    i++;
-                    executor.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                logger.info("Collecting ledgers from {} : {}", 
uri, aps);
-                                collectLedgersFromAllocator(uri, namespace, 
aps, ledgers);
-                                synchronized (ledgers) {
-                                    logger.info("Collected {} ledgers from 
allocators for {} : {} ",
-                                            new Object[]{ledgers.size(), uri, 
ledgers});
-                                }
-                                collectLedgersFromDL(uri, namespace, ledgers);
-                            } catch (IOException e) {
-                                numFailures.incrementAndGet();
-                                logger.info("Error to collect ledgers from DL 
: ", e);
-                            }
-                            doneLatch.countDown();
-                        }
-                    });
-                }
-                try {
-                    doneLatch.await();
-                    if (numFailures.get() > 0) {
-                        throw new IOException(numFailures.get() + " errors to 
collect ledgers from DL");
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.warn("Interrupted on collecting ledgers from DL : 
", e);
-                    throw new DLInterruptedException("Interrupted on 
collecting ledgers from DL : ", e);
-                }
-            } finally {
-                executor.shutdown();
-            }
-        } finally {
-            for (DistributedLogNamespace namespace : namespaces) {
-                namespace.close();
-            }
-        }
-        return ledgers;
-    }
-
-    private void collectLedgersFromAllocator(final URI uri,
-                                             final DistributedLogNamespace 
namespace,
-                                             final List<String> 
allocationPaths,
-                                             final Set<Long> ledgers) throws 
IOException {
-        final LinkedBlockingQueue<String> poolQueue =
-                new LinkedBlockingQueue<String>();
-        for (String allocationPath : allocationPaths) {
-            String rootPath = uri.getPath() + "/" + allocationPath;
-            try {
-                List<String> pools = 
getZooKeeperClient(namespace).get().getChildren(rootPath, false);
-                for (String pool : pools) {
-                    poolQueue.add(rootPath + "/" + pool);
-                }
-            } catch (KeeperException e) {
-                throw new ZKException("Failed to get list of pools from " + 
rootPath, e);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new DLInterruptedException("Interrupted on getting list 
of pools from " + rootPath, e);
-            }
-        }
-
-
-        logger.info("Collecting ledgers from allocators for {} : {}", uri, 
poolQueue);
-
-        executeAction(poolQueue, 10, new Action<String>() {
-            @Override
-            public void execute(String poolPath) throws IOException {
-                try {
-                    collectLedgersFromPool(poolPath);
-                } catch (InterruptedException e) {
-                    throw new DLInterruptedException("Interrupted on 
collecting ledgers from allocation pool " + poolPath, e);
-                } catch (KeeperException e) {
-                    throw new ZKException("Failed to collect ledgers from 
allocation pool " + poolPath, e.code());
-                }
-            }
-
-            private void collectLedgersFromPool(String poolPath)
-                    throws InterruptedException, 
ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
-                List<String> allocators = getZooKeeperClient(namespace).get()
-                                        .getChildren(poolPath, false);
-                for (String allocator : allocators) {
-                    String allocatorPath = poolPath + "/" + allocator;
-                    byte[] data = 
getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
-                    if (null != data && data.length > 0) {
-                        try {
-                            long ledgerId = DLUtils.bytes2LogSegmentId(data);
-                            synchronized (ledgers) {
-                                ledgers.add(ledgerId);
-                            }
-                        } catch (NumberFormatException nfe) {
-                            logger.warn("Invalid ledger found in allocator 
path {} : ", allocatorPath, nfe);
-                        }
-                    }
-                }
-            }
-        });
-
-        logger.info("Collected ledgers from allocators for {}.", uri);
-    }
-
-    private void collectLedgersFromDL(final URI uri,
-                                      final DistributedLogNamespace namespace,
-                                      final Set<Long> ledgers) throws 
IOException {
-        logger.info("Enumerating {} to collect streams.", uri);
-        Iterator<String> streams = namespace.getLogs();
-        final LinkedBlockingQueue<String> streamQueue = new 
LinkedBlockingQueue<String>();
-        while (streams.hasNext()) {
-            streamQueue.add(streams.next());
-        }
-
-        logger.info("Collected {} streams from uri {} : {}",
-                    new Object[] { streamQueue.size(), uri, streams });
-
-        executeAction(streamQueue, 10, new Action<String>() {
-            @Override
-            public void execute(String stream) throws IOException {
-                collectLedgersFromStream(namespace, stream, ledgers);
-            }
-        });
-    }
-
-    private List<Long> collectLedgersFromStream(DistributedLogNamespace 
namespace,
-                                                String stream,
-                                                Set<Long> ledgers)
-            throws IOException {
-        DistributedLogManager dlm = namespace.openLog(stream);
-        try {
-            List<LogSegmentMetadata> segments = dlm.getLogSegments();
-            List<Long> sLedgers = new ArrayList<Long>();
-            for (LogSegmentMetadata segment : segments) {
-                synchronized (ledgers) {
-                    ledgers.add(segment.getLogSegmentId());
-                }
-                sLedgers.add(segment.getLogSegmentId());
-            }
-            return sLedgers;
-        } finally {
-            dlm.close();
-        }
-    }
-
-    /**
-     * Calculating stream space usage from given <i>uri</i>.
-     *
-     * @param uri dl uri
-     * @throws IOException
-     */
-    public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws 
IOException {
-        logger.info("Collecting stream space usage for {}.", uri);
-        DistributedLogNamespace namespace = 
DistributedLogNamespaceBuilder.newBuilder()
-                .conf(conf)
-                .uri(uri)
-                .build();
-        try {
-            return calculateStreamSpaceUsage(uri, namespace);
-        } finally {
-            namespace.close();
-        }
-    }
-
-    private Map<String, Long> calculateStreamSpaceUsage(
-            final URI uri, final DistributedLogNamespace namespace)
-        throws IOException {
-        Iterator<String> streams = namespace.getLogs();
-        final LinkedBlockingQueue<String> streamQueue = new 
LinkedBlockingQueue<String>();
-        while (streams.hasNext()) {
-            streamQueue.add(streams.next());
-        }
-
-        final Map<String, Long> streamSpaceUsageMap =
-                new ConcurrentSkipListMap<String, Long>();
-        final AtomicInteger numStreamsCollected = new AtomicInteger(0);
-
-        executeAction(streamQueue, 10, new Action<String>() {
-            @Override
-            public void execute(String stream) throws IOException {
-                streamSpaceUsageMap.put(stream,
-                        calculateStreamSpaceUsage(namespace, stream));
-                if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
-                    logger.info("Calculated {} streams from uri {}.", 
numStreamsCollected.get(), uri);
-                }
-            }
-        });
-
-        return streamSpaceUsageMap;
-    }
-
-    private long calculateStreamSpaceUsage(final DistributedLogNamespace 
namespace,
-                                           final String stream) throws 
IOException {
-        DistributedLogManager dlm = namespace.openLog(stream);
-        long totalBytes = 0;
-        try {
-            List<LogSegmentMetadata> segments = dlm.getLogSegments();
-            for (LogSegmentMetadata segment : segments) {
-                try {
-                    LedgerHandle lh = 
getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(),
-                            BookKeeper.DigestType.CRC32, 
conf.getBKDigestPW().getBytes(UTF_8));
-                    totalBytes += lh.getLength();
-                    lh.close();
-                } catch (BKException e) {
-                    logger.error("Failed to open ledger {} : ", 
segment.getLogSegmentId(), e);
-                    throw new IOException("Failed to open ledger " + 
segment.getLogSegmentId(), e);
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted on opening ledger {} : ", 
segment.getLogSegmentId(), e);
-                    Thread.currentThread().interrupt();
-                    throw new DLInterruptedException("Interrupted on opening 
ledger " + segment.getLogSegmentId(), e);
-                }
-            }
-        } finally {
-            dlm.close();
-        }
-        return totalBytes;
-    }
-
-    public long calculateLedgerSpaceUsage(URI uri) throws IOException {
-        List<URI> uris = Lists.newArrayList(uri);
-        String zkServers = validateAndGetZKServers(uris);
-        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
-                conf.getZKRetryBackoffStartMillis(),
-                conf.getZKRetryBackoffMaxMillis(),
-                Integer.MAX_VALUE);
-        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
-                .name("DLAuditor-ZK")
-                .zkServers(zkServers)
-                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                .retryPolicy(retryPolicy)
-                .zkAclId(conf.getZkAclId())
-                .build();
-        ExecutorService executorService = Executors.newCachedThreadPool();
-        try {
-            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
-            logger.info("Resolved bookkeeper config : {}", bkdlConfig);
-
-            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
-                    .name("DLAuditor-BK")
-                    .dlConfig(conf)
-                    .zkServers(bkdlConfig.getBkZkServersForWriter())
-                    .ledgersPath(bkdlConfig.getBkLedgersPath())
-                    .build();
-            try {
-                return calculateLedgerSpaceUsage(bkc, executorService);
-            } finally {
-                bkc.close();
-            }
-        } finally {
-            zkc.close();
-            executorService.shutdown();
-        }
-    }
-
-    private long calculateLedgerSpaceUsage(BookKeeperClient bkc,
-                                           final ExecutorService 
executorService)
-        throws IOException {
-        final AtomicLong totalBytes = new AtomicLong(0);
-        final AtomicLong totalEntries = new AtomicLong(0);
-        final AtomicLong numLedgers = new AtomicLong(0);
-
-        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
-
-        final SettableFuture<Void> doneFuture = SettableFuture.create();
-        final BookKeeper bk = bkc.get();
-
-        BookkeeperInternalCallbacks.Processor<Long> collector =
-                new BookkeeperInternalCallbacks.Processor<Long>() {
-            @Override
-            public void process(final Long lid,
-                                final AsyncCallback.VoidCallback cb) {
-                numLedgers.incrementAndGet();
-                executorService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        bk.asyncOpenLedgerNoRecovery(lid, 
BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8),
-                                new 
org.apache.bookkeeper.client.AsyncCallback.OpenCallback() {
-                            @Override
-                            public void openComplete(int rc, LedgerHandle lh, 
Object ctx) {
-                                final int cbRc;
-                                if (BKException.Code.OK == rc) {
-                                    totalBytes.addAndGet(lh.getLength());
-                                    
totalEntries.addAndGet(lh.getLastAddConfirmed() + 1);
-                                    cbRc = rc;
-                                } else {
-                                    cbRc = BKException.Code.ZKException;
-                                }
-                                executorService.submit(new Runnable() {
-                                    @Override
-                                    public void run() {
-                                        cb.processResult(cbRc, null, null);
-                                    }
-                                });
-                            }
-                        }, null);
-                    }
-                });
-            }
-        };
-        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                if (BKException.Code.OK == rc) {
-                    doneFuture.set(null);
-                } else {
-                    doneFuture.setException(BKException.create(rc));
-                }
-            }
-        };
-        lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK, 
BKException.Code.ZKException);
-        try {
-            doneFuture.get();
-            logger.info("calculated {} ledgers\n\ttotal bytes = {}\n\ttotal 
entries = {}",
-                    new Object[] { numLedgers.get(), totalBytes.get(), 
totalEntries.get() });
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new DLInterruptedException("Interrupted on calculating 
ledger space : ", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof IOException) {
-                throw (IOException)(e.getCause());
-            } else {
-                throw new IOException("Failed to calculate ledger space : ", 
e.getCause());
-            }
-        }
-        return totalBytes.get();
-    }
-
-    public void close() {
-        // no-op
-    }
-
-    static interface Action<T> {
-        void execute(T item) throws IOException ;
-    }
-
-    static <T> void executeAction(final LinkedBlockingQueue<T> queue,
-                                  final int numThreads,
-                                  final Action<T> action) throws IOException {
-        final CountDownLatch failureLatch = new CountDownLatch(1);
-        final CountDownLatch doneLatch = new CountDownLatch(queue.size());
-        final AtomicInteger numFailures = new AtomicInteger(0);
-        final AtomicInteger completedThreads = new AtomicInteger(0);
-
-        ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
-        try {
-            for (int i = 0 ; i < numThreads; i++) {
-                executorService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        while (true) {
-                            T item = queue.poll();
-                            if (null == item) {
-                                break;
-                            }
-                            try {
-                                action.execute(item);
-                            } catch (IOException ioe) {
-                                logger.error("Failed to execute action on item 
'{}'", item, ioe);
-                                numFailures.incrementAndGet();
-                                failureLatch.countDown();
-                                break;
-                            }
-                            doneLatch.countDown();
-                        }
-                        if (numFailures.get() == 0 && 
completedThreads.incrementAndGet() == numThreads) {
-                            failureLatch.countDown();
-                        }
-                    }
-                });
-            }
-            try {
-                failureLatch.await();
-                if (numFailures.get() > 0) {
-                    throw new IOException("Encountered " + numFailures.get() + 
" failures on executing action.");
-                }
-                doneLatch.await();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                logger.warn("Interrupted on executing action", ie);
-                throw new DLInterruptedException("Interrupted on executing 
action", ie);
-            }
-        } finally {
-            executorService.shutdown();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java
deleted file mode 100644
index 871997f..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-
-/**
- * Provider returns quorum configs based on dynamic configuration.
- */
-public class DynamicQuorumConfigProvider implements QuorumConfigProvider {
-
-    private final DynamicDistributedLogConfiguration conf;
-
-    public DynamicQuorumConfigProvider(DynamicDistributedLogConfiguration 
conf) {
-        this.conf = conf;
-    }
-
-    @Override
-    public QuorumConfig getQuorumConfig() {
-        return conf.getQuorumConfig();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java
deleted file mode 100644
index 6c3f06e..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-/**
- * Provider that returns an immutable quorum config.
- */
-public class ImmutableQuorumConfigProvider implements QuorumConfigProvider {
-
-    private final QuorumConfig quorumConfig;
-
-    public ImmutableQuorumConfigProvider(QuorumConfig quorumConfig) {
-        this.quorumConfig = quorumConfig;
-    }
-
-    @Override
-    public QuorumConfig getQuorumConfig() {
-        return quorumConfig;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java
deleted file mode 100644
index c14f374..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.twitter.distributedlog.util.Allocator;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-import java.io.IOException;
-
-public interface LedgerAllocator extends Allocator<LedgerHandle, Object> {
-
-    /**
-     * Start the ledger allocator. The implementaion should not be blocking 
call.
-     */
-    void start() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java
deleted file mode 100644
index b76d03a..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.client.LedgerHandle;
-
-import java.io.IOException;
-
-/**
- * Delegator of the underlying allocator. If it owns the allocator, it takes
- * the responsibility of start the allocator and close the allocator.
- */
-public class LedgerAllocatorDelegator implements LedgerAllocator {
-
-    private final LedgerAllocator allocator;
-    private final boolean ownAllocator;
-
-    /**
-     * Create an allocator's delegator.
-     *
-     * @param allocator
-     *          the underlying allocator
-     * @param ownAllocator
-     *          whether to own the allocator
-     */
-    public LedgerAllocatorDelegator(LedgerAllocator allocator,
-                                    boolean ownAllocator)
-            throws IOException {
-        this.allocator = allocator;
-        this.ownAllocator = ownAllocator;
-        if (this.ownAllocator) {
-            this.allocator.start();
-        }
-    }
-
-    @Override
-    public void start() throws IOException {
-        // no-op
-    }
-
-    @Override
-    public Future<Void> delete() {
-        return Future.exception(new UnsupportedOperationException("Can't 
delete an allocator by delegator"));
-    }
-
-    @Override
-    public void allocate() throws IOException {
-        this.allocator.allocate();
-    }
-
-    @Override
-    public Future<LedgerHandle> tryObtain(Transaction<Object> txn,
-                                          OpListener<LedgerHandle> listener) {
-        return this.allocator.tryObtain(txn, listener);
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        if (ownAllocator) {
-            return this.allocator.asyncClose();
-        } else {
-            return Future.value(null);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java
deleted file mode 100644
index dd0894e..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class LedgerAllocatorPool implements LedgerAllocator {
-
-    static final Logger logger = 
LoggerFactory.getLogger(LedgerAllocatorPool.class);
-
-    private final DistributedLogConfiguration conf;
-    private final QuorumConfigProvider quorumConfigProvider;
-    private final BookKeeperClient bkc;
-    private final ZooKeeperClient zkc;
-    private final ScheduledExecutorService scheduledExecutorService;
-    private final String poolPath;
-    private final int corePoolSize;
-
-    private final LinkedList<SimpleLedgerAllocator> pendingList =
-            new LinkedList<SimpleLedgerAllocator>();
-    private final LinkedList<SimpleLedgerAllocator> allocatingList =
-            new LinkedList<SimpleLedgerAllocator>();
-    private final Map<String, SimpleLedgerAllocator> rescueMap =
-            new HashMap<String, SimpleLedgerAllocator>();
-    private final Map<LedgerHandle, SimpleLedgerAllocator> obtainMap =
-            new HashMap<LedgerHandle, SimpleLedgerAllocator>();
-    private final Map<SimpleLedgerAllocator, LedgerHandle> reverseObtainMap =
-            new HashMap<SimpleLedgerAllocator, LedgerHandle>();
-
-    public LedgerAllocatorPool(String poolPath, int corePoolSize,
-                               DistributedLogConfiguration conf,
-                               ZooKeeperClient zkc,
-                               BookKeeperClient bkc,
-                               ScheduledExecutorService 
scheduledExecutorService) throws IOException {
-        this.poolPath = poolPath;
-        this.corePoolSize = corePoolSize;
-        this.conf = conf;
-        this.quorumConfigProvider =
-                new ImmutableQuorumConfigProvider(conf.getQuorumConfig());
-        this.zkc = zkc;
-        this.bkc = bkc;
-        this.scheduledExecutorService = scheduledExecutorService;
-        initializePool();
-    }
-
-    @Override
-    public void start() throws IOException {
-        for (LedgerAllocator allocator : pendingList) {
-            // issue allocating requests during initialize
-            allocator.allocate();
-        }
-    }
-
-    @VisibleForTesting
-    synchronized int pendingListSize() {
-        return pendingList.size();
-    }
-
-    @VisibleForTesting
-    synchronized int allocatingListSize() {
-        return allocatingList.size();
-    }
-
-    @VisibleForTesting
-    public synchronized int obtainMapSize() {
-        return obtainMap.size();
-    }
-
-    @VisibleForTesting
-    synchronized int rescueSize() {
-        return rescueMap.size();
-    }
-
-    @VisibleForTesting
-    synchronized SimpleLedgerAllocator getLedgerAllocator(LedgerHandle lh) {
-        return obtainMap.get(lh);
-    }
-
-    private void initializePool() throws IOException {
-        try {
-            List<String> allocators;
-            try {
-                allocators = zkc.get().getChildren(poolPath, false);
-            } catch (KeeperException.NoNodeException e) {
-                logger.info("Allocator Pool {} doesn't exist. Creating it.", 
poolPath);
-                ZkUtils.createFullPathOptimistic(zkc.get(), poolPath, new 
byte[0], zkc.getDefaultACL(),
-                        CreateMode.PERSISTENT);
-                allocators = zkc.get().getChildren(poolPath, false);
-            }
-            if (null == allocators) {
-                allocators = new ArrayList<String>();
-            }
-            if (allocators.size() < corePoolSize) {
-                createAllocators(corePoolSize - allocators.size());
-                allocators = zkc.get().getChildren(poolPath, false);
-            }
-            initializeAllocators(allocators);
-        } catch (InterruptedException ie) {
-            throw new DLInterruptedException("Interrupted when ensuring " + 
poolPath + " created : ", ie);
-        } catch (KeeperException ke) {
-            throw new IOException("Encountered zookeeper exception when 
initializing pool " + poolPath + " : ", ke);
-        }
-    }
-
-    private void createAllocators(int numAllocators) throws 
InterruptedException, IOException {
-        final AtomicInteger numPendings = new AtomicInteger(numAllocators);
-        final AtomicInteger numFailures = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(1);
-        AsyncCallback.StringCallback createCallback = new 
AsyncCallback.StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, String 
name) {
-                if (KeeperException.Code.OK.intValue() != rc) {
-                    numFailures.incrementAndGet();
-                    latch.countDown();
-                    return;
-                }
-                if (numPendings.decrementAndGet() == 0 && numFailures.get() == 
0) {
-                    latch.countDown();
-                }
-            }
-        };
-        for (int i = 0; i < numAllocators; i++) {
-            zkc.get().create(poolPath + "/A", new byte[0],
-                             zkc.getDefaultACL(),
-                             CreateMode.PERSISTENT_SEQUENTIAL,
-                             createCallback, null);
-        }
-        latch.await();
-        if (numFailures.get() > 0) {
-            throw new IOException("Failed to create " + numAllocators + " 
allocators.");
-        }
-    }
-
-    /**
-     * Initialize simple allocators with given list of allocator names 
<i>allocators</i>.
-     * It initializes a simple allocator with its simple allocator path.
-     */
-    private void initializeAllocators(List<String> allocators) throws 
IOException, InterruptedException {
-        final AtomicInteger numPendings = new AtomicInteger(allocators.size());
-        final AtomicInteger numFailures = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(numPendings.get() > 0 
? 1 : 0);
-        AsyncCallback.DataCallback dataCallback = new 
AsyncCallback.DataCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, byte[] 
data, Stat stat) {
-                if (KeeperException.Code.OK.intValue() != rc) {
-                    numFailures.incrementAndGet();
-                    latch.countDown();
-                    return;
-                }
-                Versioned<byte[]> allocatorData =
-                        new Versioned<byte[]>(data, new 
ZkVersion(stat.getVersion()));
-                SimpleLedgerAllocator allocator =
-                        new SimpleLedgerAllocator(path, allocatorData, 
quorumConfigProvider, zkc, bkc);
-                allocator.start();
-                pendingList.add(allocator);
-                if (numPendings.decrementAndGet() == 0 && numFailures.get() == 
0) {
-                    latch.countDown();
-                }
-            }
-        };
-        for (String name : allocators) {
-            String path = poolPath + "/" + name;
-            zkc.get().getData(path, false, dataCallback, null);
-        }
-        latch.await();
-        if (numFailures.get() > 0) {
-            throw new IOException("Failed to initialize allocators : " + 
allocators);
-        }
-    }
-
-    private void scheduleAllocatorRescue(final SimpleLedgerAllocator 
ledgerAllocator) {
-        try {
-            scheduledExecutorService.schedule(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        rescueAllocator(ledgerAllocator);
-                    } catch (DLInterruptedException dle) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            logger.warn("Failed to schedule rescuing ledger allocator {} : ", 
ledgerAllocator.allocatePath, ree);
-        }
-    }
-
-    /**
-     * Rescue a ledger allocator from an ERROR state
-     * @param ledgerAllocator
-     *          ledger allocator to rescue
-     */
-    private void rescueAllocator(final SimpleLedgerAllocator ledgerAllocator) 
throws DLInterruptedException {
-        SimpleLedgerAllocator oldAllocator;
-        synchronized (this) {
-            oldAllocator = rescueMap.put(ledgerAllocator.allocatePath, 
ledgerAllocator);
-        }
-        if (oldAllocator != null) {
-            logger.info("ledger allocator {} is being rescued.", 
ledgerAllocator.allocatePath);
-            return;
-        }
-        try {
-            zkc.get().getData(ledgerAllocator.allocatePath, false, new 
AsyncCallback.DataCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
-                    boolean retry = false;
-                    SimpleLedgerAllocator newAllocator = null;
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        Versioned<byte[]> allocatorData =
-                                new Versioned<byte[]>(data, new 
ZkVersion(stat.getVersion()));
-                        logger.info("Rescuing ledger allocator {}.", path);
-                        newAllocator = new SimpleLedgerAllocator(path, 
allocatorData, quorumConfigProvider, zkc, bkc);
-                        newAllocator.start();
-                        logger.info("Rescued ledger allocator {}.", path);
-                    } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                        logger.info("Ledger allocator {} doesn't exist, skip 
rescuing it.", path);
-                    } else {
-                        retry = true;
-                    }
-                    synchronized (LedgerAllocatorPool.this) {
-                        rescueMap.remove(ledgerAllocator.allocatePath);
-                        if (null != newAllocator) {
-                            pendingList.addLast(newAllocator);
-                        }
-                    }
-                    if (retry) {
-                        scheduleAllocatorRescue(ledgerAllocator);
-                    }
-                }
-            }, null);
-        } catch (InterruptedException ie) {
-            logger.warn("Interrupted on rescuing ledger allocator {} : ", 
ledgerAllocator.allocatePath, ie);
-            synchronized (LedgerAllocatorPool.this) {
-                rescueMap.remove(ledgerAllocator.allocatePath);
-            }
-            throw new DLInterruptedException("Interrupted on rescuing ledger 
allocator " + ledgerAllocator.allocatePath, ie);
-        } catch (IOException ioe) {
-            logger.warn("Failed to rescue ledger allocator {}, retry rescuing 
it later : ", ledgerAllocator.allocatePath, ioe);
-            synchronized (LedgerAllocatorPool.this) {
-                rescueMap.remove(ledgerAllocator.allocatePath);
-            }
-            scheduleAllocatorRescue(ledgerAllocator);
-        }
-    }
-
-    @Override
-    public void allocate() throws IOException {
-        SimpleLedgerAllocator allocator;
-        synchronized (this) {
-            if (pendingList.isEmpty()) {
-                // if no ledger allocator available, we should fail it 
immediately, which the request will be redirected to other
-                // proxies
-                throw new IOException("No ledger allocator available under " + 
poolPath + ".");
-            } else {
-                allocator = pendingList.removeFirst();
-            }
-        }
-        boolean success = false;
-        try {
-            allocator.allocate();
-            synchronized (this) {
-                allocatingList.addLast(allocator);
-            }
-            success = true;
-        } finally {
-            if (!success) {
-                rescueAllocator(allocator);
-            }
-        }
-    }
-
-    @Override
-    public Future<LedgerHandle> tryObtain(final Transaction<Object> txn,
-                                          final 
Transaction.OpListener<LedgerHandle> listener) {
-        final SimpleLedgerAllocator allocator;
-        synchronized (this) {
-            if (allocatingList.isEmpty()) {
-                return Future.exception(new IOException("No ledger allocator 
available under " + poolPath + "."));
-            } else {
-                allocator = allocatingList.removeFirst();
-            }
-        }
-
-        final Promise<LedgerHandle> tryObtainPromise = new 
Promise<LedgerHandle>();
-        final FutureEventListener<LedgerHandle> tryObtainListener = new 
FutureEventListener<LedgerHandle>() {
-            @Override
-            public void onSuccess(LedgerHandle lh) {
-                synchronized (LedgerAllocatorPool.this) {
-                    obtainMap.put(lh, allocator);
-                    reverseObtainMap.put(allocator, lh);
-                    tryObtainPromise.setValue(lh);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                try {
-                    rescueAllocator(allocator);
-                } catch (IOException ioe) {
-                    logger.info("Failed to rescue allocator {}", 
allocator.allocatePath, ioe);
-                }
-                tryObtainPromise.setException(cause);
-            }
-        };
-
-        allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() {
-            @Override
-            public void onCommit(LedgerHandle lh) {
-                confirmObtain(allocator);
-                listener.onCommit(lh);
-            }
-
-            @Override
-            public void onAbort(Throwable t) {
-                abortObtain(allocator);
-                listener.onAbort(t);
-            }
-        }).addEventListener(tryObtainListener);
-        return tryObtainPromise;
-    }
-
-    void confirmObtain(SimpleLedgerAllocator allocator) {
-        synchronized (this) {
-            LedgerHandle lh = reverseObtainMap.remove(allocator);
-            if (null != lh) {
-                obtainMap.remove(lh);
-            }
-        }
-        synchronized (this) {
-            pendingList.addLast(allocator);
-        }
-    }
-
-    void abortObtain(SimpleLedgerAllocator allocator) {
-        synchronized (this) {
-            LedgerHandle lh = reverseObtainMap.remove(allocator);
-            if (null != lh) {
-                obtainMap.remove(lh);
-            }
-        }
-        // if a ledger allocator is aborted, it is better to rescue it. since 
the ledger allocator might
-        // already encounter BadVersion exception.
-        try {
-            rescueAllocator(allocator);
-        } catch (DLInterruptedException e) {
-            logger.warn("Interrupted on rescuing ledger allocator pool {} : ", 
poolPath, e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        List<LedgerAllocator> allocatorsToClose;
-        synchronized (this) {
-            allocatorsToClose = Lists.newArrayListWithExpectedSize(
-                    pendingList.size() + allocatingList.size() + 
obtainMap.size());
-            for (LedgerAllocator allocator : pendingList) {
-                allocatorsToClose.add(allocator);
-            }
-            for (LedgerAllocator allocator : allocatingList) {
-                allocatorsToClose.add(allocator);
-            }
-            for (LedgerAllocator allocator : obtainMap.values()) {
-                allocatorsToClose.add(allocator);
-            }
-        }
-        return FutureUtils.processList(allocatorsToClose, new 
Function<LedgerAllocator, Future<Void>>() {
-            @Override
-            public Future<Void> apply(LedgerAllocator allocator) {
-                return allocator.asyncClose();
-            }
-        }, scheduledExecutorService).map(new AbstractFunction1<List<Void>, 
Void>() {
-            @Override
-            public Void apply(List<Void> values) {
-                return null;
-            }
-        });
-    }
-
-    @Override
-    public Future<Void> delete() {
-        List<LedgerAllocator> allocatorsToDelete;
-        synchronized (this) {
-            allocatorsToDelete = Lists.newArrayListWithExpectedSize(
-                    pendingList.size() + allocatingList.size() + 
obtainMap.size());
-            for (LedgerAllocator allocator : pendingList) {
-                allocatorsToDelete.add(allocator);
-            }
-            for (LedgerAllocator allocator : allocatingList) {
-                allocatorsToDelete.add(allocator);
-            }
-            for (LedgerAllocator allocator : obtainMap.values()) {
-                allocatorsToDelete.add(allocator);
-            }
-        }
-        return FutureUtils.processList(allocatorsToDelete, new 
Function<LedgerAllocator, Future<Void>>() {
-            @Override
-            public Future<Void> apply(LedgerAllocator allocator) {
-                return allocator.delete();
-            }
-        }, scheduledExecutorService).flatMap(new AbstractFunction1<List<Void>, 
Future<Void>>() {
-            @Override
-            public Future<Void> apply(List<Void> values) {
-                return Utils.zkDelete(zkc, poolPath, new ZkVersion(-1));
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java
deleted file mode 100644
index 0db6d74..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-
-import java.io.IOException;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class LedgerAllocatorUtils {
-
-    /**
-     * Create ledger allocator pool.
-     *
-     * @param poolPath
-     *          ledger allocator pool path.
-     * @param corePoolSize
-     *          ledger allocator pool core size.
-     * @param conf
-     *          distributedlog configuration.
-     * @param zkc
-     *          zookeeper client
-     * @param bkc
-     *          bookkeeper client
-     * @return ledger allocator
-     * @throws IOException
-     */
-    public static LedgerAllocator createLedgerAllocatorPool(
-            String poolPath,
-            int corePoolSize,
-            DistributedLogConfiguration conf,
-            ZooKeeperClient zkc,
-            BookKeeperClient bkc,
-            ScheduledExecutorService scheduledExecutorService) throws 
IOException {
-        return new LedgerAllocatorPool(poolPath, corePoolSize, conf, zkc, bkc, 
scheduledExecutorService);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java
deleted file mode 100644
index a9cc16c..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Configuration for quorums
- */
-public class QuorumConfig {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(QuorumConfig.class);
-
-    private final int ensembleSize;
-    private final int writeQuorumSize;
-    private final int ackQuorumSize;
-
-    public QuorumConfig(int ensembleSize,
-                        int writeQuorumSize,
-                        int ackQuorumSize) {
-        this.ensembleSize = ensembleSize;
-        if (this.ensembleSize < writeQuorumSize) {
-            this.writeQuorumSize = this.ensembleSize;
-            logger.warn("Setting write quorum size {} greater than ensemble 
size {}",
-                    writeQuorumSize, this.ensembleSize);
-        } else {
-            this.writeQuorumSize = writeQuorumSize;
-        }
-        if (this.writeQuorumSize < ackQuorumSize) {
-            this.ackQuorumSize = this.writeQuorumSize;
-            logger.warn("Setting write ack quorum size {} greater than write 
quorum size {}",
-                    ackQuorumSize, this.writeQuorumSize);
-        } else {
-            this.ackQuorumSize = ackQuorumSize;
-        }
-    }
-
-    public int getEnsembleSize() {
-        return ensembleSize;
-    }
-
-    public int getWriteQuorumSize() {
-        return writeQuorumSize;
-    }
-
-    public int getAckQuorumSize() {
-        return ackQuorumSize;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(ensembleSize, writeQuorumSize, ackQuorumSize);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof QuorumConfig)) {
-            return false;
-        }
-        QuorumConfig other = (QuorumConfig) obj;
-        return ensembleSize == other.ensembleSize
-                && writeQuorumSize == other.writeQuorumSize
-                && ackQuorumSize == other.ackQuorumSize;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("QuorumConfig[ensemble=")
-          .append(ensembleSize).append(", write quorum=")
-          .append(writeQuorumSize).append(", ack quorum=")
-          .append(ackQuorumSize).append("]");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java
deleted file mode 100644
index 2f65427..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-/**
- * Provider to provide quorum config
- */
-public interface QuorumConfigProvider {
-
-    /**
-     * Get the quorum config for a given log stream.
-     *
-     * @return quorum config
-     */
-    QuorumConfig getQuorumConfig();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java
deleted file mode 100644
index ab5976e..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.bk;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.distributedlog.zk.ZKTransaction;
-import com.twitter.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Allocator to allocate ledgers.
- */
-public class SimpleLedgerAllocator implements LedgerAllocator, 
FutureEventListener<LedgerHandle>, OpListener<Version> {
-
-    static final Logger LOG = 
LoggerFactory.getLogger(SimpleLedgerAllocator.class);
-
-    static enum Phase {
-        ALLOCATING, ALLOCATED, HANDING_OVER, HANDED_OVER, ERROR
-    }
-
-    static class AllocationException extends IOException {
-
-        private static final long serialVersionUID = -1111397872059426882L;
-
-        private final Phase phase;
-
-        public AllocationException(Phase phase, String msg) {
-            super(msg);
-            this.phase = phase;
-        }
-
-        public Phase getPhase() {
-            return this.phase;
-        }
-
-    }
-
-    static class ConcurrentObtainException extends AllocationException {
-
-        private static final long serialVersionUID = -8532471098537176913L;
-
-        public ConcurrentObtainException(Phase phase, String msg) {
-            super(phase, msg);
-        }
-    }
-
-    // zookeeper client
-    final ZooKeeperClient zkc;
-    // bookkeeper client
-    final BookKeeperClient bkc;
-    // znode path
-    final String allocatePath;
-    // allocation phase
-    Phase phase = Phase.HANDED_OVER;
-    // version
-    ZkVersion version = new ZkVersion(-1);
-    // outstanding allocation
-    Promise<LedgerHandle> allocatePromise;
-    // outstanding tryObtain transaction
-    Transaction<Object> tryObtainTxn = null;
-    OpListener<LedgerHandle> tryObtainListener = null;
-    // ledger id left from previous allocation
-    Long ledgerIdLeftFromPrevAllocation = null;
-    // Allocated Ledger
-    LedgerHandle allocatedLh = null;
-
-    Future<Void> closeFuture = null;
-    final LinkedList<Future<Void>> ledgerDeletions =
-            new LinkedList<Future<Void>>();
-
-    // Ledger configuration
-    private final QuorumConfigProvider quorumConfigProvider;
-
-    static Future<Versioned<byte[]>> getAndCreateAllocationData(final String 
allocatePath,
-                                                                final 
ZooKeeperClient zkc) {
-        return Utils.zkGetData(zkc, allocatePath, false)
-                .flatMap(new AbstractFunction1<Versioned<byte[]>, 
Future<Versioned<byte[]>>>() {
-            @Override
-            public Future<Versioned<byte[]>> apply(Versioned<byte[]> result) {
-                if (null != result && null != result.getVersion() && null != 
result.getValue()) {
-                    return Future.value(result);
-                }
-                return createAllocationData(allocatePath, zkc);
-            }
-        });
-    }
-
-    private static Future<Versioned<byte[]>> createAllocationData(final String 
allocatePath,
-                                                                  final 
ZooKeeperClient zkc) {
-        try {
-            final Promise<Versioned<byte[]>> promise = new 
Promise<Versioned<byte[]>>();
-            zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
-                    zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new org.apache.zookeeper.AsyncCallback.Create2Callback() {
-                        @Override
-                        public void processResult(int rc, String path, Object 
ctx, String name, Stat stat) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.setValue(new 
Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
-                                        new ZkVersion(stat.getVersion())));
-                            } else if 
(KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                Utils.zkGetData(zkc, allocatePath, 
false).proxyTo(promise);
-                            } else {
-                                promise.setException(FutureUtils.zkException(
-                                        
KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
-                            }
-                        }
-                    }, null);
-            return promise;
-        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, allocatePath));
-        } catch (InterruptedException e) {
-            return Future.exception(FutureUtils.zkException(e, allocatePath));
-        }
-    }
-
-    public static Future<SimpleLedgerAllocator> of(final String allocatePath,
-                                                   final Versioned<byte[]> 
allocationData,
-                                                   final QuorumConfigProvider 
quorumConfigProvider,
-                                                   final ZooKeeperClient zkc,
-                                                   final BookKeeperClient bkc) 
{
-        if (null != allocationData && null != allocationData.getValue()
-                && null != allocationData.getVersion()) {
-            return Future.value(new SimpleLedgerAllocator(allocatePath, 
allocationData,
-                    quorumConfigProvider, zkc, bkc));
-        }
-        return getAndCreateAllocationData(allocatePath, zkc)
-                .map(new AbstractFunction1<Versioned<byte[]>, 
SimpleLedgerAllocator>() {
-            @Override
-            public SimpleLedgerAllocator apply(Versioned<byte[]> 
allocationData) {
-                return new SimpleLedgerAllocator(allocatePath, allocationData,
-                        quorumConfigProvider, zkc, bkc);
-            }
-        });
-    }
-
-    /**
-     * Construct a ledger allocator.
-     *
-     * @param allocatePath
-     *          znode path to store the allocated ledger.
-     * @param allocationData
-     *          allocation data.
-     * @param quorumConfigProvider
-     *          Quorum configuration provider.
-     * @param zkc
-     *          zookeeper client.
-     * @param bkc
-     *          bookkeeper client.
-     */
-    public SimpleLedgerAllocator(String allocatePath,
-                                 Versioned<byte[]> allocationData,
-                                 QuorumConfigProvider quorumConfigProvider,
-                                 ZooKeeperClient zkc,
-                                 BookKeeperClient bkc) {
-        this.zkc = zkc;
-        this.bkc = bkc;
-        this.allocatePath = allocatePath;
-        this.quorumConfigProvider = quorumConfigProvider;
-        initialize(allocationData);
-    }
-
-    /**
-     * Initialize the allocator.
-     *
-     * @param allocationData
-     *          Allocation Data.
-     */
-    private void initialize(Versioned<byte[]> allocationData) {
-        setVersion((ZkVersion) allocationData.getVersion());
-        byte[] data = allocationData.getValue();
-        if (null != data && data.length > 0) {
-            // delete the allocated ledger since this is left by last 
allocation.
-            try {
-                ledgerIdLeftFromPrevAllocation = 
DLUtils.bytes2LogSegmentId(data);
-            } catch (NumberFormatException nfe) {
-                LOG.warn("Invalid data found in allocator path {} : ", 
allocatePath, nfe);
-            }
-        }
-
-    }
-
-    private synchronized void 
deleteLedgerLeftFromPreviousAllocationIfNecessary() {
-        if (null != ledgerIdLeftFromPrevAllocation) {
-            LOG.info("Deleting allocated-but-unused ledger left from previous 
allocation {}.", ledgerIdLeftFromPrevAllocation);
-            deleteLedger(ledgerIdLeftFromPrevAllocation);
-            ledgerIdLeftFromPrevAllocation = null;
-        }
-    }
-
-    @Override
-    public synchronized void allocate() throws IOException {
-        if (Phase.ERROR == phase) {
-            throw new AllocationException(Phase.ERROR, "Error on ledger 
allocator for " + allocatePath);
-        }
-        if (Phase.HANDED_OVER == phase) {
-            // issue an allocate request when ledger is already handed over.
-            allocateLedger();
-        }
-    }
-
-    @Override
-    public synchronized Future<LedgerHandle> tryObtain(final 
Transaction<Object> txn,
-                                                       final 
OpListener<LedgerHandle> listener) {
-        if (Phase.ERROR == phase) {
-            return Future.exception(new AllocationException(Phase.ERROR,
-                    "Error on allocating ledger under " + allocatePath));
-        }
-        if (Phase.HANDING_OVER == phase || Phase.HANDED_OVER == phase || null 
!= tryObtainTxn) {
-            return Future.exception(new ConcurrentObtainException(phase,
-                    "Ledger handle is handling over to another thread : " + 
phase));
-        }
-        tryObtainTxn = txn;
-        tryObtainListener = listener;
-        if (null != allocatedLh) {
-            completeAllocation(allocatedLh);
-        }
-        return allocatePromise;
-    }
-
-    @Override
-    public void onCommit(Version r) {
-        confirmObtain((ZkVersion) r);
-    }
-
-    private void confirmObtain(ZkVersion zkVersion) {
-        boolean shouldAllocate = false;
-        OpListener<LedgerHandle> listenerToNotify = null;
-        LedgerHandle lhToNotify = null;
-        synchronized (this) {
-            if (Phase.HANDING_OVER == phase) {
-                setPhase(Phase.HANDED_OVER);
-                setVersion(zkVersion);
-                listenerToNotify = tryObtainListener;
-                lhToNotify = allocatedLh;
-                // reset the state
-                allocatedLh = null;
-                allocatePromise = null;
-                tryObtainTxn = null;
-                tryObtainListener = null;
-                // mark flag to issue an allocation request
-                shouldAllocate = true;
-            }
-        }
-        if (null != listenerToNotify && null != lhToNotify) {
-            // notify the listener
-            listenerToNotify.onCommit(lhToNotify);
-        }
-        if (shouldAllocate) {
-            // issue an allocation request
-            allocateLedger();
-        }
-    }
-
-    @Override
-    public void onAbort(Throwable t) {
-        OpListener<LedgerHandle> listenerToNotify;
-        synchronized (this) {
-            listenerToNotify = tryObtainListener;
-            if (t instanceof KeeperException &&
-                    ((KeeperException) t).code() == 
KeeperException.Code.BADVERSION) {
-                LOG.info("Set ledger allocator {} to ERROR state after hit bad 
version : version = {}",
-                        allocatePath, getVersion());
-                setPhase(Phase.ERROR);
-            } else {
-                if (Phase.HANDING_OVER == phase) {
-                    setPhase(Phase.ALLOCATED);
-                    tryObtainTxn = null;
-                    tryObtainListener = null;
-                }
-            }
-        }
-        if (null != listenerToNotify) {
-            listenerToNotify.onAbort(t);
-        }
-    }
-
-    private synchronized void setPhase(Phase phase) {
-        this.phase = phase;
-        LOG.info("Ledger allocator {} moved to phase {} : version = {}.",
-                new Object[] { allocatePath, phase, version });
-    }
-
-    private synchronized void allocateLedger() {
-        // make sure previous allocation is already handed over.
-        if (Phase.HANDED_OVER != phase) {
-            LOG.error("Trying allocate ledger for {} in phase {}, giving up.", 
allocatePath, phase);
-            return;
-        }
-        setPhase(Phase.ALLOCATING);
-        allocatePromise = new Promise<LedgerHandle>();
-        QuorumConfig quorumConfig = quorumConfigProvider.getQuorumConfig();
-        bkc.createLedger(
-                quorumConfig.getEnsembleSize(),
-                quorumConfig.getWriteQuorumSize(),
-                quorumConfig.getAckQuorumSize()
-        ).addEventListener(this);
-    }
-
-    private synchronized void completeAllocation(LedgerHandle lh) {
-        allocatedLh = lh;
-        if (null == tryObtainTxn) {
-            return;
-        }
-        org.apache.zookeeper.Op zkSetDataOp = org.apache.zookeeper.Op.setData(
-                allocatePath, DistributedLogConstants.EMPTY_BYTES, 
version.getZnodeVersion());
-        ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this);
-        tryObtainTxn.addOp(commitOp);
-        setPhase(Phase.HANDING_OVER);
-        FutureUtils.setValue(allocatePromise, lh);
-    }
-
-    private synchronized void failAllocation(Throwable cause) {
-        FutureUtils.setException(allocatePromise, cause);
-    }
-
-    @Override
-    public void onSuccess(LedgerHandle lh) {
-        // a ledger is created, update the ledger to allocation path before 
handling it over for usage.
-        markAsAllocated(lh);
-    }
-
-    @Override
-    public void onFailure(Throwable cause) {
-        LOG.error("Error creating ledger for allocating {} : ", allocatePath, 
cause);
-        setPhase(Phase.ERROR);
-        failAllocation(cause);
-    }
-
-    private synchronized ZkVersion getVersion() {
-        return version;
-    }
-
-    private synchronized void setVersion(ZkVersion newVersion) {
-        Version.Occurred occurred = newVersion.compare(version);
-        if (occurred == Version.Occurred.AFTER) {
-            LOG.info("Ledger allocator for {} moved version from {} to {}.",
-                    new Object[] { allocatePath, version, newVersion });
-            version = newVersion;
-        } else {
-            LOG.warn("Ledger allocator for {} received an old version {}, 
current version is {}.",
-                    new Object[] { allocatePath, newVersion , version });
-        }
-    }
-
-    private void markAsAllocated(final LedgerHandle lh) {
-        byte[] data = DLUtils.logSegmentId2Bytes(lh.getId());
-        Utils.zkSetData(zkc, allocatePath, data, getVersion())
-            .addEventListener(new FutureEventListener<ZkVersion>() {
-                @Override
-                public void onSuccess(ZkVersion version) {
-                    // we only issue deleting ledger left from previous 
allocation when we could allocate first ledger
-                    // as zookeeper version could prevent us doing stupid 
things.
-                    deleteLedgerLeftFromPreviousAllocationIfNecessary();
-                    setVersion(version);
-                    setPhase(Phase.ALLOCATED);
-                    // complete the allocation after it is marked as allocated
-                    completeAllocation(lh);
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    setPhase(Phase.ERROR);
-                    deleteLedger(lh.getId());
-                    LOG.error("Fail mark ledger {} as allocated under {} : ",
-                            new Object[] { lh.getId(), allocatePath, cause });
-                    // fail the allocation since failed to mark it as allocated
-                    failAllocation(cause);
-                }
-            });
-    }
-
-    void deleteLedger(final long ledgerId) {
-        final Future<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
-        synchronized (ledgerDeletions) {
-            ledgerDeletions.add(deleteFuture);
-        }
-        deleteFuture.onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable cause) {
-                LOG.error("Error deleting ledger {} for ledger allocator {}, 
retrying : ",
-                        new Object[] { ledgerId, allocatePath, cause });
-                if (!isClosing()) {
-                    deleteLedger(ledgerId);
-                }
-                return BoxedUnit.UNIT;
-            }
-        }).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                synchronized (ledgerDeletions) {
-                    ledgerDeletions.remove(deleteFuture);
-                }
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    private synchronized boolean isClosing() {
-        return closeFuture != null;
-    }
-
-    private Future<Void> closeInternal(boolean cleanup) {
-        Promise<Void> closePromise;
-        synchronized (this) {
-            if (null != closeFuture) {
-                return closeFuture;
-            }
-            closePromise = new Promise<Void>();
-            closeFuture = closePromise;
-        }
-        if (!cleanup) {
-            LOG.info("Abort ledger allocator without cleaning up on {}.", 
allocatePath);
-            FutureUtils.setValue(closePromise, null);
-            return closePromise;
-        }
-        cleanupAndClose(closePromise);
-        return closePromise;
-    }
-
-    private void cleanupAndClose(final Promise<Void> closePromise) {
-        LOG.info("Closing ledger allocator on {}.", allocatePath);
-        final ZKTransaction txn = new ZKTransaction(zkc);
-        // try obtain ledger handle
-        tryObtain(txn, new OpListener<LedgerHandle>() {
-            @Override
-            public void onCommit(LedgerHandle r) {
-                // no-op
-                complete();
-            }
-
-            @Override
-            public void onAbort(Throwable t) {
-                // no-op
-                complete();
-            }
-
-            private void complete() {
-                FutureUtils.setValue(closePromise, null);
-                LOG.info("Closed ledger allocator on {}.", allocatePath);
-            }
-        }).addEventListener(new FutureEventListener<LedgerHandle>() {
-            @Override
-            public void onSuccess(LedgerHandle lh) {
-                // try obtain succeed
-                // if we could obtain the ledger handle, we have the 
responsibility to close it
-                deleteLedger(lh.getId());
-                // wait for deletion to be completed
-                List<Future<Void>> outstandingDeletions;
-                synchronized (ledgerDeletions) {
-                    outstandingDeletions = Lists.newArrayList(ledgerDeletions);
-                }
-                Future.collect(outstandingDeletions).addEventListener(new 
FutureEventListener<List<Void>>() {
-                    @Override
-                    public void onSuccess(List<Void> values) {
-                        txn.execute();
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        LOG.debug("Fail to obtain the allocated ledger handle 
when closing the allocator : ", cause);
-                        FutureUtils.setValue(closePromise, null);
-                    }
-                });
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                LOG.debug("Fail to obtain the allocated ledger handle when 
closing the allocator : ", cause);
-                FutureUtils.setValue(closePromise, null);
-            }
-        });
-
-    }
-
-    @Override
-    public void start() {
-        // nop
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        return closeInternal(false);
-    }
-
-    @Override
-    public Future<Void> delete() {
-        return closeInternal(true).flatMap(new AbstractFunction1<Void, 
Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void value) {
-                return Utils.zkDelete(zkc, allocatePath, getVersion());
-            }
-        });
-    }
-
-}


Reply via email to