http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java deleted file mode 100644 index a081606..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/auditor/DLAuditor.java +++ /dev/null @@ -1,630 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.auditor; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.SettableFuture; -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.BookKeeperClientBuilder; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.util.DLUtils; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeperAccessor; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.meta.LedgerManager; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import 
java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import static com.google.common.base.Charsets.UTF_8; -
/**
 * DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk usage by streams.
 */
public class DLAuditor {

    private static final Logger logger = LoggerFactory.getLogger(DLAuditor.class);

    private final DistributedLogConfiguration conf;

    /**
     * Create an auditor.
     *
     * @param conf configuration used to build the zookeeper/bookkeeper clients and namespaces
     */
    public DLAuditor(DistributedLogConfiguration conf) {
        this.conf = conf;
    }

    /** Return the writer zookeeper client of the (bookkeeper based) namespace driver. */
    private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) {
        NamespaceDriver driver = namespace.getNamespaceDriver();
        assert(driver instanceof BKNamespaceDriver);
        return ((BKNamespaceDriver) driver).getWriterZKC();
    }

    /** Return the reader bookkeeper client of the (bookkeeper based) namespace driver. */
    private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) {
        NamespaceDriver driver = namespace.getNamespaceDriver();
        assert(driver instanceof BKNamespaceDriver);
        return ((BKNamespaceDriver) driver).getReaderBKC();
    }

    /**
     * Check that all uris point at the same zookeeper servers (compared case-insensitively).
     *
     * @param uris non-empty list of dl uris
     * @return the zookeeper servers shared by all uris
     * @throws IllegalArgumentException if any uri resolves to different zookeeper servers
     */
    private String validateAndGetZKServers(List<URI> uris) {
        URI firstURI = uris.get(0);
        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI);
        for (URI uri : uris) {
            if (!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) {
                throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster");
            }
        }
        return zkServers;
    }

    /**
     * Resolve the bookkeeper config of every uri and check that they all agree on the
     * ledgers path and the writer zookeeper servers.
     *
     * @return the config resolved from the first uri
     * @throws IllegalArgumentException if the uris don't use the same bookkeeper cluster
     */
    private BKDLConfig resolveBKDLConfig(ZooKeeperClient zkc, List<URI> uris) throws IOException {
        URI firstURI = uris.get(0);
        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, firstURI);
        for (URI uri : uris) {
            BKDLConfig anotherConfig = BKDLConfig.resolveDLConfig(zkc, uri);
            if (!(Objects.equal(bkdlConfig.getBkLedgersPath(), anotherConfig.getBkLedgersPath())
                    && Objects.equal(bkdlConfig.getBkZkServersForWriter(), anotherConfig.getBkZkServersForWriter()))) {
                throw new IllegalArgumentException("Uris don't use same bookkeeper cluster");
            }
        }
        return bkdlConfig;
    }

    /** Build the zookeeper client used by the audit entry points. */
    private ZooKeeperClient buildZooKeeperClient(String zkServers) throws IOException {
        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                conf.getZKRetryBackoffStartMillis(),
                conf.getZKRetryBackoffMaxMillis(),
                Integer.MAX_VALUE);
        return ZooKeeperClientBuilder.newBuilder()
                .name("DLAuditor-ZK")
                .zkServers(zkServers)
                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
                .retryPolicy(retryPolicy)
                .zkAclId(conf.getZkAclId())
                .build();
    }

    /** Build the bookkeeper client for the cluster described by <i>bkdlConfig</i>. */
    private BookKeeperClient buildBookKeeperClient(BKDLConfig bkdlConfig) throws IOException {
        return BookKeeperClientBuilder.newBuilder()
                .name("DLAuditor-BK")
                .dlConfig(conf)
                .zkServers(bkdlConfig.getBkZkServersForWriter())
                .ledgersPath(bkdlConfig.getBkLedgersPath())
                .build();
    }

    /**
     * Collect the ledgers known to bookkeeper and the ledgers referenced by the given uris.
     *
     * @param uris dl uris to audit; must not be empty
     * @param allocationPaths allocation paths per uri, indexed in the same order as <i>uris</i>
     * @return pair of (ledgers found in bookkeeper, ledgers referenced from DL)
     * @throws IOException on failures collecting either set
     */
    public Pair<Set<Long>, Set<Long>> collectLedgers(List<URI> uris, List<List<String>> allocationPaths)
            throws IOException {
        Preconditions.checkArgument(uris.size() > 0, "No uri provided to audit");

        String zkServers = validateAndGetZKServers(uris);
        ZooKeeperClient zkc = buildZooKeeperClient(zkServers);
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
            logger.info("Resolved bookkeeper config : {}", bkdlConfig);

            BookKeeperClient bkc = buildBookKeeperClient(bkdlConfig);
            try {
                Set<Long> bkLedgers = collectLedgersFromBK(bkc, executorService);
                Set<Long> dlLedgers = collectLedgersFromDL(uris, allocationPaths);
                return Pair.of(bkLedgers, dlLedgers);
            } finally {
                bkc.close();
            }
        } finally {
            // shut the executor down even if closing the zookeeper client fails
            try {
                zkc.close();
            } finally {
                executorService.shutdown();
            }
        }
    }

    /**
     * Find leak ledgers phase 1: collect ledgers set.
     */
    private Set<Long> collectLedgersFromBK(BookKeeperClient bkc,
                                           final ExecutorService executorService)
            throws IOException {
        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());

        final Set<Long> ledgers = new HashSet<Long>();
        final SettableFuture<Void> doneFuture = SettableFuture.create();

        BookkeeperInternalCallbacks.Processor<Long> collector =
                new BookkeeperInternalCallbacks.Processor<Long>() {
            @Override
            public void process(Long lid,
                                final AsyncCallback.VoidCallback cb) {
                synchronized (ledgers) {
                    ledgers.add(lid);
                    if (0 == ledgers.size() % 1000) {
                        logger.info("Collected {} ledgers", ledgers.size());
                    }
                }
                // complete the per-ledger callback on the executor rather than the calling thread
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        cb.processResult(BKException.Code.OK, null, null);
                    }
                });

            }
        };
        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (BKException.Code.OK == rc) {
                    doneFuture.set(null);
                } else {
                    doneFuture.setException(BKException.create(rc));
                }
            }
        };
        lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK,
                BKException.Code.ZKException);
        try {
            doneFuture.get();
            logger.info("Collected total {} ledgers", ledgers.size());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on collecting ledgers : ", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)(e.getCause());
            } else {
                throw new IOException("Failed to collect ledgers : ", e.getCause());
            }
        }
        return ledgers;
    }

    /**
     * Find leak ledgers phase 2: collect ledgers from uris.
     */
    private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths)
            throws IOException {
        final Set<Long> ledgers = new TreeSet<Long>();
        List<DistributedLogNamespace> namespaces =
                new ArrayList<DistributedLogNamespace>(uris.size());
        try {
            for (URI uri : uris) {
                namespaces.add(
                        DistributedLogNamespaceBuilder.newBuilder()
                                .conf(conf)
                                .uri(uri)
                                .build());
            }
            final CountDownLatch doneLatch = new CountDownLatch(uris.size());
            final AtomicInteger numFailures = new AtomicInteger(0);
            ExecutorService executor = Executors.newFixedThreadPool(uris.size());
            try {
                int i = 0;
                for (final DistributedLogNamespace namespace : namespaces) {
                    final URI uri = uris.get(i);
                    final List<String> aps = allocationPaths.get(i);
                    i++;
                    executor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                logger.info("Collecting ledgers from {} : {}", uri, aps);
                                collectLedgersFromAllocator(uri, namespace, aps, ledgers);
                                synchronized (ledgers) {
                                    logger.info("Collected {} ledgers from allocators for {} : {} ",
                                            new Object[]{ledgers.size(), uri, ledgers});
                                }
                                collectLedgersFromDL(uri, namespace, ledgers);
                            } catch (IOException e) {
                                numFailures.incrementAndGet();
                                logger.error("Error to collect ledgers from DL : ", e);
                            } catch (RuntimeException e) {
                                // an unchecked failure is still a failure of this uri
                                numFailures.incrementAndGet();
                                logger.error("Error to collect ledgers from DL : ", e);
                            } finally {
                                // always release the waiter, otherwise doneLatch.await() blocks forever
                                doneLatch.countDown();
                            }
                        }
                    });
                }
                try {
                    doneLatch.await();
                    if (numFailures.get() > 0) {
                        throw new IOException(numFailures.get() + " errors to collect ledgers from DL");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted on collecting ledgers from DL : ", e);
                    throw new DLInterruptedException("Interrupted on collecting ledgers from DL : ", e);
                }
            } finally {
                executor.shutdown();
            }
        } finally {
            for (DistributedLogNamespace namespace : namespaces) {
                namespace.close();
            }
        }
        return ledgers;
    }

    /**
     * Collect the ledgers recorded under the allocation pools of <i>uri</i>.
     *
     * <p>Each allocation path is listed to find its pools; every allocator znode of every pool
     * is then read and, if it carries data, parsed as a ledger id and added to <i>ledgers</i>.
     */
    private void collectLedgersFromAllocator(final URI uri,
                                             final DistributedLogNamespace namespace,
                                             final List<String> allocationPaths,
                                             final Set<Long> ledgers) throws IOException {
        final LinkedBlockingQueue<String> poolQueue =
                new LinkedBlockingQueue<String>();
        for (String allocationPath : allocationPaths) {
            String rootPath = uri.getPath() + "/" + allocationPath;
            try {
                List<String> pools = getZooKeeperClient(namespace).get().getChildren(rootPath, false);
                for (String pool : pools) {
                    poolQueue.add(rootPath + "/" + pool);
                }
            } catch (KeeperException e) {
                throw new ZKException("Failed to get list of pools from " + rootPath, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new DLInterruptedException("Interrupted on getting list of pools from " + rootPath, e);
            }
        }


        logger.info("Collecting ledgers from allocators for {} : {}", uri, poolQueue);

        executeAction(poolQueue, 10, new Action<String>() {
            @Override
            public void execute(String poolPath) throws IOException {
                try {
                    collectLedgersFromPool(poolPath);
                } catch (InterruptedException e) {
                    // restore the interrupt flag, as the other interrupt handlers in this class do
                    Thread.currentThread().interrupt();
                    throw new DLInterruptedException("Interrupted on collecting ledgers from allocation pool " + poolPath, e);
                } catch (KeeperException e) {
                    // pass the exception itself so the cause is preserved
                    throw new ZKException("Failed to collect ledgers from allocation pool " + poolPath, e);
                }
            }

            private void collectLedgersFromPool(String poolPath)
                    throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
                List<String> allocators = getZooKeeperClient(namespace).get()
                                        .getChildren(poolPath, false);
                for (String allocator : allocators) {
                    String allocatorPath = poolPath + "/" + allocator;
                    byte[] data = getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
                    if (null != data && data.length > 0) {
                        try {
                            long ledgerId = DLUtils.bytes2LogSegmentId(data);
                            synchronized (ledgers) {
                                ledgers.add(ledgerId);
                            }
                        } catch (NumberFormatException nfe) {
                            logger.warn("Invalid ledger found in allocator path {} : ", allocatorPath, nfe);
                        }
                    }
                }
            }
        });

        logger.info("Collected ledgers from allocators for {}.", uri);
    }

    /**
     * Enumerate the streams of <i>namespace</i> and add the log segment ids of each stream
     * to <i>ledgers</i>.
     */
    private void collectLedgersFromDL(final URI uri,
                                      final DistributedLogNamespace namespace,
                                      final Set<Long> ledgers) throws IOException {
        logger.info("Enumerating {} to collect streams.", uri);
        Iterator<String> streams = namespace.getLogs();
        final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
        while (streams.hasNext()) {
            streamQueue.add(streams.next());
        }

        // log the collected queue; the iterator is already exhausted at this point
        logger.info("Collected {} streams from uri {} : {}",
                    new Object[] { streamQueue.size(), uri, streamQueue });

        executeAction(streamQueue, 10, new Action<String>() {
            @Override
            public void execute(String stream) throws IOException {
                collectLedgersFromStream(namespace, stream, ledgers);
            }
        });
    }

    /**
     * Add the log segment ids of <i>stream</i> to <i>ledgers</i>.
     *
     * @return the log segment ids found for this stream
     */
    private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace,
                                                String stream,
                                                Set<Long> ledgers)
            throws IOException {
        DistributedLogManager dlm = namespace.openLog(stream);
        try {
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            List<Long> sLedgers = new ArrayList<Long>();
            for (LogSegmentMetadata segment : segments) {
                synchronized (ledgers) {
                    ledgers.add(segment.getLogSegmentId());
                }
                sLedgers.add(segment.getLogSegmentId());
            }
            return sLedgers;
        } finally {
            dlm.close();
        }
    }

    /**
     * Calculating stream space usage from given <i>uri</i>.
     *
     * @param uri dl uri
     * @return map from stream name to the total length of its log segments
     * @throws IOException on failures building the namespace or reading the streams
     */
    public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException {
        logger.info("Collecting stream space usage for {}.", uri);
        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(uri)
                .build();
        try {
            return calculateStreamSpaceUsage(uri, namespace);
        } finally {
            namespace.close();
        }
    }

    /** Calculate the space usage of every stream in <i>namespace</i>, keyed by stream name. */
    private Map<String, Long> calculateStreamSpaceUsage(
            final URI uri, final DistributedLogNamespace namespace)
        throws IOException {
        Iterator<String> streams = namespace.getLogs();
        final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
        while (streams.hasNext()) {
            streamQueue.add(streams.next());
        }

        final Map<String, Long> streamSpaceUsageMap =
                new ConcurrentSkipListMap<String, Long>();
        final AtomicInteger numStreamsCollected = new AtomicInteger(0);

        executeAction(streamQueue, 10, new Action<String>() {
            @Override
            public void execute(String stream) throws IOException {
                streamSpaceUsageMap.put(stream,
                        calculateStreamSpaceUsage(namespace, stream));
                if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
                    logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri);
                }
            }
        });

        return streamSpaceUsageMap;
    }

    /** Sum the ledger lengths of all log segments of <i>stream</i>. */
    private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace,
                                           final String stream) throws IOException {
        DistributedLogManager dlm = namespace.openLog(stream);
        long totalBytes = 0;
        try {
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            for (LogSegmentMetadata segment : segments) {
                try {
                    LedgerHandle lh = getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(),
                            BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
                    totalBytes += lh.getLength();
                    lh.close();
                } catch (BKException e) {
                    logger.error("Failed to open ledger {} : ", segment.getLogSegmentId(), e);
                    throw new IOException("Failed to open ledger " + segment.getLogSegmentId(), e);
                } catch (InterruptedException e) {
                    logger.warn("Interrupted on opening ledger {} : ", segment.getLogSegmentId(), e);
                    Thread.currentThread().interrupt();
                    throw new DLInterruptedException("Interrupted on opening ledger " + segment.getLogSegmentId(), e);
                }
            }
        } finally {
            dlm.close();
        }
        return totalBytes;
    }

    /**
     * Calculate the total length of all ledgers in the bookkeeper cluster used by <i>uri</i>.
     *
     * @param uri dl uri
     * @return total bytes over all ledgers
     * @throws IOException on failures resolving the cluster or processing the ledgers
     */
    public long calculateLedgerSpaceUsage(URI uri) throws IOException {
        List<URI> uris = Lists.newArrayList(uri);
        String zkServers = validateAndGetZKServers(uris);
        ZooKeeperClient zkc = buildZooKeeperClient(zkServers);
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
            logger.info("Resolved bookkeeper config : {}", bkdlConfig);

            BookKeeperClient bkc = buildBookKeeperClient(bkdlConfig);
            try {
                return calculateLedgerSpaceUsage(bkc, executorService);
            } finally {
                bkc.close();
            }
        } finally {
            // shut the executor down even if closing the zookeeper client fails
            try {
                zkc.close();
            } finally {
                executorService.shutdown();
            }
        }
    }

    /**
     * Open every ledger known to the ledger manager (without recovery) and accumulate
     * its length and entry count.
     *
     * @return total bytes over all ledgers that could be opened
     */
    private long calculateLedgerSpaceUsage(BookKeeperClient bkc,
                                           final ExecutorService executorService)
        throws IOException {
        final AtomicLong totalBytes = new AtomicLong(0);
        final AtomicLong totalEntries = new AtomicLong(0);
        final AtomicLong numLedgers = new AtomicLong(0);

        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());

        final SettableFuture<Void> doneFuture = SettableFuture.create();
        final BookKeeper bk = bkc.get();

        BookkeeperInternalCallbacks.Processor<Long> collector =
                new BookkeeperInternalCallbacks.Processor<Long>() {
            @Override
            public void process(final Long lid,
                                final AsyncCallback.VoidCallback cb) {
                numLedgers.incrementAndGet();
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        bk.asyncOpenLedgerNoRecovery(lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8),
                                new org.apache.bookkeeper.client.AsyncCallback.OpenCallback() {
                            @Override
                            public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
                                final int cbRc;
                                if (BKException.Code.OK == rc) {
                                    totalBytes.addAndGet(lh.getLength());
                                    totalEntries.addAndGet(lh.getLastAddConfirmed() + 1);
                                    cbRc = rc;
                                } else {
                                    // any open failure is reported with the failure code
                                    // handed to asyncProcessLedgers below
                                    cbRc = BKException.Code.ZKException;
                                }
                                executorService.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        // release the handle once its numbers are recorded, as
                                        // calculateStreamSpaceUsage does for each segment
                                        closeLedgerQuietly(lh);
                                        cb.processResult(cbRc, null, null);
                                    }
                                });
                            }
                        }, null);
                    }
                });
            }
        };
        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (BKException.Code.OK == rc) {
                    doneFuture.set(null);
                } else {
                    doneFuture.setException(BKException.create(rc));
                }
            }
        };
        lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK, BKException.Code.ZKException);
        try {
            doneFuture.get();
            logger.info("calculated {} ledgers\n\ttotal bytes = {}\n\ttotal entries = {}",
                    new Object[] { numLedgers.get(), totalBytes.get(), totalEntries.get() });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on calculating ledger space : ", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)(e.getCause());
            } else {
                throw new IOException("Failed to calculate ledger space : ", e.getCause());
            }
        }
        return totalBytes.get();
    }

    /** Close a ledger handle, logging (not propagating) failures; a null handle is ignored. */
    private static void closeLedgerQuietly(LedgerHandle lh) {
        if (null == lh) {
            return;
        }
        try {
            lh.close();
        } catch (BKException e) {
            logger.warn("Failed to close ledger {} : ", lh.getId(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on closing ledger {} : ", lh.getId(), e);
        }
    }

    public void close() {
        // no-op
    }

    /**
     * A unit of work applied to each item drained from a queue by {@link #executeAction}.
     */
    static interface Action<T> {
        void execute(T item) throws IOException ;
    }

    /**
     * Drain <i>queue</i> with <i>numThreads</i> worker threads, applying <i>action</i> to each item.
     *
     * <p>A worker stops at its first failing item. The caller is woken either by the first
     * failure or once all workers have drained the queue without failures.
     *
     * @param queue items to process
     * @param numThreads number of worker threads
     * @param action action applied to every item
     * @throws IOException if any item failed, or {@link DLInterruptedException} if the caller
     *         was interrupted while waiting
     */
    static <T> void executeAction(final LinkedBlockingQueue<T> queue,
                                  final int numThreads,
                                  final Action<T> action) throws IOException {
        final CountDownLatch failureLatch = new CountDownLatch(1);
        final CountDownLatch doneLatch = new CountDownLatch(queue.size());
        final AtomicInteger numFailures = new AtomicInteger(0);
        final AtomicInteger completedThreads = new AtomicInteger(0);

        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0 ; i < numThreads; i++) {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        while (true) {
                            T item = queue.poll();
                            if (null == item) {
                                break;
                            }
                            boolean succeeded = false;
                            try {
                                action.execute(item);
                                succeeded = true;
                            } catch (IOException ioe) {
                                logger.error("Failed to execute action on item '{}'", item, ioe);
                            } catch (RuntimeException re) {
                                // an unchecked failure must also wake the caller; otherwise
                                // neither latch is released and the caller waits forever
                                logger.error("Failed to execute action on item '{}'", item, re);
                            }
                            if (!succeeded) {
                                numFailures.incrementAndGet();
                                failureLatch.countDown();
                                break;
                            }
                            doneLatch.countDown();
                        }
                        // the last worker to finish cleanly releases the caller
                        if (numFailures.get() == 0 && completedThreads.incrementAndGet() == numThreads) {
                            failureLatch.countDown();
                        }
                    }
                });
            }
            try {
                failureLatch.await();
                if (numFailures.get() > 0) {
                    throw new IOException("Encountered " + numFailures.get() + " failures on executing action.");
                }
                doneLatch.await();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted on executing action", ie);
                throw new DLInterruptedException("Interrupted on executing action", ie);
            }
        } finally {
            executorService.shutdown();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java deleted file mode 100644 index 871997f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/DynamicQuorumConfigProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.bk; - -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; - -/** - * Provider returns quorum configs based on dynamic configuration. 
- */
public class DynamicQuorumConfigProvider implements QuorumConfigProvider {

    // Dynamic configuration consulted on every lookup.
    private final DynamicDistributedLogConfiguration dynamicConf;

    /**
     * Create a provider backed by the given dynamic configuration.
     *
     * @param conf dynamic configuration to read the quorum config from
     */
    public DynamicQuorumConfigProvider(DynamicDistributedLogConfiguration conf) {
        dynamicConf = conf;
    }

    /**
     * Return the quorum config currently reported by the dynamic configuration.
     */
    @Override
    public QuorumConfig getQuorumConfig() {
        final QuorumConfig current = dynamicConf.getQuorumConfig();
        return current;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java deleted file mode 100644 index 6c3f06e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/ImmutableQuorumConfigProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.bk; - -/** - * Provider that returns an immutable quorum config. 
- */
public class ImmutableQuorumConfigProvider implements QuorumConfigProvider {

    // The one quorum config this provider ever hands out.
    private final QuorumConfig fixedConfig;

    /**
     * Create a provider that always returns the given quorum config.
     *
     * @param quorumConfig quorum config to return from {@link #getQuorumConfig()}
     */
    public ImmutableQuorumConfigProvider(QuorumConfig quorumConfig) {
        fixedConfig = quorumConfig;
    }

    /**
     * Return the quorum config supplied at construction time.
     */
    @Override
    public QuorumConfig getQuorumConfig() {
        final QuorumConfig result = fixedConfig;
        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java deleted file mode 100644 index c14f374..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocator.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.bk; - -import com.twitter.distributedlog.util.Allocator; -import org.apache.bookkeeper.client.LedgerHandle; - -import java.io.IOException; - -
/**
 * An {@link Allocator} that hands out {@link LedgerHandle}s and can be started explicitly.
 */
public interface LedgerAllocator extends Allocator<LedgerHandle, Object> {

    /**
     * Start the ledger allocator. Implementations should not block in this call.
     *
     * @throws IOException if the allocator fails to start
     */
    void start() throws IOException;

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java deleted file mode 100644 index b76d03a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorDelegator.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.bk; - -import com.twitter.distributedlog.util.Transaction; -import com.twitter.distributedlog.util.Transaction.OpListener; -import com.twitter.util.Future; -import org.apache.bookkeeper.client.LedgerHandle; - -import java.io.IOException; - -
/**
 * Wrapper that forwards allocation requests to an underlying allocator. When the wrapper
 * owns the underlying allocator it starts it on construction and closes it on
 * {@link #asyncClose()}; otherwise the allocator's lifecycle is left untouched.
 */
public class LedgerAllocatorDelegator implements LedgerAllocator {

    // The allocator all requests are forwarded to.
    private final LedgerAllocator delegate;
    // Whether this wrapper is responsible for starting/closing the delegate.
    private final boolean ownsDelegate;

    /**
     * Create an allocator's delegator. An owned allocator is started right away.
     *
     * @param allocator
     *          the underlying allocator
     * @param ownAllocator
     *          whether to own the allocator
     * @throws IOException if starting an owned allocator fails
     */
    public LedgerAllocatorDelegator(LedgerAllocator allocator,
                                    boolean ownAllocator)
            throws IOException {
        delegate = allocator;
        ownsDelegate = ownAllocator;
        if (ownAllocator) {
            allocator.start();
        }
    }

    /**
     * Does nothing: an owned allocator was already started by the constructor.
     */
    @Override
    public void start() throws IOException {
        // no-op
    }

    /**
     * Deleting through the delegator is not supported.
     *
     * @return a future failed with {@link UnsupportedOperationException}
     */
    @Override
    public Future<Void> delete() {
        UnsupportedOperationException unsupported =
                new UnsupportedOperationException("Can't delete an allocator by delegator");
        return Future.exception(unsupported);
    }

    /** Forward the allocation request to the underlying allocator. */
    @Override
    public void allocate() throws IOException {
        delegate.allocate();
    }

    /** Forward the obtain request to the underlying allocator. */
    @Override
    public Future<LedgerHandle> tryObtain(Transaction<Object> txn,
                                          OpListener<LedgerHandle> listener) {
        return delegate.tryObtain(txn, listener);
    }

    /**
     * Close the underlying allocator if it is owned; otherwise complete immediately.
     */
    @Override
    public Future<Void> asyncClose() {
        if (!ownsDelegate) {
            return Future.value(null);
        }
        return delegate.asyncClose();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java deleted file mode 100644 index dd0894e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorPool.java +++ /dev/null @@ -1,458 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.bk; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -public class LedgerAllocatorPool implements LedgerAllocator { - - static final Logger logger = LoggerFactory.getLogger(LedgerAllocatorPool.class); - - private final DistributedLogConfiguration conf; - private final QuorumConfigProvider quorumConfigProvider; - private final BookKeeperClient bkc; - private final ZooKeeperClient zkc; - private final ScheduledExecutorService scheduledExecutorService; - private final String poolPath; - 
private final int corePoolSize; - - private final LinkedList<SimpleLedgerAllocator> pendingList = - new LinkedList<SimpleLedgerAllocator>(); - private final LinkedList<SimpleLedgerAllocator> allocatingList = - new LinkedList<SimpleLedgerAllocator>(); - private final Map<String, SimpleLedgerAllocator> rescueMap = - new HashMap<String, SimpleLedgerAllocator>(); - private final Map<LedgerHandle, SimpleLedgerAllocator> obtainMap = - new HashMap<LedgerHandle, SimpleLedgerAllocator>(); - private final Map<SimpleLedgerAllocator, LedgerHandle> reverseObtainMap = - new HashMap<SimpleLedgerAllocator, LedgerHandle>(); - - public LedgerAllocatorPool(String poolPath, int corePoolSize, - DistributedLogConfiguration conf, - ZooKeeperClient zkc, - BookKeeperClient bkc, - ScheduledExecutorService scheduledExecutorService) throws IOException { - this.poolPath = poolPath; - this.corePoolSize = corePoolSize; - this.conf = conf; - this.quorumConfigProvider = - new ImmutableQuorumConfigProvider(conf.getQuorumConfig()); - this.zkc = zkc; - this.bkc = bkc; - this.scheduledExecutorService = scheduledExecutorService; - initializePool(); - } - - @Override - public void start() throws IOException { - for (LedgerAllocator allocator : pendingList) { - // issue allocating requests during initialize - allocator.allocate(); - } - } - - @VisibleForTesting - synchronized int pendingListSize() { - return pendingList.size(); - } - - @VisibleForTesting - synchronized int allocatingListSize() { - return allocatingList.size(); - } - - @VisibleForTesting - public synchronized int obtainMapSize() { - return obtainMap.size(); - } - - @VisibleForTesting - synchronized int rescueSize() { - return rescueMap.size(); - } - - @VisibleForTesting - synchronized SimpleLedgerAllocator getLedgerAllocator(LedgerHandle lh) { - return obtainMap.get(lh); - } - - private void initializePool() throws IOException { - try { - List<String> allocators; - try { - allocators = zkc.get().getChildren(poolPath, false); - } catch 
(KeeperException.NoNodeException e) { - logger.info("Allocator Pool {} doesn't exist. Creating it.", poolPath); - ZkUtils.createFullPathOptimistic(zkc.get(), poolPath, new byte[0], zkc.getDefaultACL(), - CreateMode.PERSISTENT); - allocators = zkc.get().getChildren(poolPath, false); - } - if (null == allocators) { - allocators = new ArrayList<String>(); - } - if (allocators.size() < corePoolSize) { - createAllocators(corePoolSize - allocators.size()); - allocators = zkc.get().getChildren(poolPath, false); - } - initializeAllocators(allocators); - } catch (InterruptedException ie) { - throw new DLInterruptedException("Interrupted when ensuring " + poolPath + " created : ", ie); - } catch (KeeperException ke) { - throw new IOException("Encountered zookeeper exception when initializing pool " + poolPath + " : ", ke); - } - } - - private void createAllocators(int numAllocators) throws InterruptedException, IOException { - final AtomicInteger numPendings = new AtomicInteger(numAllocators); - final AtomicInteger numFailures = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(1); - AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (KeeperException.Code.OK.intValue() != rc) { - numFailures.incrementAndGet(); - latch.countDown(); - return; - } - if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) { - latch.countDown(); - } - } - }; - for (int i = 0; i < numAllocators; i++) { - zkc.get().create(poolPath + "/A", new byte[0], - zkc.getDefaultACL(), - CreateMode.PERSISTENT_SEQUENTIAL, - createCallback, null); - } - latch.await(); - if (numFailures.get() > 0) { - throw new IOException("Failed to create " + numAllocators + " allocators."); - } - } - - /** - * Initialize simple allocators with given list of allocator names <i>allocators</i>. - * It initializes a simple allocator with its simple allocator path. 
- */ - private void initializeAllocators(List<String> allocators) throws IOException, InterruptedException { - final AtomicInteger numPendings = new AtomicInteger(allocators.size()); - final AtomicInteger numFailures = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(numPendings.get() > 0 ? 1 : 0); - AsyncCallback.DataCallback dataCallback = new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (KeeperException.Code.OK.intValue() != rc) { - numFailures.incrementAndGet(); - latch.countDown(); - return; - } - Versioned<byte[]> allocatorData = - new Versioned<byte[]>(data, new ZkVersion(stat.getVersion())); - SimpleLedgerAllocator allocator = - new SimpleLedgerAllocator(path, allocatorData, quorumConfigProvider, zkc, bkc); - allocator.start(); - pendingList.add(allocator); - if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) { - latch.countDown(); - } - } - }; - for (String name : allocators) { - String path = poolPath + "/" + name; - zkc.get().getData(path, false, dataCallback, null); - } - latch.await(); - if (numFailures.get() > 0) { - throw new IOException("Failed to initialize allocators : " + allocators); - } - } - - private void scheduleAllocatorRescue(final SimpleLedgerAllocator ledgerAllocator) { - try { - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - try { - rescueAllocator(ledgerAllocator); - } catch (DLInterruptedException dle) { - Thread.currentThread().interrupt(); - } - } - }, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ree) { - logger.warn("Failed to schedule rescuing ledger allocator {} : ", ledgerAllocator.allocatePath, ree); - } - } - - /** - * Rescue a ledger allocator from an ERROR state - * @param ledgerAllocator - * ledger allocator to rescue - */ - private void rescueAllocator(final SimpleLedgerAllocator ledgerAllocator) throws 
DLInterruptedException { - SimpleLedgerAllocator oldAllocator; - synchronized (this) { - oldAllocator = rescueMap.put(ledgerAllocator.allocatePath, ledgerAllocator); - } - if (oldAllocator != null) { - logger.info("ledger allocator {} is being rescued.", ledgerAllocator.allocatePath); - return; - } - try { - zkc.get().getData(ledgerAllocator.allocatePath, false, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - boolean retry = false; - SimpleLedgerAllocator newAllocator = null; - if (KeeperException.Code.OK.intValue() == rc) { - Versioned<byte[]> allocatorData = - new Versioned<byte[]>(data, new ZkVersion(stat.getVersion())); - logger.info("Rescuing ledger allocator {}.", path); - newAllocator = new SimpleLedgerAllocator(path, allocatorData, quorumConfigProvider, zkc, bkc); - newAllocator.start(); - logger.info("Rescued ledger allocator {}.", path); - } else if (KeeperException.Code.NONODE.intValue() == rc) { - logger.info("Ledger allocator {} doesn't exist, skip rescuing it.", path); - } else { - retry = true; - } - synchronized (LedgerAllocatorPool.this) { - rescueMap.remove(ledgerAllocator.allocatePath); - if (null != newAllocator) { - pendingList.addLast(newAllocator); - } - } - if (retry) { - scheduleAllocatorRescue(ledgerAllocator); - } - } - }, null); - } catch (InterruptedException ie) { - logger.warn("Interrupted on rescuing ledger allocator {} : ", ledgerAllocator.allocatePath, ie); - synchronized (LedgerAllocatorPool.this) { - rescueMap.remove(ledgerAllocator.allocatePath); - } - throw new DLInterruptedException("Interrupted on rescuing ledger allocator " + ledgerAllocator.allocatePath, ie); - } catch (IOException ioe) { - logger.warn("Failed to rescue ledger allocator {}, retry rescuing it later : ", ledgerAllocator.allocatePath, ioe); - synchronized (LedgerAllocatorPool.this) { - rescueMap.remove(ledgerAllocator.allocatePath); - } - 
scheduleAllocatorRescue(ledgerAllocator); - } - } - - @Override - public void allocate() throws IOException { - SimpleLedgerAllocator allocator; - synchronized (this) { - if (pendingList.isEmpty()) { - // if no ledger allocator available, we should fail it immediately, which the request will be redirected to other - // proxies - throw new IOException("No ledger allocator available under " + poolPath + "."); - } else { - allocator = pendingList.removeFirst(); - } - } - boolean success = false; - try { - allocator.allocate(); - synchronized (this) { - allocatingList.addLast(allocator); - } - success = true; - } finally { - if (!success) { - rescueAllocator(allocator); - } - } - } - - @Override - public Future<LedgerHandle> tryObtain(final Transaction<Object> txn, - final Transaction.OpListener<LedgerHandle> listener) { - final SimpleLedgerAllocator allocator; - synchronized (this) { - if (allocatingList.isEmpty()) { - return Future.exception(new IOException("No ledger allocator available under " + poolPath + ".")); - } else { - allocator = allocatingList.removeFirst(); - } - } - - final Promise<LedgerHandle> tryObtainPromise = new Promise<LedgerHandle>(); - final FutureEventListener<LedgerHandle> tryObtainListener = new FutureEventListener<LedgerHandle>() { - @Override - public void onSuccess(LedgerHandle lh) { - synchronized (LedgerAllocatorPool.this) { - obtainMap.put(lh, allocator); - reverseObtainMap.put(allocator, lh); - tryObtainPromise.setValue(lh); - } - } - - @Override - public void onFailure(Throwable cause) { - try { - rescueAllocator(allocator); - } catch (IOException ioe) { - logger.info("Failed to rescue allocator {}", allocator.allocatePath, ioe); - } - tryObtainPromise.setException(cause); - } - }; - - allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() { - @Override - public void onCommit(LedgerHandle lh) { - confirmObtain(allocator); - listener.onCommit(lh); - } - - @Override - public void onAbort(Throwable t) { - 
abortObtain(allocator); - listener.onAbort(t); - } - }).addEventListener(tryObtainListener); - return tryObtainPromise; - } - - void confirmObtain(SimpleLedgerAllocator allocator) { - synchronized (this) { - LedgerHandle lh = reverseObtainMap.remove(allocator); - if (null != lh) { - obtainMap.remove(lh); - } - } - synchronized (this) { - pendingList.addLast(allocator); - } - } - - void abortObtain(SimpleLedgerAllocator allocator) { - synchronized (this) { - LedgerHandle lh = reverseObtainMap.remove(allocator); - if (null != lh) { - obtainMap.remove(lh); - } - } - // if a ledger allocator is aborted, it is better to rescue it. since the ledger allocator might - // already encounter BadVersion exception. - try { - rescueAllocator(allocator); - } catch (DLInterruptedException e) { - logger.warn("Interrupted on rescuing ledger allocator pool {} : ", poolPath, e); - Thread.currentThread().interrupt(); - } - } - - @Override - public Future<Void> asyncClose() { - List<LedgerAllocator> allocatorsToClose; - synchronized (this) { - allocatorsToClose = Lists.newArrayListWithExpectedSize( - pendingList.size() + allocatingList.size() + obtainMap.size()); - for (LedgerAllocator allocator : pendingList) { - allocatorsToClose.add(allocator); - } - for (LedgerAllocator allocator : allocatingList) { - allocatorsToClose.add(allocator); - } - for (LedgerAllocator allocator : obtainMap.values()) { - allocatorsToClose.add(allocator); - } - } - return FutureUtils.processList(allocatorsToClose, new Function<LedgerAllocator, Future<Void>>() { - @Override - public Future<Void> apply(LedgerAllocator allocator) { - return allocator.asyncClose(); - } - }, scheduledExecutorService).map(new AbstractFunction1<List<Void>, Void>() { - @Override - public Void apply(List<Void> values) { - return null; - } - }); - } - - @Override - public Future<Void> delete() { - List<LedgerAllocator> allocatorsToDelete; - synchronized (this) { - allocatorsToDelete = Lists.newArrayListWithExpectedSize( - 
pendingList.size() + allocatingList.size() + obtainMap.size()); - for (LedgerAllocator allocator : pendingList) { - allocatorsToDelete.add(allocator); - } - for (LedgerAllocator allocator : allocatingList) { - allocatorsToDelete.add(allocator); - } - for (LedgerAllocator allocator : obtainMap.values()) { - allocatorsToDelete.add(allocator); - } - } - return FutureUtils.processList(allocatorsToDelete, new Function<LedgerAllocator, Future<Void>>() { - @Override - public Future<Void> apply(LedgerAllocator allocator) { - return allocator.delete(); - } - }, scheduledExecutorService).flatMap(new AbstractFunction1<List<Void>, Future<Void>>() { - @Override - public Future<Void> apply(List<Void> values) { - return Utils.zkDelete(zkc, poolPath, new ZkVersion(-1)); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java deleted file mode 100644 index 0db6d74..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/LedgerAllocatorUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.bk; - -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; - -import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; - -public class LedgerAllocatorUtils { - - /** - * Create ledger allocator pool. - * - * @param poolPath - * ledger allocator pool path. - * @param corePoolSize - * ledger allocator pool core size. - * @param conf - * distributedlog configuration. - * @param zkc - * zookeeper client - * @param bkc - * bookkeeper client - * @return ledger allocator - * @throws IOException - */ - public static LedgerAllocator createLedgerAllocatorPool( - String poolPath, - int corePoolSize, - DistributedLogConfiguration conf, - ZooKeeperClient zkc, - BookKeeperClient bkc, - ScheduledExecutorService scheduledExecutorService) throws IOException { - return new LedgerAllocatorPool(poolPath, corePoolSize, conf, zkc, bkc, scheduledExecutorService); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java deleted file mode 100644 index a9cc16c..0000000 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfig.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.bk; - -import com.google.common.base.Objects; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Configuration for quorums - */ -public class QuorumConfig { - - private static final Logger logger = LoggerFactory.getLogger(QuorumConfig.class); - - private final int ensembleSize; - private final int writeQuorumSize; - private final int ackQuorumSize; - - public QuorumConfig(int ensembleSize, - int writeQuorumSize, - int ackQuorumSize) { - this.ensembleSize = ensembleSize; - if (this.ensembleSize < writeQuorumSize) { - this.writeQuorumSize = this.ensembleSize; - logger.warn("Setting write quorum size {} greater than ensemble size {}", - writeQuorumSize, this.ensembleSize); - } else { - this.writeQuorumSize = writeQuorumSize; - } - if (this.writeQuorumSize < ackQuorumSize) { - this.ackQuorumSize = this.writeQuorumSize; - logger.warn("Setting write ack quorum size {} greater than write quorum size {}", - ackQuorumSize, this.writeQuorumSize); - } else { - this.ackQuorumSize = ackQuorumSize; - } - } - - 
public int getEnsembleSize() { - return ensembleSize; - } - - public int getWriteQuorumSize() { - return writeQuorumSize; - } - - public int getAckQuorumSize() { - return ackQuorumSize; - } - - @Override - public int hashCode() { - return Objects.hashCode(ensembleSize, writeQuorumSize, ackQuorumSize); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof QuorumConfig)) { - return false; - } - QuorumConfig other = (QuorumConfig) obj; - return ensembleSize == other.ensembleSize - && writeQuorumSize == other.writeQuorumSize - && ackQuorumSize == other.ackQuorumSize; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("QuorumConfig[ensemble=") - .append(ensembleSize).append(", write quorum=") - .append(writeQuorumSize).append(", ack quorum=") - .append(ackQuorumSize).append("]"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java deleted file mode 100644 index 2f65427..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/QuorumConfigProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.bk; - -/** - * Provider to provide quorum config - */ -public interface QuorumConfigProvider { - - /** - * Get the quorum config for a given log stream. - * - * @return quorum config - */ - QuorumConfig getQuorumConfig(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java deleted file mode 100644 index ab5976e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/SimpleLedgerAllocator.java +++ /dev/null @@ -1,536 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.bk; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.distributedlog.util.Transaction.OpListener; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.zk.ZKTransaction; -import com.twitter.distributedlog.zk.ZKVersionedSetOp; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -/** - * Allocator to allocate ledgers. 
- */ -public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListener<LedgerHandle>, OpListener<Version> { - - static final Logger LOG = LoggerFactory.getLogger(SimpleLedgerAllocator.class); - - static enum Phase { - ALLOCATING, ALLOCATED, HANDING_OVER, HANDED_OVER, ERROR - } - - static class AllocationException extends IOException { - - private static final long serialVersionUID = -1111397872059426882L; - - private final Phase phase; - - public AllocationException(Phase phase, String msg) { - super(msg); - this.phase = phase; - } - - public Phase getPhase() { - return this.phase; - } - - } - - static class ConcurrentObtainException extends AllocationException { - - private static final long serialVersionUID = -8532471098537176913L; - - public ConcurrentObtainException(Phase phase, String msg) { - super(phase, msg); - } - } - - // zookeeper client - final ZooKeeperClient zkc; - // bookkeeper client - final BookKeeperClient bkc; - // znode path - final String allocatePath; - // allocation phase - Phase phase = Phase.HANDED_OVER; - // version - ZkVersion version = new ZkVersion(-1); - // outstanding allocation - Promise<LedgerHandle> allocatePromise; - // outstanding tryObtain transaction - Transaction<Object> tryObtainTxn = null; - OpListener<LedgerHandle> tryObtainListener = null; - // ledger id left from previous allocation - Long ledgerIdLeftFromPrevAllocation = null; - // Allocated Ledger - LedgerHandle allocatedLh = null; - - Future<Void> closeFuture = null; - final LinkedList<Future<Void>> ledgerDeletions = - new LinkedList<Future<Void>>(); - - // Ledger configuration - private final QuorumConfigProvider quorumConfigProvider; - - static Future<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath, - final ZooKeeperClient zkc) { - return Utils.zkGetData(zkc, allocatePath, false) - .flatMap(new AbstractFunction1<Versioned<byte[]>, Future<Versioned<byte[]>>>() { - @Override - public Future<Versioned<byte[]>> 
apply(Versioned<byte[]> result) { - if (null != result && null != result.getVersion() && null != result.getValue()) { - return Future.value(result); - } - return createAllocationData(allocatePath, zkc); - } - }); - } - - private static Future<Versioned<byte[]>> createAllocationData(final String allocatePath, - final ZooKeeperClient zkc) { - try { - final Promise<Versioned<byte[]>> promise = new Promise<Versioned<byte[]>>(); - zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES, - zkc.getDefaultACL(), CreateMode.PERSISTENT, - new org.apache.zookeeper.AsyncCallback.Create2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, String name, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - promise.setValue(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES, - new ZkVersion(stat.getVersion()))); - } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - Utils.zkGetData(zkc, allocatePath, false).proxyTo(promise); - } else { - promise.setException(FutureUtils.zkException( - KeeperException.create(KeeperException.Code.get(rc)), allocatePath)); - } - } - }, null); - return promise; - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, allocatePath)); - } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, allocatePath)); - } - } - - public static Future<SimpleLedgerAllocator> of(final String allocatePath, - final Versioned<byte[]> allocationData, - final QuorumConfigProvider quorumConfigProvider, - final ZooKeeperClient zkc, - final BookKeeperClient bkc) { - if (null != allocationData && null != allocationData.getValue() - && null != allocationData.getVersion()) { - return Future.value(new SimpleLedgerAllocator(allocatePath, allocationData, - quorumConfigProvider, zkc, bkc)); - } - return getAndCreateAllocationData(allocatePath, zkc) - .map(new AbstractFunction1<Versioned<byte[]>, SimpleLedgerAllocator>() { 
- @Override - public SimpleLedgerAllocator apply(Versioned<byte[]> allocationData) { - return new SimpleLedgerAllocator(allocatePath, allocationData, - quorumConfigProvider, zkc, bkc); - } - }); - } - - /** - * Construct a ledger allocator. - * - * @param allocatePath - * znode path to store the allocated ledger. - * @param allocationData - * allocation data. - * @param quorumConfigProvider - * Quorum configuration provider. - * @param zkc - * zookeeper client. - * @param bkc - * bookkeeper client. - */ - public SimpleLedgerAllocator(String allocatePath, - Versioned<byte[]> allocationData, - QuorumConfigProvider quorumConfigProvider, - ZooKeeperClient zkc, - BookKeeperClient bkc) { - this.zkc = zkc; - this.bkc = bkc; - this.allocatePath = allocatePath; - this.quorumConfigProvider = quorumConfigProvider; - initialize(allocationData); - } - - /** - * Initialize the allocator. - * - * @param allocationData - * Allocation Data. - */ - private void initialize(Versioned<byte[]> allocationData) { - setVersion((ZkVersion) allocationData.getVersion()); - byte[] data = allocationData.getValue(); - if (null != data && data.length > 0) { - // delete the allocated ledger since this is left by last allocation. 
- try { - ledgerIdLeftFromPrevAllocation = DLUtils.bytes2LogSegmentId(data); - } catch (NumberFormatException nfe) { - LOG.warn("Invalid data found in allocator path {} : ", allocatePath, nfe); - } - } - - } - - private synchronized void deleteLedgerLeftFromPreviousAllocationIfNecessary() { - if (null != ledgerIdLeftFromPrevAllocation) { - LOG.info("Deleting allocated-but-unused ledger left from previous allocation {}.", ledgerIdLeftFromPrevAllocation); - deleteLedger(ledgerIdLeftFromPrevAllocation); - ledgerIdLeftFromPrevAllocation = null; - } - } - - @Override - public synchronized void allocate() throws IOException { - if (Phase.ERROR == phase) { - throw new AllocationException(Phase.ERROR, "Error on ledger allocator for " + allocatePath); - } - if (Phase.HANDED_OVER == phase) { - // issue an allocate request when ledger is already handed over. - allocateLedger(); - } - } - - @Override - public synchronized Future<LedgerHandle> tryObtain(final Transaction<Object> txn, - final OpListener<LedgerHandle> listener) { - if (Phase.ERROR == phase) { - return Future.exception(new AllocationException(Phase.ERROR, - "Error on allocating ledger under " + allocatePath)); - } - if (Phase.HANDING_OVER == phase || Phase.HANDED_OVER == phase || null != tryObtainTxn) { - return Future.exception(new ConcurrentObtainException(phase, - "Ledger handle is handling over to another thread : " + phase)); - } - tryObtainTxn = txn; - tryObtainListener = listener; - if (null != allocatedLh) { - completeAllocation(allocatedLh); - } - return allocatePromise; - } - - @Override - public void onCommit(Version r) { - confirmObtain((ZkVersion) r); - } - - private void confirmObtain(ZkVersion zkVersion) { - boolean shouldAllocate = false; - OpListener<LedgerHandle> listenerToNotify = null; - LedgerHandle lhToNotify = null; - synchronized (this) { - if (Phase.HANDING_OVER == phase) { - setPhase(Phase.HANDED_OVER); - setVersion(zkVersion); - listenerToNotify = tryObtainListener; - lhToNotify = 
allocatedLh; - // reset the state - allocatedLh = null; - allocatePromise = null; - tryObtainTxn = null; - tryObtainListener = null; - // mark flag to issue an allocation request - shouldAllocate = true; - } - } - if (null != listenerToNotify && null != lhToNotify) { - // notify the listener - listenerToNotify.onCommit(lhToNotify); - } - if (shouldAllocate) { - // issue an allocation request - allocateLedger(); - } - } - - @Override - public void onAbort(Throwable t) { - OpListener<LedgerHandle> listenerToNotify; - synchronized (this) { - listenerToNotify = tryObtainListener; - if (t instanceof KeeperException && - ((KeeperException) t).code() == KeeperException.Code.BADVERSION) { - LOG.info("Set ledger allocator {} to ERROR state after hit bad version : version = {}", - allocatePath, getVersion()); - setPhase(Phase.ERROR); - } else { - if (Phase.HANDING_OVER == phase) { - setPhase(Phase.ALLOCATED); - tryObtainTxn = null; - tryObtainListener = null; - } - } - } - if (null != listenerToNotify) { - listenerToNotify.onAbort(t); - } - } - - private synchronized void setPhase(Phase phase) { - this.phase = phase; - LOG.info("Ledger allocator {} moved to phase {} : version = {}.", - new Object[] { allocatePath, phase, version }); - } - - private synchronized void allocateLedger() { - // make sure previous allocation is already handed over. 
- if (Phase.HANDED_OVER != phase) { - LOG.error("Trying allocate ledger for {} in phase {}, giving up.", allocatePath, phase); - return; - } - setPhase(Phase.ALLOCATING); - allocatePromise = new Promise<LedgerHandle>(); - QuorumConfig quorumConfig = quorumConfigProvider.getQuorumConfig(); - bkc.createLedger( - quorumConfig.getEnsembleSize(), - quorumConfig.getWriteQuorumSize(), - quorumConfig.getAckQuorumSize() - ).addEventListener(this); - } - - private synchronized void completeAllocation(LedgerHandle lh) { - allocatedLh = lh; - if (null == tryObtainTxn) { - return; - } - org.apache.zookeeper.Op zkSetDataOp = org.apache.zookeeper.Op.setData( - allocatePath, DistributedLogConstants.EMPTY_BYTES, version.getZnodeVersion()); - ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this); - tryObtainTxn.addOp(commitOp); - setPhase(Phase.HANDING_OVER); - FutureUtils.setValue(allocatePromise, lh); - } - - private synchronized void failAllocation(Throwable cause) { - FutureUtils.setException(allocatePromise, cause); - } - - @Override - public void onSuccess(LedgerHandle lh) { - // a ledger is created, update the ledger to allocation path before handling it over for usage. 
- markAsAllocated(lh); - } - - @Override - public void onFailure(Throwable cause) { - LOG.error("Error creating ledger for allocating {} : ", allocatePath, cause); - setPhase(Phase.ERROR); - failAllocation(cause); - } - - private synchronized ZkVersion getVersion() { - return version; - } - - private synchronized void setVersion(ZkVersion newVersion) { - Version.Occurred occurred = newVersion.compare(version); - if (occurred == Version.Occurred.AFTER) { - LOG.info("Ledger allocator for {} moved version from {} to {}.", - new Object[] { allocatePath, version, newVersion }); - version = newVersion; - } else { - LOG.warn("Ledger allocator for {} received an old version {}, current version is {}.", - new Object[] { allocatePath, newVersion , version }); - } - } - - private void markAsAllocated(final LedgerHandle lh) { - byte[] data = DLUtils.logSegmentId2Bytes(lh.getId()); - Utils.zkSetData(zkc, allocatePath, data, getVersion()) - .addEventListener(new FutureEventListener<ZkVersion>() { - @Override - public void onSuccess(ZkVersion version) { - // we only issue deleting ledger left from previous allocation when we could allocate first ledger - // as zookeeper version could prevent us doing stupid things. 
- deleteLedgerLeftFromPreviousAllocationIfNecessary(); - setVersion(version); - setPhase(Phase.ALLOCATED); - // complete the allocation after it is marked as allocated - completeAllocation(lh); - } - - @Override - public void onFailure(Throwable cause) { - setPhase(Phase.ERROR); - deleteLedger(lh.getId()); - LOG.error("Fail mark ledger {} as allocated under {} : ", - new Object[] { lh.getId(), allocatePath, cause }); - // fail the allocation since failed to mark it as allocated - failAllocation(cause); - } - }); - } - - void deleteLedger(final long ledgerId) { - final Future<Void> deleteFuture = bkc.deleteLedger(ledgerId, true); - synchronized (ledgerDeletions) { - ledgerDeletions.add(deleteFuture); - } - deleteFuture.onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ", - new Object[] { ledgerId, allocatePath, cause }); - if (!isClosing()) { - deleteLedger(ledgerId); - } - return BoxedUnit.UNIT; - } - }).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - synchronized (ledgerDeletions) { - ledgerDeletions.remove(deleteFuture); - } - return BoxedUnit.UNIT; - } - }); - } - - private synchronized boolean isClosing() { - return closeFuture != null; - } - - private Future<Void> closeInternal(boolean cleanup) { - Promise<Void> closePromise; - synchronized (this) { - if (null != closeFuture) { - return closeFuture; - } - closePromise = new Promise<Void>(); - closeFuture = closePromise; - } - if (!cleanup) { - LOG.info("Abort ledger allocator without cleaning up on {}.", allocatePath); - FutureUtils.setValue(closePromise, null); - return closePromise; - } - cleanupAndClose(closePromise); - return closePromise; - } - - private void cleanupAndClose(final Promise<Void> closePromise) { - LOG.info("Closing ledger allocator on {}.", allocatePath); - final ZKTransaction txn = new ZKTransaction(zkc); - // try 
obtain ledger handle - tryObtain(txn, new OpListener<LedgerHandle>() { - @Override - public void onCommit(LedgerHandle r) { - // no-op - complete(); - } - - @Override - public void onAbort(Throwable t) { - // no-op - complete(); - } - - private void complete() { - FutureUtils.setValue(closePromise, null); - LOG.info("Closed ledger allocator on {}.", allocatePath); - } - }).addEventListener(new FutureEventListener<LedgerHandle>() { - @Override - public void onSuccess(LedgerHandle lh) { - // try obtain succeed - // if we could obtain the ledger handle, we have the responsibility to close it - deleteLedger(lh.getId()); - // wait for deletion to be completed - List<Future<Void>> outstandingDeletions; - synchronized (ledgerDeletions) { - outstandingDeletions = Lists.newArrayList(ledgerDeletions); - } - Future.collect(outstandingDeletions).addEventListener(new FutureEventListener<List<Void>>() { - @Override - public void onSuccess(List<Void> values) { - txn.execute(); - } - - @Override - public void onFailure(Throwable cause) { - LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause); - FutureUtils.setValue(closePromise, null); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause); - FutureUtils.setValue(closePromise, null); - } - }); - - } - - @Override - public void start() { - // nop - } - - @Override - public Future<Void> asyncClose() { - return closeInternal(false); - } - - @Override - public Future<Void> delete() { - return closeInternal(true).flatMap(new AbstractFunction1<Void, Future<Void>>() { - @Override - public Future<Void> apply(Void value) { - return Utils.zkDelete(zkc, allocatePath, getVersion()); - } - }); - } - -}