http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
deleted file mode 100644
index 03d70bd..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ /dev/null
@@ -1,2873 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.tools;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.BKDistributedLogNamespace;
-import com.twitter.distributedlog.Entry;
-import com.twitter.distributedlog.MetadataAccessor;
-import com.twitter.distributedlog.callback.NamespaceListener;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.util.Utils;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeperAccessor;
-import org.apache.bookkeeper.client.BookKeeperAdmin;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.client.LedgerReader;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.distributedlog.AsyncLogReader;
-import com.twitter.distributedlog.AsyncLogWriter;
-import com.twitter.distributedlog.BookKeeperClient;
-import com.twitter.distributedlog.BookKeeperClientBuilder;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.LogReader;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.auditor.DLAuditor;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.MetadataUpdater;
-import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Await;
-import com.twitter.util.FutureEventListener;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-public class DistributedLogTool extends Tool {
-
-    static final Logger logger = 
LoggerFactory.getLogger(DistributedLogTool.class);
-
-    static final List<String> EMPTY_LIST = Lists.newArrayList();
-
-    static int compareByCompletionTime(long time1, long time2) {
-        return time1 > time2 ? 1 : (time1 < time2 ? -1 : 0);
-    }
-
-    static final Comparator<LogSegmentMetadata> LOGSEGMENT_COMPARATOR_BY_TIME 
= new Comparator<LogSegmentMetadata>() {
-        @Override
-        public int compare(LogSegmentMetadata o1, LogSegmentMetadata o2) {
-            if (o1.isInProgress() && o2.isInProgress()) {
-                return compareByCompletionTime(o1.getFirstTxId(), 
o2.getFirstTxId());
-            } else if (!o1.isInProgress() && !o2.isInProgress()) {
-                return compareByCompletionTime(o1.getCompletionTime(), 
o2.getCompletionTime());
-            } else if (o1.isInProgress() && !o2.isInProgress()) {
-                return compareByCompletionTime(o1.getFirstTxId(), 
o2.getCompletionTime());
-            } else {
-                return compareByCompletionTime(o1.getCompletionTime(), 
o2.getFirstTxId());
-            }
-        }
-    };
-
-    static DLSN parseDLSN(String dlsnStr) throws ParseException {
-        if (dlsnStr.equals("InitialDLSN")) {
-            return DLSN.InitialDLSN;
-        }
-        String[] parts = dlsnStr.split(",");
-        if (parts.length != 3) {
-            throw new ParseException("Invalid dlsn : " + dlsnStr);
-        }
-        try {
-            return new DLSN(Long.parseLong(parts[0]), 
Long.parseLong(parts[1]), Long.parseLong(parts[2]));
-        } catch (Exception nfe) {
-            throw new ParseException("Invalid dlsn : " + dlsnStr);
-        }
-    }
-
-    /**
-     * Per DL Command, which parses basic options. e.g. uri.
-     */
-    protected abstract static class PerDLCommand extends OptsCommand {
-
-        protected Options options = new Options();
-        protected final DistributedLogConfiguration dlConf;
-        protected URI uri;
-        protected String zkAclId = null;
-        protected boolean force = false;
-        protected DistributedLogNamespace namespace = null;
-
-        protected PerDLCommand(String name, String description) {
-            super(name, description);
-            dlConf = new DistributedLogConfiguration();
-            // Tools are allowed to read old metadata as long as they can 
interpret it
-            dlConf.setDLLedgerMetadataSkipMinVersionCheck(true);
-            options.addOption("u", "uri", true, "DistributedLog URI");
-            options.addOption("c", "conf", true, "DistributedLog Configuration 
File");
-            options.addOption("a", "zk-acl-id", true, "Zookeeper ACL ID");
-            options.addOption("f", "force", false, "Force command (no warnings 
or prompts)");
-        }
-
-        @Override
-        protected int runCmd(CommandLine commandLine) throws Exception {
-            try {
-                parseCommandLine(commandLine);
-            } catch (ParseException pe) {
-                System.err.println("ERROR: failed to parse commandline : '" + 
pe.getMessage() + "'");
-                printUsage();
-                return -1;
-            }
-            try {
-                return runCmd();
-            } finally {
-                if (null != namespace) {
-                    namespace.close();
-                }
-            }
-        }
-
-        protected abstract int runCmd() throws Exception;
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            if (!cmdline.hasOption("u")) {
-                throw new ParseException("No distributedlog uri provided.");
-            }
-            uri = URI.create(cmdline.getOptionValue("u"));
-            if (cmdline.hasOption("c")) {
-                String configFile = cmdline.getOptionValue("c");
-                try {
-                    dlConf.loadConf(new File(configFile).toURI().toURL());
-                } catch (ConfigurationException e) {
-                    throw new ParseException("Failed to load distributedlog 
configuration from " + configFile + ".");
-                } catch (MalformedURLException e) {
-                    throw new ParseException("Failed to load distributedlog 
configuration from " + configFile + ": malformed uri.");
-                }
-            }
-            if (cmdline.hasOption("a")) {
-                zkAclId = cmdline.getOptionValue("a");
-            }
-            if (cmdline.hasOption("f")) {
-                force = true;
-            }
-        }
-
-        protected DistributedLogConfiguration getConf() {
-            return dlConf;
-        }
-
-        protected URI getUri() {
-            return uri;
-        }
-
-        protected void setUri(URI uri) {
-            this.uri = uri;
-        }
-
-        protected String getZkAclId() {
-            return zkAclId;
-        }
-
-        protected void setZkAclId(String zkAclId) {
-            this.zkAclId = zkAclId;
-        }
-
-        protected boolean getForce() {
-            return force;
-        }
-
-        protected void setForce(boolean force) {
-            this.force = force;
-        }
-
-        protected DistributedLogNamespace getNamespace() throws IOException {
-            if (null == this.namespace) {
-                this.namespace = DistributedLogNamespaceBuilder.newBuilder()
-                        .uri(getUri())
-                        .conf(getConf())
-                        .build();
-            }
-            return this.namespace;
-        }
-
-        protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws 
IOException {
-            return getNamespace()
-                    .getNamespaceDriver()
-                    .getLogStreamMetadataStore(NamespaceDriver.Role.READER)
-                    .getLogSegmentMetadataStore();
-        }
-
-        protected ZooKeeperClient getZooKeeperClient() throws IOException {
-            NamespaceDriver driver = getNamespace().getNamespaceDriver();
-            assert(driver instanceof BKNamespaceDriver);
-            return ((BKNamespaceDriver) driver).getWriterZKC();
-        }
-
-        protected BookKeeperClient getBookKeeperClient() throws IOException {
-            NamespaceDriver driver = getNamespace().getNamespaceDriver();
-            assert(driver instanceof BKNamespaceDriver);
-            return ((BKNamespaceDriver) driver).getReaderBKC();
-        }
-    }
-
-    /**
-     * Base class for simple command with no resource setup requirements.
-     */
-    public abstract static class SimpleCommand extends OptsCommand {
-
-        protected final Options options = new Options();
-
-        SimpleCommand(String name, String description) {
-            super(name, description);
-        }
-
-        @Override
-        protected int runCmd(CommandLine commandLine) throws Exception {
-            try {
-                parseCommandLine(commandLine);
-            } catch (ParseException pe) {
-                System.err.println("ERROR: failed to parse commandline : '" + 
pe.getMessage() + "'");
-                printUsage();
-                return -1;
-            }
-            return runSimpleCmd();
-        }
-
-        abstract protected int runSimpleCmd() throws Exception;
-
-        abstract protected void parseCommandLine(CommandLine cmdline) throws 
ParseException;
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-    }
-
-    /**
-     * Per Stream Command, which parse common options for per stream. e.g. 
stream name.
-     */
-    abstract static class PerStreamCommand extends PerDLCommand {
-
-        protected String streamName;
-
-        protected PerStreamCommand(String name, String description) {
-            super(name, description);
-            options.addOption("s", "stream", true, "Stream Name");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("s")) {
-                throw new ParseException("No stream name provided.");
-            }
-            streamName = cmdline.getOptionValue("s");
-        }
-
-        protected String getStreamName() {
-            return streamName;
-        }
-
-        protected void setStreamName(String streamName) {
-            this.streamName = streamName;
-        }
-    }
-
-    /**
-     * NOTE: we might consider adding a command to 'delete' namespace. The 
implementation of the namespace
-     *       driver should implement the 'delete' operation.
-     */
-    protected static class DeleteAllocatorPoolCommand extends PerDLCommand {
-
-        int concurrency = 1;
-        String allocationPoolPath = 
DistributedLogConstants.ALLOCATION_POOL_NODE;
-
-        DeleteAllocatorPoolCommand() {
-            super("delete_allocator_pool", "Delete allocator pool for a given 
distributedlog instance");
-            options.addOption("t", "concurrency", true, "Concurrency on 
deleting allocator pool.");
-            options.addOption("ap", "allocation-pool-path", true, "Ledger 
Allocation Pool Path");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("t")) {
-                concurrency = Integer.parseInt(cmdline.getOptionValue("t"));
-                if (concurrency <= 0) {
-                    throw new ParseException("Invalid concurrency value : " + 
concurrency + ": it must be greater or equal to 0.");
-                }
-            }
-            if (cmdline.hasOption("ap")) {
-                allocationPoolPath = cmdline.getOptionValue("ap");
-                if (!allocationPoolPath.startsWith(".") || 
!allocationPoolPath.contains("allocation")) {
-                    throw new ParseException("Invalid allocation pool path : " 
+ allocationPoolPath + ": it must starts with a '.' and must contains 
'allocation'");
-                }
-            }
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            String rootPath = getUri().getPath() + "/" + allocationPoolPath;
-            final ScheduledExecutorService allocationExecutor = 
Executors.newSingleThreadScheduledExecutor();
-            ExecutorService executorService = 
Executors.newFixedThreadPool(concurrency);
-            Preconditions.checkArgument(getNamespace() instanceof 
BKDistributedLogNamespace);
-            BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) 
getNamespace();
-            final ZooKeeperClient zkc = ((BKNamespaceDriver) 
bkns.getNamespaceDriver()).getWriterZKC();
-            final BookKeeperClient bkc = ((BKNamespaceDriver) 
bkns.getNamespaceDriver()).getReaderBKC();
-            try {
-                List<String> pools = zkc.get().getChildren(rootPath, false);
-                final LinkedBlockingQueue<String> poolsToDelete = new 
LinkedBlockingQueue<String>();
-                if (getForce() || IOUtils.confirmPrompt("Are you sure you want 
to delete allocator pools : " + pools)) {
-                    for (String pool : pools) {
-                        poolsToDelete.add(rootPath + "/" + pool);
-                    }
-                    final CountDownLatch doneLatch = new 
CountDownLatch(concurrency);
-                    for (int i = 0; i < concurrency; i++) {
-                        final int tid = i;
-                        executorService.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                while (!poolsToDelete.isEmpty()) {
-                                    String poolPath = poolsToDelete.poll();
-                                    if (null == poolPath) {
-                                        break;
-                                    }
-                                    try {
-                                        LedgerAllocator allocator =
-                                                
LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(),
-                                                        zkc, bkc,
-                                                        allocationExecutor);
-                                        allocator.delete();
-                                        System.out.println("Deleted allocator 
pool : " + poolPath + " .");
-                                    } catch (IOException ioe) {
-                                        System.err.println("Failed to delete 
allocator pool " + poolPath + " : " + ioe.getMessage());
-                                    }
-                                }
-                                doneLatch.countDown();
-                                System.out.println("Thread " + tid + " is 
done.");
-                            }
-                        });
-                    }
-                    doneLatch.await();
-                }
-            } finally {
-                executorService.shutdown();
-                allocationExecutor.shutdown();
-            }
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "delete_allocator_pool";
-        }
-    }
-
-    public static class ListCommand extends PerDLCommand {
-
-        boolean printMetadata = false;
-        boolean printHex = false;
-
-        ListCommand() {
-            super("list", "list streams of a given distributedlog instance");
-            options.addOption("m", "meta", false, "Print metadata associated 
with each stream");
-            options.addOption("x", "hex", false, "Print metadata in hex 
format");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            printMetadata = cmdline.hasOption("m");
-            printHex = cmdline.hasOption("x");
-        }
-
-        @Override
-        protected String getUsage() {
-            return "list [options]";
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            printStreams(getNamespace());
-            return 0;
-        }
-
-        protected void printStreams(DistributedLogNamespace namespace) throws 
Exception {
-            Iterator<String> streams = namespace.getLogs();
-            System.out.println("Streams under " + getUri() + " : ");
-            System.out.println("--------------------------------");
-            while (streams.hasNext()) {
-                String streamName = streams.next();
-                System.out.println(streamName);
-                if (!printMetadata) {
-                    continue;
-                }
-                MetadataAccessor accessor =
-                        
namespace.getNamespaceDriver().getMetadataAccessor(streamName);
-                byte[] metadata = accessor.getMetadata();
-                if (null == metadata || metadata.length == 0) {
-                    continue;
-                }
-                if (printHex) {
-                    System.out.println(Hex.encodeHexString(metadata));
-                } else {
-                    System.out.println(new String(metadata, UTF_8));
-                }
-                System.out.println("");
-            }
-            System.out.println("--------------------------------");
-        }
-    }
-
-    public static class WatchNamespaceCommand extends PerDLCommand implements 
NamespaceListener {
-        private Set<String> currentSet = Sets.<String>newHashSet();
-        private CountDownLatch doneLatch = new CountDownLatch(1);
-
-        WatchNamespaceCommand() {
-            super("watch", "watch and report changes for a dl namespace");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-        }
-
-        @Override
-        protected String getUsage() {
-            return "watch [options]";
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            watchAndReportChanges(getNamespace());
-            doneLatch.await();
-            return 0;
-        }
-
-        @Override
-        public synchronized void onStreamsChanged(Iterator<String> streams) {
-            Set<String> updatedSet = Sets.newHashSet(streams);
-            Set<String> oldStreams = Sets.difference(currentSet, updatedSet);
-            Set<String> newStreams = Sets.difference(updatedSet, currentSet);
-            currentSet = updatedSet;
-
-            System.out.println("Old streams : ");
-            for (String stream : oldStreams) {
-                System.out.println(stream);
-            }
-
-            System.out.println("New streams : ");
-            for (String stream : newStreams) {
-                System.out.println(stream);
-            }
-
-            System.out.println("");
-        }
-
-        protected void watchAndReportChanges(DistributedLogNamespace 
namespace) throws Exception {
-            namespace.registerNamespaceListener(this);
-        }
-    }
-
-    protected static class InspectCommand extends PerDLCommand {
-
-        int numThreads = 1;
-        String streamPrefix = null;
-        boolean printInprogressOnly = false;
-        boolean dumpEntries = false;
-        boolean orderByTime = false;
-        boolean printStreamsOnly = false;
-        boolean checkInprogressOnly = false;
-
-        InspectCommand() {
-            super("inspect", "Inspect streams under a given dl uri to find any 
potential corruptions");
-            options.addOption("t", "threads", true, "Number threads to do 
inspection.");
-            options.addOption("ft", "filter", true, "Stream filter by prefix");
-            options.addOption("i", "inprogress", false, "Print inprogress log 
segments only");
-            options.addOption("d", "dump", false, "Dump entries of inprogress 
log segments");
-            options.addOption("ot", "orderbytime", false, "Order the log 
segments by completion time");
-            options.addOption("pso", "print-stream-only", false, "Print 
streams only");
-            options.addOption("cio", "check-inprogress-only", false, "Check 
duplicated inprogress only");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("t")) {
-                numThreads = Integer.parseInt(cmdline.getOptionValue("t"));
-            }
-            if (cmdline.hasOption("ft")) {
-                streamPrefix = cmdline.getOptionValue("ft");
-            }
-            printInprogressOnly = cmdline.hasOption("i");
-            dumpEntries = cmdline.hasOption("d");
-            orderByTime = cmdline.hasOption("ot");
-            printStreamsOnly = cmdline.hasOption("pso");
-            checkInprogressOnly = cmdline.hasOption("cio");
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> 
corruptedCandidates =
-                    new TreeMap<String, List<Pair<LogSegmentMetadata, 
List<String>>>>();
-            inspectStreams(corruptedCandidates);
-            System.out.println("Corrupted Candidates : ");
-            if (printStreamsOnly) {
-                System.out.println(corruptedCandidates.keySet());
-                return 0;
-            }
-            for (Map.Entry<String, List<Pair<LogSegmentMetadata, 
List<String>>>> entry : corruptedCandidates.entrySet()) {
-                System.out.println(entry.getKey() + " : \n");
-                for (Pair<LogSegmentMetadata, List<String>> pair : 
entry.getValue()) {
-                    System.out.println("\t - " + pair.getLeft());
-                    if (printInprogressOnly && dumpEntries) {
-                        int i = 0;
-                        for (String entryData : pair.getRight()) {
-                            System.out.println("\t" + i + "\t: " + entryData);
-                            ++i;
-                        }
-                    }
-                }
-                System.out.println();
-            }
-            return 0;
-        }
-
-        private void inspectStreams(final SortedMap<String, 
List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates)
-                throws Exception {
-            Iterator<String> streamCollection = getNamespace().getLogs();
-            final List<String> streams = new ArrayList<String>();
-            while (streamCollection.hasNext()) {
-                String s = streamCollection.next();
-                if (null != streamPrefix) {
-                    if (s.startsWith(streamPrefix)) {
-                        streams.add(s);
-                    }
-                } else {
-                    streams.add(s);
-                }
-            }
-            if (0 == streams.size()) {
-                return;
-            }
-            println("Streams : " + streams);
-            if (!getForce() && !IOUtils.confirmPrompt("Are you sure you want 
to inspect " + streams.size() + " streams")) {
-                return;
-            }
-            numThreads = Math.min(streams.size(), numThreads);
-            final int numStreamsPerThreads = streams.size() / numThreads;
-            Thread[] threads = new Thread[numThreads];
-            for (int i = 0; i < numThreads; i++) {
-                final int tid = i;
-                threads[i] = new Thread("Inspect-" + i) {
-                    @Override
-                    public void run() {
-                        try {
-                            inspectStreams(streams, tid, numStreamsPerThreads, 
corruptedCandidates);
-                            System.out.println("Thread " + tid + " finished.");
-                        } catch (Exception e) {
-                            System.err.println("Thread " + tid + " quits with 
exception : " + e.getMessage());
-                        }
-                    }
-                };
-                threads[i].start();
-            }
-            for (int i = 0; i < numThreads; i++) {
-                threads[i].join();
-            }
-        }
-
-        private void inspectStreams(List<String> streams,
-                                    int tid,
-                                    int numStreamsPerThreads,
-                                    SortedMap<String, 
List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates)
-                throws Exception {
-            int startIdx = tid * numStreamsPerThreads;
-            int endIdx = Math.min(streams.size(), (tid + 1) * 
numStreamsPerThreads);
-            for (int i = startIdx; i < endIdx; i++) {
-                String s = streams.get(i);
-                BookKeeperClient bkc = getBookKeeperClient();
-                DistributedLogManager dlm = getNamespace().openLog(s);
-                try {
-                    List<LogSegmentMetadata> segments = dlm.getLogSegments();
-                    if (segments.size() <= 1) {
-                        continue;
-                    }
-                    boolean isCandidate = false;
-                    if (checkInprogressOnly) {
-                        Set<Long> inprogressSeqNos = new HashSet<Long>();
-                        for (LogSegmentMetadata segment : segments) {
-                            if (segment.isInProgress()) {
-                                
inprogressSeqNos.add(segment.getLogSegmentSequenceNumber());
-                            }
-                        }
-                        for (LogSegmentMetadata segment : segments) {
-                            if (!segment.isInProgress() && 
inprogressSeqNos.contains(segment.getLogSegmentSequenceNumber())) {
-                                isCandidate = true;
-                            }
-                        }
-                    } else {
-                        LogSegmentMetadata firstSegment = segments.get(0);
-                        long lastSeqNo = 
firstSegment.getLogSegmentSequenceNumber();
-
-                        for (int j = 1; j < segments.size(); j++) {
-                            LogSegmentMetadata nextSegment = segments.get(j);
-                            if (lastSeqNo + 1 != 
nextSegment.getLogSegmentSequenceNumber()) {
-                                isCandidate = true;
-                                break;
-                            }
-                            ++lastSeqNo;
-                        }
-                    }
-                    if (isCandidate) {
-                        if (orderByTime) {
-                            Collections.sort(segments, 
LOGSEGMENT_COMPARATOR_BY_TIME);
-                        }
-                        List<Pair<LogSegmentMetadata, List<String>>> ledgers =
-                                new ArrayList<Pair<LogSegmentMetadata, 
List<String>>>();
-                        for (LogSegmentMetadata seg : segments) {
-                            LogSegmentMetadata segment = seg;
-                            List<String> dumpedEntries = new 
ArrayList<String>();
-                            if (segment.isInProgress()) {
-                                LedgerHandle lh = 
bkc.get().openLedgerNoRecovery(segment.getLogSegmentId(), 
BookKeeper.DigestType.CRC32,
-                                                                               
  dlConf.getBKDigestPW().getBytes(UTF_8));
-                                try {
-                                    long lac = lh.readLastConfirmed();
-                                    segment = 
segment.mutator().setLastEntryId(lac).build();
-                                    if (printInprogressOnly && dumpEntries && 
lac >= 0) {
-                                        Enumeration<LedgerEntry> entries = 
lh.readEntries(0L, lac);
-                                        while (entries.hasMoreElements()) {
-                                            LedgerEntry entry = 
entries.nextElement();
-                                            dumpedEntries.add(new 
String(entry.getEntry(), UTF_8));
-                                        }
-                                    }
-                                } finally {
-                                    lh.close();
-                                }
-                            }
-                            if (printInprogressOnly) {
-                                if (segment.isInProgress()) {
-                                    ledgers.add(Pair.of(segment, 
dumpedEntries));
-                                }
-                            } else {
-                                ledgers.add(Pair.of(segment, EMPTY_LIST));
-                            }
-                        }
-                        synchronized (corruptedCandidates) {
-                            corruptedCandidates.put(s, ledgers);
-                        }
-                    }
-                } finally {
-                    dlm.close();
-                }
-            }
-        }
-
-        @Override
-        protected String getUsage() {
-            return "inspect [options]";
-        }
-    }
-
-    protected static class TruncateCommand extends PerDLCommand {
-
-        int numThreads = 1;
-        String streamPrefix = null;
-        boolean deleteStream = false;
-
-        TruncateCommand() {
-            super("truncate", "truncate streams under a given dl uri");
-            options.addOption("t", "threads", true, "Number threads to do 
truncation");
-            options.addOption("ft", "filter", true, "Stream filter by prefix");
-            options.addOption("d", "delete", false, "Delete Stream");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("t")) {
-                numThreads = Integer.parseInt(cmdline.getOptionValue("t"));
-            }
-            if (cmdline.hasOption("ft")) {
-                streamPrefix = cmdline.getOptionValue("ft");
-            }
-            if (cmdline.hasOption("d")) {
-                deleteStream = true;
-            }
-        }
-
-        @Override
-        protected String getUsage() {
-            return "truncate [options]";
-        }
-
-        protected void setFilter(String filter) {
-            this.streamPrefix = filter;
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            getConf().setZkAclId(getZkAclId());
-            return truncateStreams(getNamespace());
-        }
-
-        private int truncateStreams(final DistributedLogNamespace namespace) 
throws Exception {
-            Iterator<String> streamCollection = namespace.getLogs();
-            final List<String> streams = new ArrayList<String>();
-            while (streamCollection.hasNext()) {
-                String s = streamCollection.next();
-                if (null != streamPrefix) {
-                    if (s.startsWith(streamPrefix)) {
-                        streams.add(s);
-                    }
-                } else {
-                    streams.add(s);
-                }
-            }
-            if (0 == streams.size()) {
-                return 0;
-            }
-            System.out.println("Streams : " + streams);
-            if (!getForce() && !IOUtils.confirmPrompt("Do you want to truncate 
" + streams.size() + " streams ?")) {
-                return 0;
-            }
-            numThreads = Math.min(streams.size(), numThreads);
-            final int numStreamsPerThreads = streams.size() / numThreads + 1;
-            Thread[] threads = new Thread[numThreads];
-            for (int i = 0; i < numThreads; i++) {
-                final int tid = i;
-                threads[i] = new Thread("Truncate-" + i) {
-                    @Override
-                    public void run() {
-                        try {
-                            truncateStreams(namespace, streams, tid, 
numStreamsPerThreads);
-                            System.out.println("Thread " + tid + " finished.");
-                        } catch (IOException e) {
-                            System.err.println("Thread " + tid + " quits with 
exception : " + e.getMessage());
-                        }
-                    }
-                };
-                threads[i].start();
-            }
-            for (int i = 0; i < numThreads; i++) {
-                threads[i].join();
-            }
-            return 0;
-        }
-
-        private void truncateStreams(DistributedLogNamespace namespace, 
List<String> streams,
-                                     int tid, int numStreamsPerThreads) throws 
IOException {
-            int startIdx = tid * numStreamsPerThreads;
-            int endIdx = Math.min(streams.size(), (tid + 1) * 
numStreamsPerThreads);
-            for (int i = startIdx; i < endIdx; i++) {
-                String s = streams.get(i);
-                DistributedLogManager dlm = namespace.openLog(s);
-                try {
-                    if (deleteStream) {
-                        dlm.delete();
-                    } else {
-                        dlm.purgeLogsOlderThan(Long.MAX_VALUE);
-                    }
-                } finally {
-                    dlm.close();
-                }
-            }
-        }
-    }
-
-    public static class SimpleBookKeeperClient {
-        BookKeeperClient bkc;
-        ZooKeeperClient zkc;
-
-        public SimpleBookKeeperClient(DistributedLogConfiguration conf, URI 
uri) {
-            try {
-                zkc = ZooKeeperClientBuilder.newBuilder()
-                    .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
-                    .zkAclId(conf.getZkAclId())
-                    .uri(uri)
-                    .build();
-                BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
-                BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-                bkc = BookKeeperClientBuilder.newBuilder()
-                        .zkc(zkc)
-                        .dlConfig(conf)
-                        .ledgersPath(bkdlConfig.getBkLedgersPath())
-                        .name("dlog")
-                        .build();
-            } catch (Exception e) {
-                close();
-            }
-        }
-        public BookKeeperClient client() {
-            return bkc;
-        }
-        public void close() {
-            if (null != bkc) {
-                bkc.close();
-            }
-            if (null != zkc) {
-                zkc.close();
-            }
-        }
-    }
-
-    protected static class ShowCommand extends PerStreamCommand {
-
-        SimpleBookKeeperClient bkc = null;
-        boolean listSegments = true;
-        boolean listEppStats = false;
-        long firstLid = 0;
-        long lastLid = -1;
-
-        ShowCommand() {
-            super("show", "show metadata of a given stream and list segments");
-            options.addOption("ns", "no-log-segments", false, "Do not list log 
segment metadata");
-            options.addOption("lp", "placement-stats", false, "Show ensemble 
placement stats");
-            options.addOption("fl", "first-ledger", true, "First log sement 
no");
-            options.addOption("ll", "last-ledger", true, "Last log sement no");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("fl")) {
-                try {
-                    firstLid = Long.parseLong(cmdline.getOptionValue("fl"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid ledger id " + 
cmdline.getOptionValue("fl"));
-                }
-            }
-            if (firstLid < 0) {
-                throw new IllegalArgumentException("Invalid ledger id " + 
firstLid);
-            }
-            if (cmdline.hasOption("ll")) {
-                try {
-                    lastLid = Long.parseLong(cmdline.getOptionValue("ll"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid ledger id " + 
cmdline.getOptionValue("ll"));
-                }
-            }
-            if (lastLid != -1 && firstLid > lastLid) {
-                throw new IllegalArgumentException("Invalid ledger ids " + 
firstLid + " " + lastLid);
-            }
-            listSegments = !cmdline.hasOption("ns");
-            listEppStats = cmdline.hasOption("lp");
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            DistributedLogManager dlm = 
getNamespace().openLog(getStreamName());
-            try {
-                if (listEppStats) {
-                    bkc = new SimpleBookKeeperClient(getConf(), getUri());
-                }
-                printMetadata(dlm);
-            } finally {
-                dlm.close();
-                if (null != bkc) {
-                    bkc.close();
-                }
-            }
-            return 0;
-        }
-
-        private void printMetadata(DistributedLogManager dlm) throws Exception 
{
-            printHeader(dlm);
-            if (listSegments) {
-                System.out.println("Ledgers : ");
-                List<LogSegmentMetadata> segments = dlm.getLogSegments();
-                for (LogSegmentMetadata segment : segments) {
-                    if (include(segment)) {
-                        printLedgerRow(segment);
-                    }
-                }
-            }
-        }
-
-        private void printHeader(DistributedLogManager dlm) throws Exception {
-            DLSN firstDlsn = Await.result(dlm.getFirstDLSNAsync());
-            boolean endOfStreamMarked = dlm.isEndOfStreamMarked();
-            DLSN lastDlsn = dlm.getLastDLSN();
-            long firstTxnId = dlm.getFirstTxId();
-            long lastTxnId = dlm.getLastTxId();
-            long recordCount = dlm.getLogRecordCount();
-            String result = String.format("Stream : (firstTxId=%d, 
lastTxid=%d, firstDlsn=%s, lastDlsn=%s, endOfStreamMarked=%b, recordCount=%d)",
-                firstTxnId, lastTxnId, getDlsnName(firstDlsn), 
getDlsnName(lastDlsn), endOfStreamMarked, recordCount);
-            System.out.println(result);
-            if (listEppStats) {
-                printEppStatsHeader(dlm);
-            }
-        }
-
-        boolean include(LogSegmentMetadata segment) {
-            return (firstLid <= segment.getLogSegmentSequenceNumber() && 
(lastLid == -1 || lastLid >= segment.getLogSegmentSequenceNumber()));
-        }
-
-        private void printEppStatsHeader(DistributedLogManager dlm) throws 
Exception {
-            String label = "Ledger Placement :";
-            System.out.println(label);
-            Map<BookieSocketAddress, Integer> totals = new 
HashMap<BookieSocketAddress, Integer>();
-            List<LogSegmentMetadata> segments = dlm.getLogSegments();
-            for (LogSegmentMetadata segment : segments) {
-                if (include(segment)) {
-                    merge(totals, getBookieStats(segment));
-                }
-            }
-            List<Map.Entry<BookieSocketAddress, Integer>> entries = new 
ArrayList<Map.Entry<BookieSocketAddress, Integer>>(totals.entrySet());
-            Collections.sort(entries, new 
Comparator<Map.Entry<BookieSocketAddress, Integer>>() {
-                @Override
-                public int compare(Map.Entry<BookieSocketAddress, Integer> o1, 
Map.Entry<BookieSocketAddress, Integer> o2) {
-                    return o2.getValue() - o1.getValue();
-                }
-            });
-            int width = 0;
-            int totalEntries = 0;
-            for (Map.Entry<BookieSocketAddress, Integer> entry : entries) {
-                width = Math.max(width, label.length() + 1 + 
entry.getKey().toString().length());
-                totalEntries += entry.getValue();
-            }
-            for (Map.Entry<BookieSocketAddress, Integer> entry : entries) {
-                System.out.println(String.format("%"+width+"s\t%6.2f%%\t\t%d", 
entry.getKey(), entry.getValue()*1.0/totalEntries, entry.getValue()));
-            }
-        }
-
-        private void printLedgerRow(LogSegmentMetadata segment) throws 
Exception {
-            System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " 
+ segment);
-        }
-
-        private Map<BookieSocketAddress, Integer> 
getBookieStats(LogSegmentMetadata segment) throws Exception {
-            Map<BookieSocketAddress, Integer> stats = new 
HashMap<BookieSocketAddress, Integer>();
-            LedgerHandle lh = 
bkc.client().get().openLedgerNoRecovery(segment.getLogSegmentId(), 
BookKeeper.DigestType.CRC32,
-                    getConf().getBKDigestPW().getBytes(UTF_8));
-            long eidFirst = 0;
-            for (SortedMap.Entry<Long, ArrayList<BookieSocketAddress>> entry : 
LedgerReader.bookiesForLedger(lh).entrySet()) {
-                long eidLast = entry.getKey().longValue();
-                long count = eidLast - eidFirst + 1;
-                for (BookieSocketAddress bookie : entry.getValue()) {
-                    merge(stats, bookie, (int) count);
-                }
-                eidFirst = eidLast;
-            }
-            return stats;
-        }
-
-        void merge(Map<BookieSocketAddress, Integer> m, BookieSocketAddress 
bookie, Integer count) {
-            if (m.containsKey(bookie)) {
-                m.put(bookie, count + m.get(bookie).intValue());
-            } else {
-                m.put(bookie, count);
-            }
-        }
-
-        void merge(Map<BookieSocketAddress, Integer> m1, 
Map<BookieSocketAddress, Integer> m2) {
-            for (Map.Entry<BookieSocketAddress, Integer> entry : 
m2.entrySet()) {
-                merge(m1, entry.getKey(), entry.getValue());
-            }
-        }
-
-        String getDlsnName(DLSN dlsn) {
-            if (dlsn.equals(DLSN.InvalidDLSN)) {
-                return "InvalidDLSN";
-            }
-            return dlsn.toString();
-        }
-
-        @Override
-        protected String getUsage() {
-            return "show [options]";
-        }
-    }
-
-    static class CountCommand extends PerStreamCommand {
-
-        DLSN startDLSN = null;
-        DLSN endDLSN = null;
-
-        protected CountCommand() {
-            super("count", "count number records between dlsns");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            String[] args = cmdline.getArgs();
-            if (args.length < 1) {
-                throw new ParseException("Must specify at least start dlsn.");
-            }
-            if (args.length >= 1) {
-                startDLSN = parseDLSN(args[0]);
-            }
-            if (args.length >= 2) {
-                endDLSN = parseDLSN(args[1]);
-            }
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            DistributedLogManager dlm = 
getNamespace().openLog(getStreamName());
-            try {
-                long count = 0;
-                if (null == endDLSN) {
-                    count = countToLastRecord(dlm);
-                } else {
-                    count = countFromStartToEnd(dlm);
-                }
-                System.out.println("total is " + count + " records.");
-                return 0;
-            } finally {
-                dlm.close();
-            }
-        }
-
-        int countFromStartToEnd(DistributedLogManager dlm) throws Exception {
-            int count = 0;
-            try {
-                LogReader reader = dlm.getInputStream(startDLSN);
-                try {
-                    LogRecordWithDLSN record = reader.readNext(false);
-                    LogRecordWithDLSN preRecord = record;
-                    System.out.println("first record : " + record);
-                    while (null != record) {
-                        if (record.getDlsn().compareTo(endDLSN) > 0) {
-                            break;
-                        }
-                        ++count;
-                        if (count % 1000 == 0) {
-                            logger.info("read {} records from {}...", count, 
getStreamName());
-                        }
-                        preRecord = record;
-                        record = reader.readNext(false);
-                    }
-                    System.out.println("last record : " + preRecord);
-                } finally {
-                    reader.close();
-                }
-            } finally {
-                dlm.close();
-            }
-            return count;
-        }
-
-        long countToLastRecord(DistributedLogManager dlm) throws Exception {
-            return 
Await.result(dlm.getLogRecordCountAsync(startDLSN)).longValue();
-        }
-
-        @Override
-        protected String getUsage() {
-            return "count <start> <end>";
-        }
-    }
-
-    public static class DeleteCommand extends PerStreamCommand {
-
-        protected DeleteCommand() {
-            super("delete", "delete a given stream");
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            getConf().setZkAclId(getZkAclId());
-            DistributedLogManager dlm = 
getNamespace().openLog(getStreamName());
-            try {
-                dlm.delete();
-            } finally {
-                dlm.close();
-            }
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "delete";
-        }
-    }
-
-    public static class DeleteLedgersCommand extends PerDLCommand {
-
-        private final List<Long> ledgers = new ArrayList<Long>();
-
-        int numThreads = 1;
-
-        protected DeleteLedgersCommand() {
-            super("delete_ledgers", "delete given ledgers");
-            options.addOption("l", "ledgers", true, "List of ledgers, 
separated by comma");
-            options.addOption("lf", "ledgers-file", true, "File of list of 
ledgers, each line has a ledger id");
-            options.addOption("t", "concurrency", true, "Number of threads to 
run deletions");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("l") && cmdline.hasOption("lf")) {
-                throw new ParseException("Please specify ledgers: either use 
list or use file only.");
-            }
-            if (!cmdline.hasOption("l") && !cmdline.hasOption("lf")) {
-                throw new ParseException("No ledgers specified. Please specify 
ledgers either use list or use file only.");
-            }
-            if (cmdline.hasOption("l")) {
-                String ledgersStr = cmdline.getOptionValue("l");
-                String[] ledgerStrs = ledgersStr.split(",");
-                for (String ledgerStr : ledgerStrs) {
-                    ledgers.add(Long.parseLong(ledgerStr));
-                }
-            }
-            if (cmdline.hasOption("lf")) {
-                BufferedReader br = null;
-                try {
-
-                    br = new BufferedReader(new InputStreamReader(
-                            new FileInputStream(new 
File(cmdline.getOptionValue("lf"))), UTF_8.name()));
-                    String line;
-                    while ((line = br.readLine()) != null) {
-                        ledgers.add(Long.parseLong(line));
-                    }
-                } catch (FileNotFoundException e) {
-                    throw new ParseException("No ledgers file " + 
cmdline.getOptionValue("lf") + " found.");
-                } catch (IOException e) {
-                    throw new ParseException("Invalid ledgers file " + 
cmdline.getOptionValue("lf") + " found.");
-                } finally {
-                    if (null != br) {
-                        try {
-                            br.close();
-                        } catch (IOException e) {
-                            // no-op
-                        }
-                    }
-                }
-            }
-            if (cmdline.hasOption("t")) {
-                numThreads = Integer.parseInt(cmdline.getOptionValue("t"));
-            }
-        }
-
-        @Override
-        protected String getUsage() {
-            return "delete_ledgers [options]";
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
-            try {
-                final AtomicInteger numLedgers = new AtomicInteger(0);
-                final CountDownLatch doneLatch = new 
CountDownLatch(numThreads);
-                final AtomicInteger numFailures = new AtomicInteger(0);
-                final LinkedBlockingQueue<Long> ledgerQueue =
-                        new LinkedBlockingQueue<Long>();
-                ledgerQueue.addAll(ledgers);
-                for (int i = 0; i < numThreads; i++) {
-                    final int tid = i;
-                    executorService.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                            while (true) {
-                                Long ledger = ledgerQueue.poll();
-                                if (null == ledger) {
-                                    break;
-                                }
-                                try {
-                                    
getBookKeeperClient().get().deleteLedger(ledger);
-                                    int numLedgersDeleted = 
numLedgers.incrementAndGet();
-                                    if (numLedgersDeleted % 1000 == 0) {
-                                        System.out.println("Deleted " + 
numLedgersDeleted + " ledgers.");
-                                    }
-                                } catch 
(BKException.BKNoSuchLedgerExistsException e) {
-                                    int numLedgersDeleted = 
numLedgers.incrementAndGet();
-                                    if (numLedgersDeleted % 1000 == 0) {
-                                        System.out.println("Deleted " + 
numLedgersDeleted + " ledgers.");
-                                    }
-                                } catch (Exception e) {
-                                    numFailures.incrementAndGet();
-                                    break;
-                                }
-                            }
-                            doneLatch.countDown();
-                            System.out.println("Thread " + tid + " quits");
-                        }
-                    });
-                }
-                doneLatch.await();
-                if (numFailures.get() > 0) {
-                    throw new IOException("Encounter " + numFailures.get() + " 
failures during deleting ledgers");
-                }
-            } finally {
-                executorService.shutdown();
-            }
-            return 0;
-        }
-    }
-
-    public static class CreateCommand extends PerDLCommand {
-
-        final List<String> streams = new ArrayList<String>();
-
-        String streamPrefix = null;
-        String streamExpression = null;
-
-        CreateCommand() {
-            super("create", "create streams under a given namespace");
-            options.addOption("r", "prefix", true, "Prefix of stream name. 
E.g. 'QuantumLeapTest-'.");
-            options.addOption("e", "expression", true, "Expression to generate 
stream suffix. " +
-                              "Currently we support range 'x-y', list 'x,y,z' 
and name 'xyz'");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            if (cmdline.hasOption("r")) {
-                streamPrefix = cmdline.getOptionValue("r");
-            }
-            if (cmdline.hasOption("e")) {
-                streamExpression = cmdline.getOptionValue("e");
-            }
-            if (null == streamPrefix || null == streamExpression) {
-                throw new ParseException("Please specify stream prefix & 
expression.");
-            }
-        }
-
-        protected void generateStreams(String streamPrefix, String 
streamExpression) throws ParseException {
-            // parse the stream expression
-            if (streamExpression.contains("-")) {
-                // a range expression
-                String[] parts = streamExpression.split("-");
-                if (parts.length != 2) {
-                    throw new ParseException("Invalid stream index range : " + 
streamExpression);
-                }
-                try {
-                    int start = Integer.parseInt(parts[0]);
-                    int end = Integer.parseInt(parts[1]);
-                    if (start > end) {
-                        throw new ParseException("Invalid stream index range : 
" + streamExpression);
-                    }
-                    for (int i = start; i <= end; i++) {
-                        streams.add(streamPrefix + i);
-                    }
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid stream index range : " + 
streamExpression);
-                }
-            } else if (streamExpression.contains(",")) {
-                // a list expression
-                String[] parts = streamExpression.split(",");
-                try {
-                    for (String part : parts) {
-                        int idx = Integer.parseInt(part);
-                        streams.add(streamPrefix + idx);
-                    }
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid stream suffix list : " + 
streamExpression);
-                }
-            } else {
-                streams.add(streamPrefix + streamExpression);
-            }
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            generateStreams(streamPrefix, streamExpression);
-            if (streams.isEmpty()) {
-                System.out.println("Nothing to create.");
-                return 0;
-            }
-            if (!getForce() && !IOUtils.confirmPrompt("You are going to create 
streams : " + streams)) {
-                return 0;
-            }
-            getConf().setZkAclId(getZkAclId());
-            for (String stream : streams) {
-                getNamespace().createLog(stream);
-            }
-            return 0;
-        }
-
-        @Override
-        protected String getUsage() {
-            return "create [options]";
-        }
-
-        protected void setPrefix(String prefix) {
-            this.streamPrefix = prefix;
-        }
-
-        protected void setExpression(String expression) {
-            this.streamExpression = expression;
-        }
-    }
-
-    protected static class DumpCommand extends PerStreamCommand {
-
-        boolean printHex = false;
-        boolean skipPayload = false;
-        Long fromTxnId = null;
-        DLSN fromDLSN = null;
-        int count = 100;
-
-        DumpCommand() {
-            super("dump", "dump records of a given stream");
-            options.addOption("x", "hex", false, "Print record in hex format");
-            options.addOption("sp", "skip-payload", false, "Skip printing the 
payload of the record");
-            options.addOption("o", "offset", true, "Txn ID to start dumping.");
-            options.addOption("n", "seqno", true, "Sequence Number to start 
dumping");
-            options.addOption("e", "eid", true, "Entry ID to start dumping");
-            options.addOption("t", "slot", true, "Slot to start dumping");
-            options.addOption("l", "limit", true, "Number of entries to dump. 
Default is 100.");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            printHex = cmdline.hasOption("x");
-            skipPayload = cmdline.hasOption("sp");
-            if (cmdline.hasOption("o")) {
-                try {
-                    fromTxnId = Long.parseLong(cmdline.getOptionValue("o"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid txn id " + 
cmdline.getOptionValue("o"));
-                }
-            }
-            if (cmdline.hasOption("l")) {
-                try {
-                    count = Integer.parseInt(cmdline.getOptionValue("l"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid count " + 
cmdline.getOptionValue("l"));
-                }
-                if (count <= 0) {
-                    throw new ParseException("Negative count found : " + 
count);
-                }
-            }
-            if (cmdline.hasOption("n")) {
-                long seqno;
-                try {
-                    seqno = Long.parseLong(cmdline.getOptionValue("n"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid sequence number " + 
cmdline.getOptionValue("n"));
-                }
-                long eid;
-                if (cmdline.hasOption("e")) {
-                    eid = Long.parseLong(cmdline.getOptionValue("e"));
-                } else {
-                    eid = 0;
-                }
-                long slot;
-                if (cmdline.hasOption("t")) {
-                    slot = Long.parseLong(cmdline.getOptionValue("t"));
-                } else {
-                    slot = 0;
-                }
-                fromDLSN = new DLSN(seqno, eid, slot);
-            }
-            if (null == fromTxnId && null == fromDLSN) {
-                throw new ParseException("No start Txn/DLSN is specified.");
-            }
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            DistributedLogManager dlm = 
getNamespace().openLog(getStreamName());
-            long totalCount = dlm.getLogRecordCount();
-            try {
-                AsyncLogReader reader;
-                Object startOffset;
-                try {
-                    DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync());
-                    System.out.println("Last DLSN : " + lastDLSN);
-                    if (null == fromDLSN) {
-                        reader = dlm.getAsyncLogReader(fromTxnId);
-                        startOffset = fromTxnId;
-                    } else {
-                        reader = dlm.getAsyncLogReader(fromDLSN);
-                        startOffset = fromDLSN;
-                    }
-                } catch (LogNotFoundException lee) {
-                    System.out.println("No stream found to dump records.");
-                    return 0;
-                }
-                try {
-                    System.out.println(String.format("Dump records for %s 
(from = %s, dump count = %d, total records = %d)",
-                            getStreamName(), startOffset, count, totalCount));
-
-                    dumpRecords(reader);
-                } finally {
-                    Utils.close(reader);
-                }
-            } finally {
-                dlm.close();
-            }
-            return 0;
-        }
-
-        private void dumpRecords(AsyncLogReader reader) throws Exception {
-            int numRead = 0;
-            LogRecord record = Await.result(reader.readNext());
-            while (record != null) {
-                // dump the record
-                dumpRecord(record);
-                ++numRead;
-                if (numRead >= count) {
-                    break;
-                }
-                record = Await.result(reader.readNext());
-            }
-            if (numRead == 0) {
-                System.out.println("No records.");
-            } else {
-                
System.out.println("------------------------------------------------");
-            }
-        }
-
-        private void dumpRecord(LogRecord record) {
-            
System.out.println("------------------------------------------------");
-            if (record instanceof LogRecordWithDLSN) {
-                System.out.println("Record (txn = " + 
record.getTransactionId() + ", bytes = "
-                        + record.getPayload().length + ", dlsn = "
-                        + ((LogRecordWithDLSN) record).getDlsn() + ", sequence 
id = "
-                        + ((LogRecordWithDLSN) record).getSequenceId() + ")");
-            } else {
-                System.out.println("Record (txn = " + 
record.getTransactionId() + ", bytes = "
-                        + record.getPayload().length + ")");
-            }
-            System.out.println("");
-
-            if (skipPayload) {
-                return;
-            }
-
-            if (printHex) {
-                System.out.println(Hex.encodeHexString(record.getPayload()));
-            } else {
-                System.out.println(new String(record.getPayload(), UTF_8));
-            }
-        }
-
-        @Override
-        protected String getUsage() {
-            return "dump [options]";
-        }
-
-        protected void setFromTxnId(Long fromTxnId) {
-            this.fromTxnId = fromTxnId;
-        }
-    }
-
-    /**
-     * TODO: refactor inspect & inspectstream
-     * TODO: support force
-     *
-     * inspectstream -lac -gap (different options for different operations for 
a single stream)
-     * inspect -lac -gap (inspect the namespace, which will use inspect stream)
-     */
-    static class InspectStreamCommand extends PerStreamCommand {
-
-        InspectStreamCommand() {
-            super("inspectstream", "Inspect a given stream to identify any 
metadata corruptions");
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            DistributedLogManager dlm = 
getNamespace().openLog(getStreamName());
-            try {
-                return inspectAndRepair(dlm.getLogSegments());
-            } finally {
-                dlm.close();
-            }
-        }
-
-        protected int inspectAndRepair(List<LogSegmentMetadata> segments) 
throws Exception {
-            LogSegmentMetadataStore metadataStore = 
getLogSegmentMetadataStore();
-            ZooKeeperClient zkc = getZooKeeperClient();
-            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
-            BKDLConfig.propagateConfiguration(bkdlConfig, getConf());
-            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
-                    .dlConfig(getConf())
-                    .zkServers(bkdlConfig.getBkZkServersForReader())
-                    .ledgersPath(bkdlConfig.getBkLedgersPath())
-                    .name("dlog")
-                    .build();
-            try {
-                List<LogSegmentMetadata> segmentsToRepair = 
inspectLogSegments(bkc, segments);
-                if (segmentsToRepair.isEmpty()) {
-                    System.out.println("The stream is good. No log segments to 
repair.");
-                    return 0;
-                }
-                System.out.println(segmentsToRepair.size() + " segments to 
repair : ");
-                System.out.println(segmentsToRepair);
-                System.out.println();
-                if (!IOUtils.confirmPrompt("Do you want to repair them (Y/N): 
")) {
-                    return 0;
-                }
-                repairLogSegments(metadataStore, bkc, segmentsToRepair);
-                return 0;
-            } finally {
-                bkc.close();
-            }
-        }
-
-        protected List<LogSegmentMetadata> inspectLogSegments(
-                BookKeeperClient bkc, List<LogSegmentMetadata> segments) 
throws Exception {
-            List<LogSegmentMetadata> segmentsToRepair = new 
ArrayList<LogSegmentMetadata>();
-            for (LogSegmentMetadata segment : segments) {
-                if (!segment.isInProgress() && !inspectLogSegment(bkc, 
segment)) {
-                    segmentsToRepair.add(segment);
-                }
-            }
-            return segmentsToRepair;
-        }
-
-        /**
-         * Inspect a given log segment.
-         *
-         * @param bkc
-         *          bookkeeper client
-         * @param metadata
-         *          metadata of the log segment to
-         * @return true if it is a good stream, false if the stream has 
inconsistent metadata.
-         * @throws Exception
-         */
-        protected boolean inspectLogSegment(BookKeeperClient bkc,
-                                            LogSegmentMetadata metadata) 
throws Exception {
-            if (metadata.isInProgress()) {
-                System.out.println("Skip inprogress log segment " + metadata);
-                return true;
-            }
-            long ledgerId = metadata.getLogSegmentId();
-            LedgerHandle lh = bkc.get().openLedger(ledgerId, 
BookKeeper.DigestType.CRC32,
-                    getConf().getBKDigestPW().getBytes(UTF_8));
-            LedgerHandle readLh = bkc.get().openLedger(ledgerId, 
BookKeeper.DigestType.CRC32,
-                    getConf().getBKDigestPW().getBytes(UTF_8));
-            LedgerReader lr = new LedgerReader(bkc.get());
-            final AtomicReference<List<LedgerEntry>> entriesHolder = new 
AtomicReference<List<LedgerEntry>>(null);
-            final AtomicInteger rcHolder = new AtomicInteger(-1234);
-            final CountDownLatch doneLatch = new CountDownLatch(1);
-            try {
-                lr.forwardReadEntriesFromLastConfirmed(readLh, new 
BookkeeperInternalCallbacks.GenericCallback<List<LedgerEntry>>() {
-                    @Override
-                    public void operationComplete(int rc, List<LedgerEntry> 
entries) {
-                        rcHolder.set(rc);
-                        entriesHolder.set(entries);
-                        doneLatch.countDown();
-                    }
-                });
-                doneLatch.await();
-                if (BKException.Code.OK != rcHolder.get()) {
-                    throw BKException.create(rcHolder.get());
-                }
-                List<LedgerEntry> entries = entriesHolder.get();
-                long lastEntryId;
-                if (entries.isEmpty()) {
-                    lastEntryId = LedgerHandle.INVALID_ENTRY_ID;
-                } else {
-                    LedgerEntry lastEntry = entries.get(entries.size() - 1);
-                    lastEntryId = lastEntry.getEntryId();
-                }
-                if (lastEntryId != lh.getLastAddConfirmed()) {
-                    System.out.println("Inconsistent Last Add Confirmed Found 
for LogSegment " + metadata.getLogSegmentSequenceNumber() + ": ");
-                    System.out.println("\t metadata: " + metadata);
-                    System.out.println("\t lac in ledger metadata is " + 
lh.getLastAddConfirmed() + ", but lac in bookies is " + lastEntryId);
-                    return false;
-                } else {
-                    return true;
-                }
-            } finally {
-                lh.close();
-                readLh.close();
-            }
-        }
-
-        protected void repairLogSegments(LogSegmentMetadataStore metadataStore,
-                                         BookKeeperClient bkc,
-                                         List<LogSegmentMetadata> segments) 
throws Exception {
-            BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get());
-            try {
-                MetadataUpdater metadataUpdater = 
LogSegmentMetadataStoreUpdater.createMetadataUpdater(
-                        getConf(), metadataStore);
-                for (LogSegmentMetadata segment : segments) {
-                    repairLogSegment(bkAdmin, metadataUpdater, segment);
-                }
-            } finally {
-                bkAdmin.close();
-            }
-        }
-
-        protected void repairLogSegment(BookKeeperAdmin bkAdmin,
-                                        MetadataUpdater metadataUpdater,
-                                        LogSegmentMetadata segment) throws 
Exception {
-            if (segment.isInProgress()) {
-                System.out.println("Skip inprogress log segment " + segment);
-                return;
-            }
-            LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId(), 
true);
-            long lac = lh.getLastAddConfirmed();
-            Enumeration<LedgerEntry> entries = lh.readEntries(lac, lac);
-            if (!entries.hasMoreElements()) {
-                throw new IOException("Entry " + lac + " isn't found for " + 
segment);
-            }
-            LedgerEntry lastEntry = entries.nextElement();
-            Entry.Reader reader = Entry.newBuilder()
-                    .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), 
segment.getStartSequenceId())
-                    .setEntryId(lastEntry.getEntryId())
-                    
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
-                    .setInputStream(lastEntry.getEntryInputStream())
-                    .buildReader();
-            LogRecordWithDLSN record = reader.nextRecord();
-            LogRecordWithDLSN lastRecord = null;
-            while (null != record) {
-                lastRecord = record;
-                record = reader.nextRecord();
-            }
-            if (null == lastRecord) {
-                throw new IOException("No record found in entry " + lac + " 
for " + segment);
-            }
-            System.out.println("Updating last record for " + segment + " to " 
+ lastRecord);
-            if (!IOUtils.confirmPrompt("Do you want to make this change (Y/N): 
")) {
-                return;
-            }
-            metadataUpdater.updateLastRecord(segment, lastRecord);
-        }
-
-        @Override
-        protected String getUsage() {
-            return "inspectstream [options]";
-        }
-    }
-
-    static interface BKCommandRunner {
-        int run(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception;
-    }
-
-    abstract static class PerBKCommand extends PerDLCommand {
-
-        protected PerBKCommand(String name, String description) {
-            super(name, description);
-        }
-
-        @Override
-        protected int runCmd() throws Exception {
-            return runBKCommand(new BKCommandRunner() {
-                @Override
-                public int run(ZooKeeperClient zkc, BookKeeperClient bkc) 
throws Exception {
-                    return runBKCmd(zkc, bkc);
-                }
-            });
-        }
-
-        protected int runBKCommand(BKCommandRunner runner) throws Exception {
-            return runner.run(getZooKeeperClient(), getBookKeeperClient());
-        }
-
-        abstract protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient 
bkc) throws Exception;
-    }
-
-    static class RecoverCommand extends PerBKCommand {
-
-        final List<Long> ledgers = new ArrayList<Long>();
-        boolean query = false;
-        boolean dryrun = false;
-        boolean skipOpenLedgers = false;
-        boolean fenceOnly = false;
-        int fenceRate = 1;
-        int concurrency = 1;
-        final Set<BookieSocketAddress> bookiesSrc = new 
HashSet<BookieSocketAddress>();
-        int partition = 0;
-        int numPartitions = 0;
-
-        RecoverCommand() {
-            super("recover", "Recover the ledger data that stored on failed 
bookies");
-            options.addOption("l", "ledger", true, "Specific ledger to 
recover");
-            options.addOption("lf", "ledgerfile", true, "File contains ledgers 
list");
-            options.addOption("q", "query", false, "Query the ledgers that 
contain given bookies");
-            options.addOption("d", "dryrun", false, "Print the recovery plan 
w/o actually recovering");
-            options.addOption("cy", "concurrency", true, "Number of ledgers 
could be recovered in parallel");
-            options.addOption("sk", "skipOpenLedgers", false, "Skip recovering 
open ledgers");
-            options.addOption("p", "partition", true, "partition");
-            options.addOption("n", "num-partitions", true, "num partitions");
-            options.addOption("fo", "fence-only", true, "fence the ledgers 
only w/o re-replicating entries");
-            options.addOption("fr", "fence-rate", true, "rate on fencing 
ledgers");
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws 
ParseException {
-            super.parseCommandLine(cmdline);
-            query = cmdline.hasOption("q");
-            force = cmdline.hasOption("f");
-            dryrun = cmdline.hasOption("d");
-            skipOpenLedgers = cmdline.hasOption("sk");
-            fenceOnly = cmdline.hasOption("fo");
-            if (cmdline.hasOption("l")) {
-                String[] lidStrs = cmdline.getOptionValue("l").split(",");
-                try {
-                    for (String lidStr : lidStrs) {
-                        ledgers.add(Long.parseLong(lidStr));
-                    }
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid ledger id provided : " + 
cmdline.getOptionValue("l"));
-                }
-            }
-            if (cmdline.hasOption("lf")) {
-                String file = cmdline.getOptionValue("lf");
-                try {
-                    BufferedReader br = new BufferedReader(
-                            new InputStreamReader(new FileInputStream(file), 
UTF_8.name()));
-                    try {
-                        String line = br.readLine();
-
-                        while (line != null) {
-                            ledgers.add(Long.parseLong(line));
-                            line = br.readLine();
-                        }
-                    } finally {
-                        br.close();
-                    }
-                } catch (IOException e) {
-                    throw new ParseException("Invalid ledgers file provided : 
" + file);
-                }
-            }
-            if (cmdline.hasOption("cy")) {
-                try {
-                    concurrency = 
Integer.parseInt(cmdline.getOptionValue("cy"));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid concurrency provided : " 
+ cmdline.getOptionValue("cy"));
-                }
-            }
-            if (cmdline.hasOption("p")) {
-                partition = Integer.parseInt(cmdline.getOptionValue("p"));
-            }
-            if (cmdline.hasOption("n")) {
-                numPartitions = Integer.parseInt(cmdline.getOptionValue("n"));
-            }
-            if (cmdline.hasOption("fr")) {
-                fenceRate = Integer.parseInt(cmdline.getOptionValue("fr"));
-            }
-            // Get bookies list to recover
-            String[] args = cmdline.getArgs();
-            final String[] bookieStrs = args[0].split(",");
-            for (String bookieStr : bookieStrs) {
-                final String bookieStrParts[] = bookieStr.split(":");
-                if (bookieStrParts.length != 2) {
-                    throw new ParseException("BookieSrcs has invalid bookie 
address format (host:port expected) : "
-                            + bookieStr);
-                }
-                try {
-                    bookiesSrc.add(new BookieSocketAddress(bookieStrParts[0],
-                            Integer.parseInt(bookieStrParts[1])));
-                } catch (NumberFormatException nfe) {
-                    throw new ParseException("Invalid ledger id provided : " + 
cmdline.getOptionValue("l"));
-                }
-            }
-        }
-
-        @Override
-        protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) 
throws Exception {
-            BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get());
-            try {
-                if (query) {
-                    return bkQuery(bkAdmin, bookiesSrc);
-                }
-                if (fenceOnly) {
-                    return bkFence(bkc, ledgers, fenceRate);
-                }
-                if (!force) {
-                    System.out.println("Bookies : " + bookiesSrc);
-                    if (!IOUtils.confirmPrompt("Do you want to recover them: 
(Y/N)")) {
-                        return -1;
-                    }
-                }
-                if (!ledgers.isEmpty()) {
-                    System.out.println("Ledgers : " + ledgers);
-                    long numProcessed = 0;
-                    Iterator<Long> ledgersIter = ledgers.iterator();
-                    LinkedBlockingQueue<Long> ledgersToProcess = new 
LinkedBlockingQueue<Long>();
-                    while (ledgersIter.hasNext()) {
-                        long lid = ledgersIter.next();
-                        if (numPartitions <=0 || (numPartitions > 0 && lid % 
numPartitions == partition)) {
-                            ledgersToProcess.add(lid);
-                            ++numProcessed;
-                        }
-                        if (ledgersToProcess.size() == 10000) {
-                            System.out.println("Processing " + numProcessed + 
" ledgers");
-                            bkRecovery(ledgersToProcess, bookiesSrc, dryrun, 
skipOpenLedgers);
-                            ledgersToProcess.clear();
-                            System.out.println("Processed " + numProcessed + " 
ledgers");
-                        }
-                    }
-                    if (!ledgersToProcess.isEmpty()) {
-                        System.out.println("Processing " + numProcessed + " 
ledgers");
-                        bkRecovery(ledgersToProcess, bookiesSrc, dryrun, 
skipOpenLedgers);
-                        System.out.println("Processed " + numProcessed + " 
ledgers");
-                    }
-                    System.out.println("Done.");
-                    CountDownLatch latch = new CountDownLatch(1);
-                    latch.await();
-                    return 0;
-                }
-                return bkRecovery(bkAdmin, bookiesSrc, dryrun, 
skipOpenLedgers);
-            } finally {
-                bkAdmin.close();
-            }
-        }
-
-        private int bkFence(final BookKeeperClient bkc, List<Long> ledgers, 
int fenceRate) throws Exception {
-            if (ledgers.isEmpty()) {
-                System.out.println("Nothing to fence. Done.");
-                return 0;
-            }
-            ExecutorService executorService = Executors.newCachedThreadPool();
-            final RateLimiter rateLimiter = RateLimiter.create(fenceRate);
-            final byte[] passwd = getConf().getBKDigestPW().getBytes(UTF_8);
-            final CountDownLatch latch = new CountDownLatch(ledgers.size());
-            final AtomicInteger numPendings = new 
AtomicInteger(ledgers.size());
-            final LinkedBlockingQueue<Long> ledgersQueue = new 
LinkedBlockingQueue<Long>();
-            ledgersQueue.addAll(ledgers);
-
-            for (int i = 0; i < concurrency; i++) {
-                executorService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        while (!ledgersQueue.isEmpty()) {
-                            rateLimiter.acquire();
-                            Long lid = ledgersQueue.poll();
-                            if (null == lid) {
-                                break;
-                            }
-                            System.out.println("Fencing ledger " + lid);
-                            int numRetries = 3;
-                            while (numRetries > 0) {
-                                try {
-                                    LedgerHandle lh = 
bkc.get().openLedger(lid, BookKeeper.DigestType.CRC32, passwd);
-                                    lh.close();
-                                    System.out.println("Fenced ledger " + lid 
+ ", " + numPendings.decrementAndGet() + " left.");
-                                    latch.countDown();
-                                } catch 
(BKException.BKNoSuchLedgerExistsException bke) {
-                                    System.out.println("Skipped fence 
non-exist ledger " + lid + ", " + numPendings.decrementAndGet() + " left.");
-                                    latch.countDown();
-                                } catch (BKException.BKLedgerRecoveryException 
lre) {
-                                    --numRetries;
-                                    continue;
-                                } catch (Exception e) {
-                                    e.printStackTrace();
-                                    break;
-                                }
-                                numRetries = 0;
-                            }
-                        }
-                        System.out.println("Thread exits");
-                    }
-                });
-            }
-            latch.await();
-            SchedulerUtils.shutdownScheduler(executorService, 2, 
TimeUnit.MINUTES);
-            return 0;
-        }
-
-        private int bkQuery(BookKeeperAdmin bkAdmin, Set<BookieSocketAddress> 
bookieAddrs)
-                throws InterruptedException, BKException {
-            SortedMap<Long, LedgerMetadata> ledgersContainBookies =
-                    bkAdmin.getLedgersContainBookies(bookieAddrs);
-            System.err.println("NOTE: Bookies in inspection list are marked 
with '*'.");
-            for (Map.Entry<Long, LedgerMetadata> ledger : 
ledgersContainBookies.entrySet()) {
-                System.out.println("ledger " + ledger.getKey() + " : " + 
ledger.getValue().getState());
-                Map<Long, Integer> numBookiesToReplacePerEnsemble =
-                        inspectLedger(ledger.getValue(), bookieAddrs);
-                System.out.print("summary: [");
-                for (Map.Entry<Long, Integer> entry : 
numBookiesToReplacePerEnsemble.entrySet()) {
-                    System.out.print(entry.getKey() + "=" + entry.getValue() + 
", ");
-                }
-                System.out.println("]");
-                System.out.println();
-            }
-            System.out.println("Done");
-            return 0;
-        }
-
-        private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, 
Set<BookieSocketAddress> bookiesToInspect) {
-            Map<Long, Integer> numBookiesToReplacePerEnsemble = new 
TreeMap<Long, Integer>();
-            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : 
metadata.getEnsembles().entrySet()) {
-                ArrayList<BookieSocketAddress> bookieList = 
ensemble.getValue();
-                System.out.print(ensemble.getKey() + ":\t");
-                int numBookiesToReplace = 0;
-                for (BookieSocketAddress bookie: bookieList) {
-                    System.out.print(bookie.toString());
-                    if (bookiesToInspect.contains(bookie)) {
-                        System.out.print("*");
-                        ++numBookiesToReplace;
-                    } else {
-                        System.out.print(" ");
-                    }
-                    System.out.print(" ");
-                }
-                System.out.println();
-                numBookiesToReplacePerEnsemble.put(ensemble.getKey(), 
numBookiesToReplace);
-            }
-            return numBookiesToReplacePerEnsemble;
-        }
-
-        private int bkRecovery(final LinkedBlockingQueue<Long> ledgers, final 
Set<BookieSocketAddress> bookieAddrs,
-                               final boolean dryrun, final boolean 
skipOpenLedgers)
-                throws Exception {
-            return runBKCommand(new BKCommandRunner() {
-                @Override
-                public int run(ZooKeeperClient zkc, BookKeeperClient bkc) 
throws Exception {
-                    BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get());
-                    try {
-                        bkRecovery(bkAdmin, ledgers, bookieAddrs, dryrun, 
skipOpenLedgers);
-                        return 0;
-                    } finally {
-                        bkAdmin.close();
-                    }
-                }
-            });
-        }
-
-        private int bkRecovery(final BookKeeperAdmin bkAdmin, final 
LinkedBlockingQueue<Long> ledgers,
-                               final Set<BookieSocketAddress> bookieAddrs,
-                               final boolean dryrun, final boolean 
skipOpenLedgers)
-                throws InterruptedException, BKException {
-            final AtomicInteger numPendings = new 
AtomicInteger(ledgers.size());
-            final ExecutorService executorService = 
Executors.newCachedThreadPool();
-            final CountDownLatch doneLatch = new CountDownLatch(concurrency);
-            Runnable r = new Runnable() {
-                @Override
-                public void run() {
-                    while (!ledgers.isEmpty()) {
-                        long lid = -1L;
-      

<TRUNCATED>

Reply via email to