Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 7fab246d9 -> b44820b50


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
 
b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
new file mode 100644
index 0000000..56a4f2e
--- /dev/null
+++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.auditor;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.distributedlog.BookKeeperClient;
+import org.apache.distributedlog.BookKeeperClientBuilder;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClientBuilder;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.util.DLUtils;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAccessor;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk 
usage by streams.
+ */
+public class DLAuditor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(DLAuditor.class);
+
+    private final DistributedLogConfiguration conf;
+
+    public DLAuditor(DistributedLogConfiguration conf) {
+        this.conf = conf;
+    }
+
+    private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace 
namespace) {
+        NamespaceDriver driver = namespace.getNamespaceDriver();
+        assert(driver instanceof BKNamespaceDriver);
+        return ((BKNamespaceDriver) driver).getWriterZKC();
+    }
+
+    private BookKeeperClient getBookKeeperClient(DistributedLogNamespace 
namespace) {
+        NamespaceDriver driver = namespace.getNamespaceDriver();
+        assert(driver instanceof BKNamespaceDriver);
+        return ((BKNamespaceDriver) driver).getReaderBKC();
+    }
+
+    private String validateAndGetZKServers(List<URI> uris) {
+        URI firstURI = uris.get(0);
+        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI);
+        for (URI uri : uris) {
+            if 
(!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) {
+                throw new IllegalArgumentException("Uris don't belong to same 
zookeeper cluster");
+            }
+        }
+        return zkServers;
+    }
+
+    private BKDLConfig resolveBKDLConfig(ZooKeeperClient zkc, List<URI> uris) 
throws IOException {
+        URI firstURI = uris.get(0);
+        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, firstURI);
+        for (URI uri : uris) {
+            BKDLConfig anotherConfig = BKDLConfig.resolveDLConfig(zkc, uri);
+            if (!(Objects.equal(bkdlConfig.getBkLedgersPath(), 
anotherConfig.getBkLedgersPath())
+                    && Objects.equal(bkdlConfig.getBkZkServersForWriter(), 
anotherConfig.getBkZkServersForWriter()))) {
+                throw new IllegalArgumentException("Uris don't use same 
bookkeeper cluster");
+            }
+        }
+        return bkdlConfig;
+    }
+
+    public Pair<Set<Long>, Set<Long>> collectLedgers(List<URI> uris, 
List<List<String>> allocationPaths)
+            throws IOException {
+        Preconditions.checkArgument(uris.size() > 0, "No uri provided to 
audit");
+
+        String zkServers = validateAndGetZKServers(uris);
+        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
+                conf.getZKRetryBackoffStartMillis(),
+                conf.getZKRetryBackoffMaxMillis(),
+                Integer.MAX_VALUE);
+        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
+                .name("DLAuditor-ZK")
+                .zkServers(zkServers)
+                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                .retryPolicy(retryPolicy)
+                .zkAclId(conf.getZkAclId())
+                .build();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
+            logger.info("Resolved bookkeeper config : {}", bkdlConfig);
+
+            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
+                    .name("DLAuditor-BK")
+                    .dlConfig(conf)
+                    .zkServers(bkdlConfig.getBkZkServersForWriter())
+                    .ledgersPath(bkdlConfig.getBkLedgersPath())
+                    .build();
+            try {
+                Set<Long> bkLedgers = collectLedgersFromBK(bkc, 
executorService);
+                Set<Long> dlLedgers = collectLedgersFromDL(uris, 
allocationPaths);
+                return Pair.of(bkLedgers, dlLedgers);
+            } finally {
+                bkc.close();
+            }
+        } finally {
+            zkc.close();
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Find leak ledgers phase 1: collect ledgers set.
+     */
+    private Set<Long> collectLedgersFromBK(BookKeeperClient bkc,
+                                           final ExecutorService 
executorService)
+            throws IOException {
+        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
+
+        final Set<Long> ledgers = new HashSet<Long>();
+        final SettableFuture<Void> doneFuture = SettableFuture.create();
+
+        BookkeeperInternalCallbacks.Processor<Long> collector =
+                new BookkeeperInternalCallbacks.Processor<Long>() {
+            @Override
+            public void process(Long lid,
+                                final AsyncCallback.VoidCallback cb) {
+                synchronized (ledgers) {
+                    ledgers.add(lid);
+                    if (0 == ledgers.size() % 1000) {
+                        logger.info("Collected {} ledgers", ledgers.size());
+                    }
+                }
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        cb.processResult(BKException.Code.OK, null, null);
+                    }
+                });
+
+            }
+        };
+        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (BKException.Code.OK == rc) {
+                    doneFuture.set(null);
+                } else {
+                    doneFuture.setException(BKException.create(rc));
+                }
+            }
+        };
+        lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK,
+                BKException.Code.ZKException);
+        try {
+            doneFuture.get();
+            logger.info("Collected total {} ledgers", ledgers.size());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted on collecting 
ledgers : ", e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException)(e.getCause());
+            } else {
+                throw new IOException("Failed to collect ledgers : ", 
e.getCause());
+            }
+        }
+        return ledgers;
+    }
+
+    /**
+     * Find leak ledgers phase 2: collect ledgers from uris.
+     */
+    private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> 
allocationPaths)
+            throws IOException {
+        final Set<Long> ledgers = new TreeSet<Long>();
+        List<DistributedLogNamespace> namespaces =
+                new ArrayList<DistributedLogNamespace>(uris.size());
+        try {
+            for (URI uri : uris) {
+                namespaces.add(
+                        DistributedLogNamespaceBuilder.newBuilder()
+                                .conf(conf)
+                                .uri(uri)
+                                .build());
+            }
+            final CountDownLatch doneLatch = new CountDownLatch(uris.size());
+            final AtomicInteger numFailures = new AtomicInteger(0);
+            ExecutorService executor = 
Executors.newFixedThreadPool(uris.size());
+            try {
+                int i = 0;
+                for (final DistributedLogNamespace namespace : namespaces) {
+                    final DistributedLogNamespace dlNamespace = namespace;
+                    final URI uri = uris.get(i);
+                    final List<String> aps = allocationPaths.get(i);
+                    i++;
+                    executor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                logger.info("Collecting ledgers from {} : {}", 
uri, aps);
+                                collectLedgersFromAllocator(uri, namespace, 
aps, ledgers);
+                                synchronized (ledgers) {
+                                    logger.info("Collected {} ledgers from 
allocators for {} : {} ",
+                                            new Object[]{ledgers.size(), uri, 
ledgers});
+                                }
+                                collectLedgersFromDL(uri, namespace, ledgers);
+                            } catch (IOException e) {
+                                numFailures.incrementAndGet();
+                                logger.info("Error to collect ledgers from DL 
: ", e);
+                            }
+                            doneLatch.countDown();
+                        }
+                    });
+                }
+                try {
+                    doneLatch.await();
+                    if (numFailures.get() > 0) {
+                        throw new IOException(numFailures.get() + " errors to 
collect ledgers from DL");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.warn("Interrupted on collecting ledgers from DL : 
", e);
+                    throw new DLInterruptedException("Interrupted on 
collecting ledgers from DL : ", e);
+                }
+            } finally {
+                executor.shutdown();
+            }
+        } finally {
+            for (DistributedLogNamespace namespace : namespaces) {
+                namespace.close();
+            }
+        }
+        return ledgers;
+    }
+
+    private void collectLedgersFromAllocator(final URI uri,
+                                             final DistributedLogNamespace 
namespace,
+                                             final List<String> 
allocationPaths,
+                                             final Set<Long> ledgers) throws 
IOException {
+        final LinkedBlockingQueue<String> poolQueue =
+                new LinkedBlockingQueue<String>();
+        for (String allocationPath : allocationPaths) {
+            String rootPath = uri.getPath() + "/" + allocationPath;
+            try {
+                List<String> pools = 
getZooKeeperClient(namespace).get().getChildren(rootPath, false);
+                for (String pool : pools) {
+                    poolQueue.add(rootPath + "/" + pool);
+                }
+            } catch (KeeperException e) {
+                throw new ZKException("Failed to get list of pools from " + 
rootPath, e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new DLInterruptedException("Interrupted on getting list 
of pools from " + rootPath, e);
+            }
+        }
+
+
+        logger.info("Collecting ledgers from allocators for {} : {}", uri, 
poolQueue);
+
+        executeAction(poolQueue, 10, new Action<String>() {
+            @Override
+            public void execute(String poolPath) throws IOException {
+                try {
+                    collectLedgersFromPool(poolPath);
+                } catch (InterruptedException e) {
+                    throw new DLInterruptedException("Interrupted on 
collecting ledgers from allocation pool " + poolPath, e);
+                } catch (KeeperException e) {
+                    throw new ZKException("Failed to collect ledgers from 
allocation pool " + poolPath, e.code());
+                }
+            }
+
+            private void collectLedgersFromPool(String poolPath)
+                    throws InterruptedException, 
ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
+                List<String> allocators = getZooKeeperClient(namespace).get()
+                                        .getChildren(poolPath, false);
+                for (String allocator : allocators) {
+                    String allocatorPath = poolPath + "/" + allocator;
+                    byte[] data = 
getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
+                    if (null != data && data.length > 0) {
+                        try {
+                            long ledgerId = DLUtils.bytes2LogSegmentId(data);
+                            synchronized (ledgers) {
+                                ledgers.add(ledgerId);
+                            }
+                        } catch (NumberFormatException nfe) {
+                            logger.warn("Invalid ledger found in allocator 
path {} : ", allocatorPath, nfe);
+                        }
+                    }
+                }
+            }
+        });
+
+        logger.info("Collected ledgers from allocators for {}.", uri);
+    }
+
+    private void collectLedgersFromDL(final URI uri,
+                                      final DistributedLogNamespace namespace,
+                                      final Set<Long> ledgers) throws 
IOException {
+        logger.info("Enumerating {} to collect streams.", uri);
+        Iterator<String> streams = namespace.getLogs();
+        final LinkedBlockingQueue<String> streamQueue = new 
LinkedBlockingQueue<String>();
+        while (streams.hasNext()) {
+            streamQueue.add(streams.next());
+        }
+
+        logger.info("Collected {} streams from uri {} : {}",
+                    new Object[] { streamQueue.size(), uri, streams });
+
+        executeAction(streamQueue, 10, new Action<String>() {
+            @Override
+            public void execute(String stream) throws IOException {
+                collectLedgersFromStream(namespace, stream, ledgers);
+            }
+        });
+    }
+
+    private List<Long> collectLedgersFromStream(DistributedLogNamespace 
namespace,
+                                                String stream,
+                                                Set<Long> ledgers)
+            throws IOException {
+        DistributedLogManager dlm = namespace.openLog(stream);
+        try {
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            List<Long> sLedgers = new ArrayList<Long>();
+            for (LogSegmentMetadata segment : segments) {
+                synchronized (ledgers) {
+                    ledgers.add(segment.getLogSegmentId());
+                }
+                sLedgers.add(segment.getLogSegmentId());
+            }
+            return sLedgers;
+        } finally {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Calculating stream space usage from given <i>uri</i>.
+     *
+     * @param uri dl uri
+     * @throws IOException
+     */
+    public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws 
IOException {
+        logger.info("Collecting stream space usage for {}.", uri);
+        DistributedLogNamespace namespace = 
DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(uri)
+                .build();
+        try {
+            return calculateStreamSpaceUsage(uri, namespace);
+        } finally {
+            namespace.close();
+        }
+    }
+
+    private Map<String, Long> calculateStreamSpaceUsage(
+            final URI uri, final DistributedLogNamespace namespace)
+        throws IOException {
+        Iterator<String> streams = namespace.getLogs();
+        final LinkedBlockingQueue<String> streamQueue = new 
LinkedBlockingQueue<String>();
+        while (streams.hasNext()) {
+            streamQueue.add(streams.next());
+        }
+
+        final Map<String, Long> streamSpaceUsageMap =
+                new ConcurrentSkipListMap<String, Long>();
+        final AtomicInteger numStreamsCollected = new AtomicInteger(0);
+
+        executeAction(streamQueue, 10, new Action<String>() {
+            @Override
+            public void execute(String stream) throws IOException {
+                streamSpaceUsageMap.put(stream,
+                        calculateStreamSpaceUsage(namespace, stream));
+                if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
+                    logger.info("Calculated {} streams from uri {}.", 
numStreamsCollected.get(), uri);
+                }
+            }
+        });
+
+        return streamSpaceUsageMap;
+    }
+
+    private long calculateStreamSpaceUsage(final DistributedLogNamespace 
namespace,
+                                           final String stream) throws 
IOException {
+        DistributedLogManager dlm = namespace.openLog(stream);
+        long totalBytes = 0;
+        try {
+            List<LogSegmentMetadata> segments = dlm.getLogSegments();
+            for (LogSegmentMetadata segment : segments) {
+                try {
+                    LedgerHandle lh = 
getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(),
+                            BookKeeper.DigestType.CRC32, 
conf.getBKDigestPW().getBytes(UTF_8));
+                    totalBytes += lh.getLength();
+                    lh.close();
+                } catch (BKException e) {
+                    logger.error("Failed to open ledger {} : ", 
segment.getLogSegmentId(), e);
+                    throw new IOException("Failed to open ledger " + 
segment.getLogSegmentId(), e);
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted on opening ledger {} : ", 
segment.getLogSegmentId(), e);
+                    Thread.currentThread().interrupt();
+                    throw new DLInterruptedException("Interrupted on opening 
ledger " + segment.getLogSegmentId(), e);
+                }
+            }
+        } finally {
+            dlm.close();
+        }
+        return totalBytes;
+    }
+
+    public long calculateLedgerSpaceUsage(URI uri) throws IOException {
+        List<URI> uris = Lists.newArrayList(uri);
+        String zkServers = validateAndGetZKServers(uris);
+        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
+                conf.getZKRetryBackoffStartMillis(),
+                conf.getZKRetryBackoffMaxMillis(),
+                Integer.MAX_VALUE);
+        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
+                .name("DLAuditor-ZK")
+                .zkServers(zkServers)
+                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                .retryPolicy(retryPolicy)
+                .zkAclId(conf.getZkAclId())
+                .build();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
+            logger.info("Resolved bookkeeper config : {}", bkdlConfig);
+
+            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
+                    .name("DLAuditor-BK")
+                    .dlConfig(conf)
+                    .zkServers(bkdlConfig.getBkZkServersForWriter())
+                    .ledgersPath(bkdlConfig.getBkLedgersPath())
+                    .build();
+            try {
+                return calculateLedgerSpaceUsage(bkc, executorService);
+            } finally {
+                bkc.close();
+            }
+        } finally {
+            zkc.close();
+            executorService.shutdown();
+        }
+    }
+
+    private long calculateLedgerSpaceUsage(BookKeeperClient bkc,
+                                           final ExecutorService 
executorService)
+        throws IOException {
+        final AtomicLong totalBytes = new AtomicLong(0);
+        final AtomicLong totalEntries = new AtomicLong(0);
+        final AtomicLong numLedgers = new AtomicLong(0);
+
+        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
+
+        final SettableFuture<Void> doneFuture = SettableFuture.create();
+        final BookKeeper bk = bkc.get();
+
+        BookkeeperInternalCallbacks.Processor<Long> collector =
+                new BookkeeperInternalCallbacks.Processor<Long>() {
+            @Override
+            public void process(final Long lid,
+                                final AsyncCallback.VoidCallback cb) {
+                numLedgers.incrementAndGet();
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        bk.asyncOpenLedgerNoRecovery(lid, 
BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8),
+                                new 
org.apache.bookkeeper.client.AsyncCallback.OpenCallback() {
+                            @Override
+                            public void openComplete(int rc, LedgerHandle lh, 
Object ctx) {
+                                final int cbRc;
+                                if (BKException.Code.OK == rc) {
+                                    totalBytes.addAndGet(lh.getLength());
+                                    
totalEntries.addAndGet(lh.getLastAddConfirmed() + 1);
+                                    cbRc = rc;
+                                } else {
+                                    cbRc = BKException.Code.ZKException;
+                                }
+                                executorService.submit(new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        cb.processResult(cbRc, null, null);
+                                    }
+                                });
+                            }
+                        }, null);
+                    }
+                });
+            }
+        };
+        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (BKException.Code.OK == rc) {
+                    doneFuture.set(null);
+                } else {
+                    doneFuture.setException(BKException.create(rc));
+                }
+            }
+        };
+        lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK, 
BKException.Code.ZKException);
+        try {
+            doneFuture.get();
+            logger.info("calculated {} ledgers\n\ttotal bytes = {}\n\ttotal 
entries = {}",
+                    new Object[] { numLedgers.get(), totalBytes.get(), 
totalEntries.get() });
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted on calculating 
ledger space : ", e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException)(e.getCause());
+            } else {
+                throw new IOException("Failed to calculate ledger space : ", 
e.getCause());
+            }
+        }
+        return totalBytes.get();
+    }
+
+    public void close() {
+        // no-op
+    }
+
+    static interface Action<T> {
+        void execute(T item) throws IOException ;
+    }
+
+    static <T> void executeAction(final LinkedBlockingQueue<T> queue,
+                                  final int numThreads,
+                                  final Action<T> action) throws IOException {
+        final CountDownLatch failureLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(queue.size());
+        final AtomicInteger numFailures = new AtomicInteger(0);
+        final AtomicInteger completedThreads = new AtomicInteger(0);
+
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
+        try {
+            for (int i = 0 ; i < numThreads; i++) {
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        while (true) {
+                            T item = queue.poll();
+                            if (null == item) {
+                                break;
+                            }
+                            try {
+                                action.execute(item);
+                            } catch (IOException ioe) {
+                                logger.error("Failed to execute action on item 
'{}'", item, ioe);
+                                numFailures.incrementAndGet();
+                                failureLatch.countDown();
+                                break;
+                            }
+                            doneLatch.countDown();
+                        }
+                        if (numFailures.get() == 0 && 
completedThreads.incrementAndGet() == numThreads) {
+                            failureLatch.countDown();
+                        }
+                    }
+                });
+            }
+            try {
+                failureLatch.await();
+                if (numFailures.get() > 0) {
+                    throw new IOException("Encountered " + numFailures.get() + 
" failures on executing action.");
+                }
+                doneLatch.await();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                logger.warn("Interrupted on executing action", ie);
+                throw new DLInterruptedException("Interrupted on executing 
action", ie);
+            }
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+}

Reply via email to