http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java deleted file mode 100644 index 03d70bd..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ /dev/null @@ -1,2873 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.tools; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.net.MalformedURLException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; -import com.twitter.distributedlog.BKDistributedLogNamespace; -import com.twitter.distributedlog.Entry; -import com.twitter.distributedlog.MetadataAccessor; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.util.Utils; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeperAccessor; -import org.apache.bookkeeper.client.BookKeeperAdmin; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.LedgerMetadata; -import org.apache.bookkeeper.client.LedgerReader; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; -import org.apache.bookkeeper.util.IOUtils; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.RateLimiter; -import com.twitter.distributedlog.AsyncLogReader; -import com.twitter.distributedlog.AsyncLogWriter; -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.BookKeeperClientBuilder; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.LogReader; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.auditor.DLAuditor; -import com.twitter.distributedlog.bk.LedgerAllocator; -import com.twitter.distributedlog.bk.LedgerAllocatorUtils; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.MetadataUpdater; -import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.util.Await; -import com.twitter.util.FutureEventListener; - -import static com.google.common.base.Charsets.UTF_8; - -public class DistributedLogTool extends Tool { - - static final Logger logger = LoggerFactory.getLogger(DistributedLogTool.class); - - static final List<String> EMPTY_LIST = Lists.newArrayList(); - - static int compareByCompletionTime(long time1, long time2) { - return time1 > time2 ? 1 : (time1 < time2 ? -1 : 0); - } - - static final Comparator<LogSegmentMetadata> LOGSEGMENT_COMPARATOR_BY_TIME = new Comparator<LogSegmentMetadata>() { - @Override - public int compare(LogSegmentMetadata o1, LogSegmentMetadata o2) { - if (o1.isInProgress() && o2.isInProgress()) { - return compareByCompletionTime(o1.getFirstTxId(), o2.getFirstTxId()); - } else if (!o1.isInProgress() && !o2.isInProgress()) { - return compareByCompletionTime(o1.getCompletionTime(), o2.getCompletionTime()); - } else if (o1.isInProgress() && !o2.isInProgress()) { - return compareByCompletionTime(o1.getFirstTxId(), o2.getCompletionTime()); - } else { - return compareByCompletionTime(o1.getCompletionTime(), o2.getFirstTxId()); - } - } - }; - - static DLSN parseDLSN(String dlsnStr) throws ParseException { - if (dlsnStr.equals("InitialDLSN")) { - return DLSN.InitialDLSN; - } - String[] parts = dlsnStr.split(","); - if (parts.length != 3) { - throw new ParseException("Invalid dlsn : " + dlsnStr); - } - try { - return new DLSN(Long.parseLong(parts[0]), Long.parseLong(parts[1]), Long.parseLong(parts[2])); - } catch (Exception nfe) { - throw new ParseException("Invalid dlsn : " + dlsnStr); - } - } - - /** - * Per DL Command, which parses basic options. e.g. uri. - */ - protected abstract static class PerDLCommand extends OptsCommand { - - protected Options options = new Options(); - protected final DistributedLogConfiguration dlConf; - protected URI uri; - protected String zkAclId = null; - protected boolean force = false; - protected DistributedLogNamespace namespace = null; - - protected PerDLCommand(String name, String description) { - super(name, description); - dlConf = new DistributedLogConfiguration(); - // Tools are allowed to read old metadata as long as they can interpret it - dlConf.setDLLedgerMetadataSkipMinVersionCheck(true); - options.addOption("u", "uri", true, "DistributedLog URI"); - options.addOption("c", "conf", true, "DistributedLog Configuration File"); - options.addOption("a", "zk-acl-id", true, "Zookeeper ACL ID"); - options.addOption("f", "force", false, "Force command (no warnings or prompts)"); - } - - @Override - protected int runCmd(CommandLine commandLine) throws Exception { - try { - parseCommandLine(commandLine); - } catch (ParseException pe) { - System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'"); - printUsage(); - return -1; - } - try { - return runCmd(); - } finally { - if (null != namespace) { - namespace.close(); - } - } - } - - protected abstract int runCmd() throws Exception; - - @Override - protected Options getOptions() { - return options; - } - - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - if (!cmdline.hasOption("u")) { - throw new ParseException("No distributedlog uri provided."); - } - uri = URI.create(cmdline.getOptionValue("u")); - if (cmdline.hasOption("c")) { - String configFile = cmdline.getOptionValue("c"); - try { - dlConf.loadConf(new File(configFile).toURI().toURL()); - } catch (ConfigurationException e) { - throw new ParseException("Failed to load distributedlog configuration from " + configFile + "."); - } catch (MalformedURLException e) { - throw new ParseException("Failed to load distributedlog configuration from " + configFile + ": malformed uri."); - } - } - if (cmdline.hasOption("a")) { - zkAclId = cmdline.getOptionValue("a"); - } - if (cmdline.hasOption("f")) { - force = true; - } - } - - protected DistributedLogConfiguration getConf() { - return dlConf; - } - - protected URI getUri() { - return uri; - } - - protected void setUri(URI uri) { - this.uri = uri; - } - - protected String getZkAclId() { - return zkAclId; - } - - protected void setZkAclId(String zkAclId) { - this.zkAclId = zkAclId; - } - - protected boolean getForce() { - return force; - } - - protected void setForce(boolean force) { - this.force = force; - } - - protected DistributedLogNamespace getNamespace() throws IOException { - if (null == this.namespace) { - this.namespace = DistributedLogNamespaceBuilder.newBuilder() - .uri(getUri()) - .conf(getConf()) - .build(); - } - return this.namespace; - } - - protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException { - return getNamespace() - .getNamespaceDriver() - .getLogStreamMetadataStore(NamespaceDriver.Role.READER) - .getLogSegmentMetadataStore(); - } - - protected ZooKeeperClient getZooKeeperClient() throws IOException { - NamespaceDriver driver = getNamespace().getNamespaceDriver(); - assert(driver instanceof BKNamespaceDriver); - return ((BKNamespaceDriver) driver).getWriterZKC(); - } - - protected BookKeeperClient getBookKeeperClient() throws IOException { - NamespaceDriver driver = getNamespace().getNamespaceDriver(); - assert(driver instanceof BKNamespaceDriver); - return ((BKNamespaceDriver) driver).getReaderBKC(); - } - } - - /** - * Base class for simple command with no resource setup requirements. - */ - public abstract static class SimpleCommand extends OptsCommand { - - protected final Options options = new Options(); - - SimpleCommand(String name, String description) { - super(name, description); - } - - @Override - protected int runCmd(CommandLine commandLine) throws Exception { - try { - parseCommandLine(commandLine); - } catch (ParseException pe) { - System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'"); - printUsage(); - return -1; - } - return runSimpleCmd(); - } - - abstract protected int runSimpleCmd() throws Exception; - - abstract protected void parseCommandLine(CommandLine cmdline) throws ParseException; - - @Override - protected Options getOptions() { - return options; - } - } - - /** - * Per Stream Command, which parse common options for per stream. e.g. stream name. - */ - abstract static class PerStreamCommand extends PerDLCommand { - - protected String streamName; - - protected PerStreamCommand(String name, String description) { - super(name, description); - options.addOption("s", "stream", true, "Stream Name"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("s")) { - throw new ParseException("No stream name provided."); - } - streamName = cmdline.getOptionValue("s"); - } - - protected String getStreamName() { - return streamName; - } - - protected void setStreamName(String streamName) { - this.streamName = streamName; - } - } - - /** - * NOTE: we might consider adding a command to 'delete' namespace. The implementation of the namespace - * driver should implement the 'delete' operation. - */ - protected static class DeleteAllocatorPoolCommand extends PerDLCommand { - - int concurrency = 1; - String allocationPoolPath = DistributedLogConstants.ALLOCATION_POOL_NODE; - - DeleteAllocatorPoolCommand() { - super("delete_allocator_pool", "Delete allocator pool for a given distributedlog instance"); - options.addOption("t", "concurrency", true, "Concurrency on deleting allocator pool."); - options.addOption("ap", "allocation-pool-path", true, "Ledger Allocation Pool Path"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("t")) { - concurrency = Integer.parseInt(cmdline.getOptionValue("t")); - if (concurrency <= 0) { - throw new ParseException("Invalid concurrency value : " + concurrency + ": it must be greater or equal to 0."); - } - } - if (cmdline.hasOption("ap")) { - allocationPoolPath = cmdline.getOptionValue("ap"); - if (!allocationPoolPath.startsWith(".") || !allocationPoolPath.contains("allocation")) { - throw new ParseException("Invalid allocation pool path : " + allocationPoolPath + ": it must starts with a '.' and must contains 'allocation'"); - } - } - } - - @Override - protected int runCmd() throws Exception { - String rootPath = getUri().getPath() + "/" + allocationPoolPath; - final ScheduledExecutorService allocationExecutor = Executors.newSingleThreadScheduledExecutor(); - ExecutorService executorService = Executors.newFixedThreadPool(concurrency); - Preconditions.checkArgument(getNamespace() instanceof BKDistributedLogNamespace); - BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) getNamespace(); - final ZooKeeperClient zkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getWriterZKC(); - final BookKeeperClient bkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getReaderBKC(); - try { - List<String> pools = zkc.get().getChildren(rootPath, false); - final LinkedBlockingQueue<String> poolsToDelete = new LinkedBlockingQueue<String>(); - if (getForce() || IOUtils.confirmPrompt("Are you sure you want to delete allocator pools : " + pools)) { - for (String pool : pools) { - poolsToDelete.add(rootPath + "/" + pool); - } - final CountDownLatch doneLatch = new CountDownLatch(concurrency); - for (int i = 0; i < concurrency; i++) { - final int tid = i; - executorService.submit(new Runnable() { - @Override - public void run() { - while (!poolsToDelete.isEmpty()) { - String poolPath = poolsToDelete.poll(); - if (null == poolPath) { - break; - } - try { - LedgerAllocator allocator = - LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(), - zkc, bkc, - allocationExecutor); - allocator.delete(); - System.out.println("Deleted allocator pool : " + poolPath + " ."); - } catch (IOException ioe) { - System.err.println("Failed to delete allocator pool " + poolPath + " : " + ioe.getMessage()); - } - } - doneLatch.countDown(); - System.out.println("Thread " + tid + " is done."); - } - }); - } - doneLatch.await(); - } - } finally { - executorService.shutdown(); - allocationExecutor.shutdown(); - } - return 0; - } - - @Override - protected String getUsage() { - return "delete_allocator_pool"; - } - } - - public static class ListCommand extends PerDLCommand { - - boolean printMetadata = false; - boolean printHex = false; - - ListCommand() { - super("list", "list streams of a given distributedlog instance"); - options.addOption("m", "meta", false, "Print metadata associated with each stream"); - options.addOption("x", "hex", false, "Print metadata in hex format"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - printMetadata = cmdline.hasOption("m"); - printHex = cmdline.hasOption("x"); - } - - @Override - protected String getUsage() { - return "list [options]"; - } - - @Override - protected int runCmd() throws Exception { - printStreams(getNamespace()); - return 0; - } - - protected void printStreams(DistributedLogNamespace namespace) throws Exception { - Iterator<String> streams = namespace.getLogs(); - System.out.println("Streams under " + getUri() + " : "); - System.out.println("--------------------------------"); - while (streams.hasNext()) { - String streamName = streams.next(); - System.out.println(streamName); - if (!printMetadata) { - continue; - } - MetadataAccessor accessor = - namespace.getNamespaceDriver().getMetadataAccessor(streamName); - byte[] metadata = accessor.getMetadata(); - if (null == metadata || metadata.length == 0) { - continue; - } - if (printHex) { - System.out.println(Hex.encodeHexString(metadata)); - } else { - System.out.println(new String(metadata, UTF_8)); - } - System.out.println(""); - } - System.out.println("--------------------------------"); - } - } - - public static class WatchNamespaceCommand extends PerDLCommand implements NamespaceListener { - private Set<String> currentSet = Sets.<String>newHashSet(); - private CountDownLatch doneLatch = new CountDownLatch(1); - - WatchNamespaceCommand() { - super("watch", "watch and report changes for a dl namespace"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - } - - @Override - protected String getUsage() { - return "watch [options]"; - } - - @Override - protected int runCmd() throws Exception { - watchAndReportChanges(getNamespace()); - doneLatch.await(); - return 0; - } - - @Override - public synchronized void onStreamsChanged(Iterator<String> streams) { - Set<String> updatedSet = Sets.newHashSet(streams); - Set<String> oldStreams = Sets.difference(currentSet, updatedSet); - Set<String> newStreams = Sets.difference(updatedSet, currentSet); - currentSet = updatedSet; - - System.out.println("Old streams : "); - for (String stream : oldStreams) { - System.out.println(stream); - } - - System.out.println("New streams : "); - for (String stream : newStreams) { - System.out.println(stream); - } - - System.out.println(""); - } - - protected void watchAndReportChanges(DistributedLogNamespace namespace) throws Exception { - namespace.registerNamespaceListener(this); - } - } - - protected static class InspectCommand extends PerDLCommand { - - int numThreads = 1; - String streamPrefix = null; - boolean printInprogressOnly = false; - boolean dumpEntries = false; - boolean orderByTime = false; - boolean printStreamsOnly = false; - boolean checkInprogressOnly = false; - - InspectCommand() { - super("inspect", "Inspect streams under a given dl uri to find any potential corruptions"); - options.addOption("t", "threads", true, "Number threads to do inspection."); - options.addOption("ft", "filter", true, "Stream filter by prefix"); - options.addOption("i", "inprogress", false, "Print inprogress log segments only"); - options.addOption("d", "dump", false, "Dump entries of inprogress log segments"); - options.addOption("ot", "orderbytime", false, "Order the log segments by completion time"); - options.addOption("pso", "print-stream-only", false, "Print streams only"); - options.addOption("cio", "check-inprogress-only", false, "Check duplicated inprogress only"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("t")) { - numThreads = Integer.parseInt(cmdline.getOptionValue("t")); - } - if (cmdline.hasOption("ft")) { - streamPrefix = cmdline.getOptionValue("ft"); - } - printInprogressOnly = cmdline.hasOption("i"); - dumpEntries = cmdline.hasOption("d"); - orderByTime = cmdline.hasOption("ot"); - printStreamsOnly = cmdline.hasOption("pso"); - checkInprogressOnly = cmdline.hasOption("cio"); - } - - @Override - protected int runCmd() throws Exception { - SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates = - new TreeMap<String, List<Pair<LogSegmentMetadata, List<String>>>>(); - inspectStreams(corruptedCandidates); - System.out.println("Corrupted Candidates : "); - if (printStreamsOnly) { - System.out.println(corruptedCandidates.keySet()); - return 0; - } - for (Map.Entry<String, List<Pair<LogSegmentMetadata, List<String>>>> entry : corruptedCandidates.entrySet()) { - System.out.println(entry.getKey() + " : \n"); - for (Pair<LogSegmentMetadata, List<String>> pair : entry.getValue()) { - System.out.println("\t - " + pair.getLeft()); - if (printInprogressOnly && dumpEntries) { - int i = 0; - for (String entryData : pair.getRight()) { - System.out.println("\t" + i + "\t: " + entryData); - ++i; - } - } - } - System.out.println(); - } - return 0; - } - - private void inspectStreams(final SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates) - throws Exception { - Iterator<String> streamCollection = getNamespace().getLogs(); - final List<String> streams = new ArrayList<String>(); - while (streamCollection.hasNext()) { - String s = streamCollection.next(); - if (null != streamPrefix) { - if (s.startsWith(streamPrefix)) { - streams.add(s); - } - } else { - streams.add(s); - } - } - if (0 == streams.size()) { - return; - } - println("Streams : " + streams); - if (!getForce() && !IOUtils.confirmPrompt("Are you sure you want to inspect " + streams.size() + " streams")) { - return; - } - numThreads = Math.min(streams.size(), numThreads); - final int numStreamsPerThreads = streams.size() / numThreads; - Thread[] threads = new Thread[numThreads]; - for (int i = 0; i < numThreads; i++) { - final int tid = i; - threads[i] = new Thread("Inspect-" + i) { - @Override - public void run() { - try { - inspectStreams(streams, tid, numStreamsPerThreads, corruptedCandidates); - System.out.println("Thread " + tid + " finished."); - } catch (Exception e) { - System.err.println("Thread " + tid + " quits with exception : " + e.getMessage()); - } - } - }; - threads[i].start(); - } - for (int i = 0; i < numThreads; i++) { - threads[i].join(); - } - } - - private void inspectStreams(List<String> streams, - int tid, - int numStreamsPerThreads, - SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates) - throws Exception { - int startIdx = tid * numStreamsPerThreads; - int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); - for (int i = startIdx; i < endIdx; i++) { - String s = streams.get(i); - BookKeeperClient bkc = getBookKeeperClient(); - DistributedLogManager dlm = getNamespace().openLog(s); - try { - List<LogSegmentMetadata> segments = dlm.getLogSegments(); - if (segments.size() <= 1) { - continue; - } - boolean isCandidate = false; - if (checkInprogressOnly) { - Set<Long> inprogressSeqNos = new HashSet<Long>(); - for (LogSegmentMetadata segment : segments) { - if (segment.isInProgress()) { - inprogressSeqNos.add(segment.getLogSegmentSequenceNumber()); - } - } - for (LogSegmentMetadata segment : segments) { - if (!segment.isInProgress() && inprogressSeqNos.contains(segment.getLogSegmentSequenceNumber())) { - isCandidate = true; - } - } - } else { - LogSegmentMetadata firstSegment = segments.get(0); - long lastSeqNo = firstSegment.getLogSegmentSequenceNumber(); - - for (int j = 1; j < segments.size(); j++) { - LogSegmentMetadata nextSegment = segments.get(j); - if (lastSeqNo + 1 != nextSegment.getLogSegmentSequenceNumber()) { - isCandidate = true; - break; - } - ++lastSeqNo; - } - } - if (isCandidate) { - if (orderByTime) { - Collections.sort(segments, LOGSEGMENT_COMPARATOR_BY_TIME); - } - List<Pair<LogSegmentMetadata, List<String>>> ledgers = - new ArrayList<Pair<LogSegmentMetadata, List<String>>>(); - for (LogSegmentMetadata seg : segments) { - LogSegmentMetadata segment = seg; - List<String> dumpedEntries = new ArrayList<String>(); - if (segment.isInProgress()) { - LedgerHandle lh = bkc.get().openLedgerNoRecovery(segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, - dlConf.getBKDigestPW().getBytes(UTF_8)); - try { - long lac = lh.readLastConfirmed(); - segment = segment.mutator().setLastEntryId(lac).build(); - if (printInprogressOnly && dumpEntries && lac >= 0) { - Enumeration<LedgerEntry> entries = lh.readEntries(0L, lac); - while (entries.hasMoreElements()) { - LedgerEntry entry = entries.nextElement(); - dumpedEntries.add(new String(entry.getEntry(), UTF_8)); - } - } - } finally { - lh.close(); - } - } - if (printInprogressOnly) { - if (segment.isInProgress()) { - ledgers.add(Pair.of(segment, dumpedEntries)); - } - } else { - ledgers.add(Pair.of(segment, EMPTY_LIST)); - } - } - synchronized (corruptedCandidates) { - corruptedCandidates.put(s, ledgers); - } - } - } finally { - dlm.close(); - } - } - } - - @Override - protected String getUsage() { - return "inspect [options]"; - } - } - - protected static class TruncateCommand extends PerDLCommand { - - int numThreads = 1; - String streamPrefix = null; - boolean deleteStream = false; - - TruncateCommand() { - super("truncate", "truncate streams under a given dl uri"); - options.addOption("t", "threads", true, "Number threads to do truncation"); - options.addOption("ft", "filter", true, "Stream filter by prefix"); - options.addOption("d", "delete", false, "Delete Stream"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("t")) { - numThreads = Integer.parseInt(cmdline.getOptionValue("t")); - } - if (cmdline.hasOption("ft")) { - streamPrefix = cmdline.getOptionValue("ft"); - } - if (cmdline.hasOption("d")) { - deleteStream = true; - } - } - - @Override - protected String getUsage() { - return "truncate [options]"; - } - - protected void setFilter(String filter) { - this.streamPrefix = filter; - } - - @Override - protected int runCmd() throws Exception { - getConf().setZkAclId(getZkAclId()); - return truncateStreams(getNamespace()); - } - - private int truncateStreams(final DistributedLogNamespace namespace) throws Exception { - Iterator<String> streamCollection = namespace.getLogs(); - final List<String> streams = new ArrayList<String>(); - while (streamCollection.hasNext()) { - String s = streamCollection.next(); - if (null != streamPrefix) { - if (s.startsWith(streamPrefix)) { - streams.add(s); - } - } else { - streams.add(s); - } - } - if (0 == streams.size()) { - return 0; - } - System.out.println("Streams : " + streams); - if (!getForce() && !IOUtils.confirmPrompt("Do you want to truncate " + streams.size() + " streams ?")) { - return 0; - } - numThreads = Math.min(streams.size(), numThreads); - final int numStreamsPerThreads = streams.size() / numThreads + 1; - Thread[] threads = new Thread[numThreads]; - for (int i = 0; i < numThreads; i++) { - final int tid = i; - threads[i] = new Thread("Truncate-" + i) { - @Override - public void run() { - try { - truncateStreams(namespace, streams, tid, numStreamsPerThreads); - System.out.println("Thread " + tid + " finished."); - } catch (IOException e) { - System.err.println("Thread " + tid + " quits with exception : " + e.getMessage()); - } - } - }; - threads[i].start(); - } - for (int i = 0; i < numThreads; i++) { - threads[i].join(); - } - return 0; - } - - private void truncateStreams(DistributedLogNamespace namespace, List<String> streams, - int tid, int numStreamsPerThreads) throws IOException { - int startIdx = tid * numStreamsPerThreads; - int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); - for (int i = startIdx; i < endIdx; i++) { - String s = streams.get(i); - DistributedLogManager dlm = namespace.openLog(s); - try { - if (deleteStream) { - dlm.delete(); - } else { - dlm.purgeLogsOlderThan(Long.MAX_VALUE); - } - } finally { - dlm.close(); - } - } - } - } - - public static class SimpleBookKeeperClient { - BookKeeperClient bkc; - ZooKeeperClient zkc; - - public SimpleBookKeeperClient(DistributedLogConfiguration conf, URI uri) { - try { - zkc = ZooKeeperClientBuilder.newBuilder() - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .zkAclId(conf.getZkAclId()) - .uri(uri) - .build(); - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri); - BKDLConfig.propagateConfiguration(bkdlConfig, conf); - bkc = BookKeeperClientBuilder.newBuilder() - .zkc(zkc) - .dlConfig(conf) - .ledgersPath(bkdlConfig.getBkLedgersPath()) - .name("dlog") - .build(); - } catch (Exception e) { - close(); - } - } - public BookKeeperClient client() { - return bkc; - } - public void close() { - if (null != bkc) { - bkc.close(); - } - if (null != zkc) { - zkc.close(); - } - } - } - - protected static class ShowCommand extends PerStreamCommand { - - SimpleBookKeeperClient bkc = null; - boolean listSegments = true; - boolean listEppStats = false; - long firstLid = 0; - long lastLid = -1; - - ShowCommand() { - super("show", "show metadata of a given stream and list segments"); - options.addOption("ns", "no-log-segments", false, "Do not list log segment metadata"); - options.addOption("lp", "placement-stats", false, "Show ensemble placement stats"); - options.addOption("fl", "first-ledger", true, "First log sement no"); - options.addOption("ll", "last-ledger", true, "Last log sement no"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("fl")) { - try { - firstLid = Long.parseLong(cmdline.getOptionValue("fl")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid ledger id " + cmdline.getOptionValue("fl")); - } - } - if (firstLid < 0) { - throw new IllegalArgumentException("Invalid ledger id " + firstLid); - } - if (cmdline.hasOption("ll")) { - try { - lastLid = Long.parseLong(cmdline.getOptionValue("ll")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid ledger id " + cmdline.getOptionValue("ll")); - } - } - if (lastLid != -1 && firstLid > lastLid) { - throw new IllegalArgumentException("Invalid ledger ids " + firstLid + " " + lastLid); - } - listSegments = !cmdline.hasOption("ns"); - listEppStats = cmdline.hasOption("lp"); - } - - @Override - protected int runCmd() throws Exception { - DistributedLogManager dlm = getNamespace().openLog(getStreamName()); - try { - if (listEppStats) { - bkc = new SimpleBookKeeperClient(getConf(), getUri()); - } - printMetadata(dlm); - } finally { - dlm.close(); - if (null != bkc) { - bkc.close(); - } - } - return 0; - } - - private void printMetadata(DistributedLogManager dlm) throws Exception { - printHeader(dlm); - if (listSegments) { - System.out.println("Ledgers : "); - List<LogSegmentMetadata> segments = dlm.getLogSegments(); - for (LogSegmentMetadata segment : segments) { - if (include(segment)) { - printLedgerRow(segment); - } - } - } - } - - private void printHeader(DistributedLogManager dlm) throws Exception { - DLSN firstDlsn = Await.result(dlm.getFirstDLSNAsync()); - boolean endOfStreamMarked = dlm.isEndOfStreamMarked(); - DLSN lastDlsn = dlm.getLastDLSN(); - long firstTxnId = dlm.getFirstTxId(); - long lastTxnId = dlm.getLastTxId(); - long recordCount = dlm.getLogRecordCount(); - String result = String.format("Stream : (firstTxId=%d, lastTxid=%d, firstDlsn=%s, lastDlsn=%s, endOfStreamMarked=%b, recordCount=%d)", - firstTxnId, lastTxnId, getDlsnName(firstDlsn), getDlsnName(lastDlsn), endOfStreamMarked, recordCount); - System.out.println(result); - if (listEppStats) { - printEppStatsHeader(dlm); - } - } - - boolean include(LogSegmentMetadata segment) { - return (firstLid <= segment.getLogSegmentSequenceNumber() && (lastLid == -1 || lastLid >= segment.getLogSegmentSequenceNumber())); - } - - private void printEppStatsHeader(DistributedLogManager dlm) throws Exception { - String label = "Ledger Placement :"; - System.out.println(label); - Map<BookieSocketAddress, Integer> totals = new HashMap<BookieSocketAddress, Integer>(); - List<LogSegmentMetadata> segments = dlm.getLogSegments(); - for (LogSegmentMetadata segment : segments) { - if (include(segment)) { - merge(totals, getBookieStats(segment)); - } - } - List<Map.Entry<BookieSocketAddress, Integer>> entries = new ArrayList<Map.Entry<BookieSocketAddress, Integer>>(totals.entrySet()); - Collections.sort(entries, new Comparator<Map.Entry<BookieSocketAddress, Integer>>() { - @Override - public int compare(Map.Entry<BookieSocketAddress, Integer> o1, Map.Entry<BookieSocketAddress, Integer> o2) { - return o2.getValue() - o1.getValue(); - } - }); - int width = 0; - int totalEntries = 0; - for (Map.Entry<BookieSocketAddress, Integer> entry : entries) { - width = Math.max(width, label.length() + 1 + entry.getKey().toString().length()); - totalEntries += entry.getValue(); - } - for (Map.Entry<BookieSocketAddress, Integer> entry : entries) { - System.out.println(String.format("%"+width+"s\t%6.2f%%\t\t%d", entry.getKey(), entry.getValue()*1.0/totalEntries, entry.getValue())); - } - } - - private void printLedgerRow(LogSegmentMetadata segment) throws Exception { - System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment); - } - - private Map<BookieSocketAddress, Integer> getBookieStats(LogSegmentMetadata segment) throws Exception { - Map<BookieSocketAddress, Integer> stats = new HashMap<BookieSocketAddress, Integer>(); - LedgerHandle lh = bkc.client().get().openLedgerNoRecovery(segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, - getConf().getBKDigestPW().getBytes(UTF_8)); - long eidFirst = 0; - for (SortedMap.Entry<Long, ArrayList<BookieSocketAddress>> entry : LedgerReader.bookiesForLedger(lh).entrySet()) { - long eidLast = entry.getKey().longValue(); - long count = eidLast - eidFirst + 1; - for (BookieSocketAddress bookie : entry.getValue()) { - merge(stats, bookie, (int) count); - } - eidFirst = eidLast; - } - return stats; - } - - void merge(Map<BookieSocketAddress, Integer> m, BookieSocketAddress bookie, Integer count) { - if (m.containsKey(bookie)) { - m.put(bookie, count + m.get(bookie).intValue()); - } else { - m.put(bookie, count); - } - } - - void merge(Map<BookieSocketAddress, Integer> m1, Map<BookieSocketAddress, Integer> m2) { - for (Map.Entry<BookieSocketAddress, Integer> entry : m2.entrySet()) { - merge(m1, entry.getKey(), entry.getValue()); - } - } - - String getDlsnName(DLSN dlsn) { - if (dlsn.equals(DLSN.InvalidDLSN)) { - return "InvalidDLSN"; - } - return dlsn.toString(); - } - - @Override - protected String getUsage() { - return "show [options]"; - } - } - - static class CountCommand extends PerStreamCommand { - - DLSN startDLSN = null; - DLSN endDLSN = null; - - protected CountCommand() { - super("count", "count number records between dlsns"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - String[] args = cmdline.getArgs(); - if (args.length < 1) { - throw new ParseException("Must specify at least start dlsn."); - } - if (args.length >= 1) { - startDLSN = parseDLSN(args[0]); - } - if (args.length >= 2) { - endDLSN = parseDLSN(args[1]); - } - } - - @Override - protected int runCmd() throws Exception { - DistributedLogManager dlm = getNamespace().openLog(getStreamName()); - try { - long count = 0; - if (null == endDLSN) { - count = countToLastRecord(dlm); - } else { - count = countFromStartToEnd(dlm); - } - System.out.println("total is " + count + " records."); - return 0; - } finally { - dlm.close(); - } - } - - int countFromStartToEnd(DistributedLogManager dlm) throws Exception { - int count = 0; - try { - LogReader reader = dlm.getInputStream(startDLSN); - try { - LogRecordWithDLSN record = reader.readNext(false); - LogRecordWithDLSN preRecord = record; - System.out.println("first record : " + record); - while (null != record) { - if (record.getDlsn().compareTo(endDLSN) > 0) { - break; - } - ++count; - if (count % 1000 == 0) { - logger.info("read {} records from {}...", count, getStreamName()); - } - preRecord = record; - record = reader.readNext(false); - } - System.out.println("last record : " + preRecord); - } finally { - reader.close(); - } - } finally { - dlm.close(); - } - return count; - } - - long countToLastRecord(DistributedLogManager dlm) throws Exception { - return Await.result(dlm.getLogRecordCountAsync(startDLSN)).longValue(); - } - - @Override - protected String getUsage() { - return "count <start> <end>"; - } - } - - public static class DeleteCommand extends PerStreamCommand { - - protected DeleteCommand() { - super("delete", "delete a given stream"); - } - - @Override - protected int runCmd() throws Exception { - getConf().setZkAclId(getZkAclId()); - DistributedLogManager dlm = getNamespace().openLog(getStreamName()); - try { - dlm.delete(); - } finally { - dlm.close(); - } - return 0; - } - - @Override - protected String getUsage() { - return "delete"; - } - } - - public static class DeleteLedgersCommand extends PerDLCommand { - - private final List<Long> ledgers = new ArrayList<Long>(); - - int numThreads = 1; - - protected DeleteLedgersCommand() { - super("delete_ledgers", "delete given ledgers"); - options.addOption("l", "ledgers", true, "List of ledgers, separated by comma"); - options.addOption("lf", "ledgers-file", true, "File of list of ledgers, each line has a ledger id"); - options.addOption("t", "concurrency", true, "Number of threads to run deletions"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("l") && cmdline.hasOption("lf")) { - throw new ParseException("Please specify ledgers: either use list or use file only."); - } - if (!cmdline.hasOption("l") && !cmdline.hasOption("lf")) { - throw new ParseException("No ledgers specified. Please specify ledgers either use list or use file only."); - } - if (cmdline.hasOption("l")) { - String ledgersStr = cmdline.getOptionValue("l"); - String[] ledgerStrs = ledgersStr.split(","); - for (String ledgerStr : ledgerStrs) { - ledgers.add(Long.parseLong(ledgerStr)); - } - } - if (cmdline.hasOption("lf")) { - BufferedReader br = null; - try { - - br = new BufferedReader(new InputStreamReader( - new FileInputStream(new File(cmdline.getOptionValue("lf"))), UTF_8.name())); - String line; - while ((line = br.readLine()) != null) { - ledgers.add(Long.parseLong(line)); - } - } catch (FileNotFoundException e) { - throw new ParseException("No ledgers file " + cmdline.getOptionValue("lf") + " found."); - } catch (IOException e) { - throw new ParseException("Invalid ledgers file " + cmdline.getOptionValue("lf") + " found."); - } finally { - if (null != br) { - try { - br.close(); - } catch (IOException e) { - // no-op - } - } - } - } - if (cmdline.hasOption("t")) { - numThreads = Integer.parseInt(cmdline.getOptionValue("t")); - } - } - - @Override - protected String getUsage() { - return "delete_ledgers [options]"; - } - - @Override - protected int runCmd() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(numThreads); - try { - final AtomicInteger numLedgers = new AtomicInteger(0); - final CountDownLatch doneLatch = new CountDownLatch(numThreads); - final AtomicInteger numFailures = new AtomicInteger(0); - final LinkedBlockingQueue<Long> ledgerQueue = - new LinkedBlockingQueue<Long>(); - ledgerQueue.addAll(ledgers); - for (int i = 0; i < numThreads; i++) { - final int tid = i; - executorService.submit(new Runnable() { - @Override - public void run() { - while (true) { - Long ledger = ledgerQueue.poll(); - if (null == ledger) { - break; - } - try { - getBookKeeperClient().get().deleteLedger(ledger); - int numLedgersDeleted = numLedgers.incrementAndGet(); - if (numLedgersDeleted % 1000 == 0) { - System.out.println("Deleted " + numLedgersDeleted + " ledgers."); - } - } catch (BKException.BKNoSuchLedgerExistsException e) { - int numLedgersDeleted = numLedgers.incrementAndGet(); - if (numLedgersDeleted % 1000 == 0) { - System.out.println("Deleted " + numLedgersDeleted + " ledgers."); - } - } catch (Exception e) { - numFailures.incrementAndGet(); - break; - } - } - doneLatch.countDown(); - System.out.println("Thread " + tid + " quits"); - } - }); - } - doneLatch.await(); - if (numFailures.get() > 0) { - throw new IOException("Encounter " + numFailures.get() + " failures during deleting ledgers"); - } - } finally { - executorService.shutdown(); - } - return 0; - } - } - - public static class CreateCommand extends PerDLCommand { - - final List<String> streams = new ArrayList<String>(); - - String streamPrefix = null; - String streamExpression = null; - - CreateCommand() { - super("create", "create streams under a given namespace"); - options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'."); - options.addOption("e", "expression", true, "Expression to generate stream suffix. " + - "Currently we support range 'x-y', list 'x,y,z' and name 'xyz'"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (cmdline.hasOption("r")) { - streamPrefix = cmdline.getOptionValue("r"); - } - if (cmdline.hasOption("e")) { - streamExpression = cmdline.getOptionValue("e"); - } - if (null == streamPrefix || null == streamExpression) { - throw new ParseException("Please specify stream prefix & expression."); - } - } - - protected void generateStreams(String streamPrefix, String streamExpression) throws ParseException { - // parse the stream expression - if (streamExpression.contains("-")) { - // a range expression - String[] parts = streamExpression.split("-"); - if (parts.length != 2) { - throw new ParseException("Invalid stream index range : " + streamExpression); - } - try { - int start = Integer.parseInt(parts[0]); - int end = Integer.parseInt(parts[1]); - if (start > end) { - throw new ParseException("Invalid stream index range : " + streamExpression); - } - for (int i = start; i <= end; i++) { - streams.add(streamPrefix + i); - } - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid stream index range : " + streamExpression); - } - } else if (streamExpression.contains(",")) { - // a list expression - String[] parts = streamExpression.split(","); - try { - for (String part : parts) { - int idx = Integer.parseInt(part); - streams.add(streamPrefix + idx); - } - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid stream suffix list : " + streamExpression); - } - } else { - streams.add(streamPrefix + streamExpression); - } - } - - @Override - protected int runCmd() throws Exception { - generateStreams(streamPrefix, streamExpression); - if (streams.isEmpty()) { - System.out.println("Nothing to create."); - return 0; - } - if (!getForce() && !IOUtils.confirmPrompt("You are going to create streams : " + streams)) { - return 0; - } - getConf().setZkAclId(getZkAclId()); - for (String stream : streams) { - getNamespace().createLog(stream); - } - return 0; - } - - @Override - protected String getUsage() { - return "create [options]"; - } - - protected void setPrefix(String prefix) { - this.streamPrefix = prefix; - } - - protected void setExpression(String expression) { - this.streamExpression = expression; - } - } - - protected static class DumpCommand extends PerStreamCommand { - - boolean printHex = false; - boolean skipPayload = false; - Long fromTxnId = null; - DLSN fromDLSN = null; - int count = 100; - - DumpCommand() { - super("dump", "dump records of a given stream"); - options.addOption("x", "hex", false, "Print record in hex format"); - options.addOption("sp", "skip-payload", false, "Skip printing the payload of the record"); - options.addOption("o", "offset", true, "Txn ID to start dumping."); - options.addOption("n", "seqno", true, "Sequence Number to start dumping"); - options.addOption("e", "eid", true, "Entry ID to start dumping"); - options.addOption("t", "slot", true, "Slot to start dumping"); - options.addOption("l", "limit", true, "Number of entries to dump. Default is 100."); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - printHex = cmdline.hasOption("x"); - skipPayload = cmdline.hasOption("sp"); - if (cmdline.hasOption("o")) { - try { - fromTxnId = Long.parseLong(cmdline.getOptionValue("o")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid txn id " + cmdline.getOptionValue("o")); - } - } - if (cmdline.hasOption("l")) { - try { - count = Integer.parseInt(cmdline.getOptionValue("l")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid count " + cmdline.getOptionValue("l")); - } - if (count <= 0) { - throw new ParseException("Negative count found : " + count); - } - } - if (cmdline.hasOption("n")) { - long seqno; - try { - seqno = Long.parseLong(cmdline.getOptionValue("n")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid sequence number " + cmdline.getOptionValue("n")); - } - long eid; - if (cmdline.hasOption("e")) { - eid = Long.parseLong(cmdline.getOptionValue("e")); - } else { - eid = 0; - } - long slot; - if (cmdline.hasOption("t")) { - slot = Long.parseLong(cmdline.getOptionValue("t")); - } else { - slot = 0; - } - fromDLSN = new DLSN(seqno, eid, slot); - } - if (null == fromTxnId && null == fromDLSN) { - throw new ParseException("No start Txn/DLSN is specified."); - } - } - - @Override - protected int runCmd() throws Exception { - DistributedLogManager dlm = getNamespace().openLog(getStreamName()); - long totalCount = dlm.getLogRecordCount(); - try { - AsyncLogReader reader; - Object startOffset; - try { - DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync()); - System.out.println("Last DLSN : " + lastDLSN); - if (null == fromDLSN) { - reader = dlm.getAsyncLogReader(fromTxnId); - startOffset = fromTxnId; - } else { - reader = dlm.getAsyncLogReader(fromDLSN); - startOffset = fromDLSN; - } - } catch (LogNotFoundException lee) { - System.out.println("No stream found to dump records."); - return 0; - } - try { - System.out.println(String.format("Dump records for %s (from = %s, dump count = %d, total records = %d)", - getStreamName(), startOffset, count, totalCount)); - - dumpRecords(reader); - } finally { - Utils.close(reader); - } - } finally { - dlm.close(); - } - return 0; - } - - private void dumpRecords(AsyncLogReader reader) throws Exception { - int numRead = 0; - LogRecord record = Await.result(reader.readNext()); - while (record != null) { - // dump the record - dumpRecord(record); - ++numRead; - if (numRead >= count) { - break; - } - record = Await.result(reader.readNext()); - } - if (numRead == 0) { - System.out.println("No records."); - } else { - System.out.println("------------------------------------------------"); - } - } - - private void dumpRecord(LogRecord record) { - System.out.println("------------------------------------------------"); - if (record instanceof LogRecordWithDLSN) { - System.out.println("Record (txn = " + record.getTransactionId() + ", bytes = " - + record.getPayload().length + ", dlsn = " - + ((LogRecordWithDLSN) record).getDlsn() + ", sequence id = " - + ((LogRecordWithDLSN) record).getSequenceId() + ")"); - } else { - System.out.println("Record (txn = " + record.getTransactionId() + ", bytes = " - + record.getPayload().length + ")"); - } - System.out.println(""); - - if (skipPayload) { - return; - } - - if (printHex) { - System.out.println(Hex.encodeHexString(record.getPayload())); - } else { - System.out.println(new String(record.getPayload(), UTF_8)); - } - } - - @Override - protected String getUsage() { - return "dump [options]"; - } - - protected void setFromTxnId(Long fromTxnId) { - this.fromTxnId = fromTxnId; - } - } - - /** - * TODO: refactor inspect & inspectstream - * TODO: support force - * - * inspectstream -lac -gap (different options for different operations for a single stream) - * inspect -lac -gap (inspect the namespace, which will use inspect stream) - */ - static class InspectStreamCommand extends PerStreamCommand { - - InspectStreamCommand() { - super("inspectstream", "Inspect a given stream to identify any metadata corruptions"); - } - - @Override - protected int runCmd() throws Exception { - DistributedLogManager dlm = getNamespace().openLog(getStreamName()); - try { - return inspectAndRepair(dlm.getLogSegments()); - } finally { - dlm.close(); - } - } - - protected int inspectAndRepair(List<LogSegmentMetadata> segments) throws Exception { - LogSegmentMetadataStore metadataStore = getLogSegmentMetadataStore(); - ZooKeeperClient zkc = getZooKeeperClient(); - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); - BKDLConfig.propagateConfiguration(bkdlConfig, getConf()); - BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder() - .dlConfig(getConf()) - .zkServers(bkdlConfig.getBkZkServersForReader()) - .ledgersPath(bkdlConfig.getBkLedgersPath()) - .name("dlog") - .build(); - try { - List<LogSegmentMetadata> segmentsToRepair = inspectLogSegments(bkc, segments); - if (segmentsToRepair.isEmpty()) { - System.out.println("The stream is good. No log segments to repair."); - return 0; - } - System.out.println(segmentsToRepair.size() + " segments to repair : "); - System.out.println(segmentsToRepair); - System.out.println(); - if (!IOUtils.confirmPrompt("Do you want to repair them (Y/N): ")) { - return 0; - } - repairLogSegments(metadataStore, bkc, segmentsToRepair); - return 0; - } finally { - bkc.close(); - } - } - - protected List<LogSegmentMetadata> inspectLogSegments( - BookKeeperClient bkc, List<LogSegmentMetadata> segments) throws Exception { - List<LogSegmentMetadata> segmentsToRepair = new ArrayList<LogSegmentMetadata>(); - for (LogSegmentMetadata segment : segments) { - if (!segment.isInProgress() && !inspectLogSegment(bkc, segment)) { - segmentsToRepair.add(segment); - } - } - return segmentsToRepair; - } - - /** - * Inspect a given log segment. - * - * @param bkc - * bookkeeper client - * @param metadata - * metadata of the log segment to - * @return true if it is a good stream, false if the stream has inconsistent metadata. - * @throws Exception - */ - protected boolean inspectLogSegment(BookKeeperClient bkc, - LogSegmentMetadata metadata) throws Exception { - if (metadata.isInProgress()) { - System.out.println("Skip inprogress log segment " + metadata); - return true; - } - long ledgerId = metadata.getLogSegmentId(); - LedgerHandle lh = bkc.get().openLedger(ledgerId, BookKeeper.DigestType.CRC32, - getConf().getBKDigestPW().getBytes(UTF_8)); - LedgerHandle readLh = bkc.get().openLedger(ledgerId, BookKeeper.DigestType.CRC32, - getConf().getBKDigestPW().getBytes(UTF_8)); - LedgerReader lr = new LedgerReader(bkc.get()); - final AtomicReference<List<LedgerEntry>> entriesHolder = new AtomicReference<List<LedgerEntry>>(null); - final AtomicInteger rcHolder = new AtomicInteger(-1234); - final CountDownLatch doneLatch = new CountDownLatch(1); - try { - lr.forwardReadEntriesFromLastConfirmed(readLh, new BookkeeperInternalCallbacks.GenericCallback<List<LedgerEntry>>() { - @Override - public void operationComplete(int rc, List<LedgerEntry> entries) { - rcHolder.set(rc); - entriesHolder.set(entries); - doneLatch.countDown(); - } - }); - doneLatch.await(); - if (BKException.Code.OK != rcHolder.get()) { - throw BKException.create(rcHolder.get()); - } - List<LedgerEntry> entries = entriesHolder.get(); - long lastEntryId; - if (entries.isEmpty()) { - lastEntryId = LedgerHandle.INVALID_ENTRY_ID; - } else { - LedgerEntry lastEntry = entries.get(entries.size() - 1); - lastEntryId = lastEntry.getEntryId(); - } - if (lastEntryId != lh.getLastAddConfirmed()) { - System.out.println("Inconsistent Last Add Confirmed Found for LogSegment " + metadata.getLogSegmentSequenceNumber() + ": "); - System.out.println("\t metadata: " + metadata); - System.out.println("\t lac in ledger metadata is " + lh.getLastAddConfirmed() + ", but lac in bookies is " + lastEntryId); - return false; - } else { - return true; - } - } finally { - lh.close(); - readLh.close(); - } - } - - protected void repairLogSegments(LogSegmentMetadataStore metadataStore, - BookKeeperClient bkc, - List<LogSegmentMetadata> segments) throws Exception { - BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get()); - try { - MetadataUpdater metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater( - getConf(), metadataStore); - for (LogSegmentMetadata segment : segments) { - repairLogSegment(bkAdmin, metadataUpdater, segment); - } - } finally { - bkAdmin.close(); - } - } - - protected void repairLogSegment(BookKeeperAdmin bkAdmin, - MetadataUpdater metadataUpdater, - LogSegmentMetadata segment) throws Exception { - if (segment.isInProgress()) { - System.out.println("Skip inprogress log segment " + segment); - return; - } - LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId(), true); - long lac = lh.getLastAddConfirmed(); - Enumeration<LedgerEntry> entries = lh.readEntries(lac, lac); - if (!entries.hasMoreElements()) { - throw new IOException("Entry " + lac + " isn't found for " + segment); - } - LedgerEntry lastEntry = entries.nextElement(); - Entry.Reader reader = Entry.newBuilder() - .setLogSegmentInfo(segment.getLogSegmentSequenceNumber(), segment.getStartSequenceId()) - .setEntryId(lastEntry.getEntryId()) - .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion())) - .setInputStream(lastEntry.getEntryInputStream()) - .buildReader(); - LogRecordWithDLSN record = reader.nextRecord(); - LogRecordWithDLSN lastRecord = null; - while (null != record) { - lastRecord = record; - record = reader.nextRecord(); - } - if (null == lastRecord) { - throw new IOException("No record found in entry " + lac + " for " + segment); - } - System.out.println("Updating last record for " + segment + " to " + lastRecord); - if (!IOUtils.confirmPrompt("Do you want to make this change (Y/N): ")) { - return; - } - metadataUpdater.updateLastRecord(segment, lastRecord); - } - - @Override - protected String getUsage() { - return "inspectstream [options]"; - } - } - - static interface BKCommandRunner { - int run(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception; - } - - abstract static class PerBKCommand extends PerDLCommand { - - protected PerBKCommand(String name, String description) { - super(name, description); - } - - @Override - protected int runCmd() throws Exception { - return runBKCommand(new BKCommandRunner() { - @Override - public int run(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception { - return runBKCmd(zkc, bkc); - } - }); - } - - protected int runBKCommand(BKCommandRunner runner) throws Exception { - return runner.run(getZooKeeperClient(), getBookKeeperClient()); - } - - abstract protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception; - } - - static class RecoverCommand extends PerBKCommand { - - final List<Long> ledgers = new ArrayList<Long>(); - boolean query = false; - boolean dryrun = false; - boolean skipOpenLedgers = false; - boolean fenceOnly = false; - int fenceRate = 1; - int concurrency = 1; - final Set<BookieSocketAddress> bookiesSrc = new HashSet<BookieSocketAddress>(); - int partition = 0; - int numPartitions = 0; - - RecoverCommand() { - super("recover", "Recover the ledger data that stored on failed bookies"); - options.addOption("l", "ledger", true, "Specific ledger to recover"); - options.addOption("lf", "ledgerfile", true, "File contains ledgers list"); - options.addOption("q", "query", false, "Query the ledgers that contain given bookies"); - options.addOption("d", "dryrun", false, "Print the recovery plan w/o actually recovering"); - options.addOption("cy", "concurrency", true, "Number of ledgers could be recovered in parallel"); - options.addOption("sk", "skipOpenLedgers", false, "Skip recovering open ledgers"); - options.addOption("p", "partition", true, "partition"); - options.addOption("n", "num-partitions", true, "num partitions"); - options.addOption("fo", "fence-only", true, "fence the ledgers only w/o re-replicating entries"); - options.addOption("fr", "fence-rate", true, "rate on fencing ledgers"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - query = cmdline.hasOption("q"); - force = cmdline.hasOption("f"); - dryrun = cmdline.hasOption("d"); - skipOpenLedgers = cmdline.hasOption("sk"); - fenceOnly = cmdline.hasOption("fo"); - if (cmdline.hasOption("l")) { - String[] lidStrs = cmdline.getOptionValue("l").split(","); - try { - for (String lidStr : lidStrs) { - ledgers.add(Long.parseLong(lidStr)); - } - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid ledger id provided : " + cmdline.getOptionValue("l")); - } - } - if (cmdline.hasOption("lf")) { - String file = cmdline.getOptionValue("lf"); - try { - BufferedReader br = new BufferedReader( - new InputStreamReader(new FileInputStream(file), UTF_8.name())); - try { - String line = br.readLine(); - - while (line != null) { - ledgers.add(Long.parseLong(line)); - line = br.readLine(); - } - } finally { - br.close(); - } - } catch (IOException e) { - throw new ParseException("Invalid ledgers file provided : " + file); - } - } - if (cmdline.hasOption("cy")) { - try { - concurrency = Integer.parseInt(cmdline.getOptionValue("cy")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid concurrency provided : " + cmdline.getOptionValue("cy")); - } - } - if (cmdline.hasOption("p")) { - partition = Integer.parseInt(cmdline.getOptionValue("p")); - } - if (cmdline.hasOption("n")) { - numPartitions = Integer.parseInt(cmdline.getOptionValue("n")); - } - if (cmdline.hasOption("fr")) { - fenceRate = Integer.parseInt(cmdline.getOptionValue("fr")); - } - // Get bookies list to recover - String[] args = cmdline.getArgs(); - final String[] bookieStrs = args[0].split(","); - for (String bookieStr : bookieStrs) { - final String bookieStrParts[] = bookieStr.split(":"); - if (bookieStrParts.length != 2) { - throw new ParseException("BookieSrcs has invalid bookie address format (host:port expected) : " - + bookieStr); - } - try { - bookiesSrc.add(new BookieSocketAddress(bookieStrParts[0], - Integer.parseInt(bookieStrParts[1]))); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid ledger id provided : " + cmdline.getOptionValue("l")); - } - } - } - - @Override - protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception { - BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get()); - try { - if (query) { - return bkQuery(bkAdmin, bookiesSrc); - } - if (fenceOnly) { - return bkFence(bkc, ledgers, fenceRate); - } - if (!force) { - System.out.println("Bookies : " + bookiesSrc); - if (!IOUtils.confirmPrompt("Do you want to recover them: (Y/N)")) { - return -1; - } - } - if (!ledgers.isEmpty()) { - System.out.println("Ledgers : " + ledgers); - long numProcessed = 0; - Iterator<Long> ledgersIter = ledgers.iterator(); - LinkedBlockingQueue<Long> ledgersToProcess = new LinkedBlockingQueue<Long>(); - while (ledgersIter.hasNext()) { - long lid = ledgersIter.next(); - if (numPartitions <=0 || (numPartitions > 0 && lid % numPartitions == partition)) { - ledgersToProcess.add(lid); - ++numProcessed; - } - if (ledgersToProcess.size() == 10000) { - System.out.println("Processing " + numProcessed + " ledgers"); - bkRecovery(ledgersToProcess, bookiesSrc, dryrun, skipOpenLedgers); - ledgersToProcess.clear(); - System.out.println("Processed " + numProcessed + " ledgers"); - } - } - if (!ledgersToProcess.isEmpty()) { - System.out.println("Processing " + numProcessed + " ledgers"); - bkRecovery(ledgersToProcess, bookiesSrc, dryrun, skipOpenLedgers); - System.out.println("Processed " + numProcessed + " ledgers"); - } - System.out.println("Done."); - CountDownLatch latch = new CountDownLatch(1); - latch.await(); - return 0; - } - return bkRecovery(bkAdmin, bookiesSrc, dryrun, skipOpenLedgers); - } finally { - bkAdmin.close(); - } - } - - private int bkFence(final BookKeeperClient bkc, List<Long> ledgers, int fenceRate) throws Exception { - if (ledgers.isEmpty()) { - System.out.println("Nothing to fence. Done."); - return 0; - } - ExecutorService executorService = Executors.newCachedThreadPool(); - final RateLimiter rateLimiter = RateLimiter.create(fenceRate); - final byte[] passwd = getConf().getBKDigestPW().getBytes(UTF_8); - final CountDownLatch latch = new CountDownLatch(ledgers.size()); - final AtomicInteger numPendings = new AtomicInteger(ledgers.size()); - final LinkedBlockingQueue<Long> ledgersQueue = new LinkedBlockingQueue<Long>(); - ledgersQueue.addAll(ledgers); - - for (int i = 0; i < concurrency; i++) { - executorService.submit(new Runnable() { - @Override - public void run() { - while (!ledgersQueue.isEmpty()) { - rateLimiter.acquire(); - Long lid = ledgersQueue.poll(); - if (null == lid) { - break; - } - System.out.println("Fencing ledger " + lid); - int numRetries = 3; - while (numRetries > 0) { - try { - LedgerHandle lh = bkc.get().openLedger(lid, BookKeeper.DigestType.CRC32, passwd); - lh.close(); - System.out.println("Fenced ledger " + lid + ", " + numPendings.decrementAndGet() + " left."); - latch.countDown(); - } catch (BKException.BKNoSuchLedgerExistsException bke) { - System.out.println("Skipped fence non-exist ledger " + lid + ", " + numPendings.decrementAndGet() + " left."); - latch.countDown(); - } catch (BKException.BKLedgerRecoveryException lre) { - --numRetries; - continue; - } catch (Exception e) { - e.printStackTrace(); - break; - } - numRetries = 0; - } - } - System.out.println("Thread exits"); - } - }); - } - latch.await(); - SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES); - return 0; - } - - private int bkQuery(BookKeeperAdmin bkAdmin, Set<BookieSocketAddress> bookieAddrs) - throws InterruptedException, BKException { - SortedMap<Long, LedgerMetadata> ledgersContainBookies = - bkAdmin.getLedgersContainBookies(bookieAddrs); - System.err.println("NOTE: Bookies in inspection list are marked with '*'."); - for (Map.Entry<Long, LedgerMetadata> ledger : ledgersContainBookies.entrySet()) { - System.out.println("ledger " + ledger.getKey() + " : " + ledger.getValue().getState()); - Map<Long, Integer> numBookiesToReplacePerEnsemble = - inspectLedger(ledger.getValue(), bookieAddrs); - System.out.print("summary: ["); - for (Map.Entry<Long, Integer> entry : numBookiesToReplacePerEnsemble.entrySet()) { - System.out.print(entry.getKey() + "=" + entry.getValue() + ", "); - } - System.out.println("]"); - System.out.println(); - } - System.out.println("Done"); - return 0; - } - - private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, Set<BookieSocketAddress> bookiesToInspect) { - Map<Long, Integer> numBookiesToReplacePerEnsemble = new TreeMap<Long, Integer>(); - for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : metadata.getEnsembles().entrySet()) { - ArrayList<BookieSocketAddress> bookieList = ensemble.getValue(); - System.out.print(ensemble.getKey() + ":\t"); - int numBookiesToReplace = 0; - for (BookieSocketAddress bookie: bookieList) { - System.out.print(bookie.toString()); - if (bookiesToInspect.contains(bookie)) { - System.out.print("*"); - ++numBookiesToReplace; - } else { - System.out.print(" "); - } - System.out.print(" "); - } - System.out.println(); - numBookiesToReplacePerEnsemble.put(ensemble.getKey(), numBookiesToReplace); - } - return numBookiesToReplacePerEnsemble; - } - - private int bkRecovery(final LinkedBlockingQueue<Long> ledgers, final Set<BookieSocketAddress> bookieAddrs, - final boolean dryrun, final boolean skipOpenLedgers) - throws Exception { - return runBKCommand(new BKCommandRunner() { - @Override - public int run(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception { - BookKeeperAdmin bkAdmin = new BookKeeperAdmin(bkc.get()); - try { - bkRecovery(bkAdmin, ledgers, bookieAddrs, dryrun, skipOpenLedgers); - return 0; - } finally { - bkAdmin.close(); - } - } - }); - } - - private int bkRecovery(final BookKeeperAdmin bkAdmin, final LinkedBlockingQueue<Long> ledgers, - final Set<BookieSocketAddress> bookieAddrs, - final boolean dryrun, final boolean skipOpenLedgers) - throws InterruptedException, BKException { - final AtomicInteger numPendings = new AtomicInteger(ledgers.size()); - final ExecutorService executorService = Executors.newCachedThreadPool(); - final CountDownLatch doneLatch = new CountDownLatch(concurrency); - Runnable r = new Runnable() { - @Override - public void run() { - while (!ledgers.isEmpty()) { - long lid = -1L; -
<TRUNCATED>