DL-163: clean up direct zookeeper and bookkeeper usage and use metadata/data store abstraction
- introduce NamespaceDriver class to manage metadata/data store abstractions - clean up direct zookeeper and bookkeeper usage and use metadata/data store abstraction - separate MetadataAccessor from DistributedLogManager - remove deprecated classes Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/cfc049cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/cfc049cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/cfc049cd Branch: refs/heads/master Commit: cfc049cd045b73ca96fb328877b0acf4e75c95ab Parents: d871e65 Author: Sijie Guo <sij...@twitter.com> Authored: Wed Dec 28 17:47:12 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:13:30 2016 -0800 ---------------------------------------------------------------------- .../benchmark/stream/LedgerReadBenchmark.java | 2 +- .../distributedlog/BKAsyncLogReader.java | 2 - .../distributedlog/BKAsyncLogWriter.java | 1 - .../distributedlog/BKDistributedLogManager.java | 454 ++-------- .../BKDistributedLogNamespace.java | 846 ++----------------- .../distributedlog/BKLogWriteHandler.java | 58 +- .../distributedlog/BookKeeperClient.java | 22 +- .../DistributedLogConfiguration.java | 9 +- .../distributedlog/DistributedLogManager.java | 28 +- .../DistributedLogManagerFactory.java | 202 ----- .../distributedlog/LocalDLMEmulator.java | 2 +- .../distributedlog/ReadAheadEntryReader.java | 18 +- .../distributedlog/ZKMetadataAccessor.java | 259 ------ .../distributedlog/ZooKeeperClientBuilder.java | 4 +- .../distributedlog/acl/ZKAccessControl.java | 229 ----- .../acl/ZKAccessControlManager.java | 373 -------- .../admin/DistributedLogAdmin.java | 85 +- .../distributedlog/auditor/DLAuditor.java | 103 +-- .../twitter/distributedlog/impl/BKDLUtils.java | 100 --- .../distributedlog/impl/BKNamespaceDriver.java | 631 ++++++++++++++ .../distributedlog/impl/ZKLogMetadataStore.java | 2 +- .../distributedlog/impl/ZKMetadataAccessor.java | 264 ++++++ .../distributedlog/impl/ZKNamespaceWatcher.java | 2 +- .../impl/acl/ZKAccessControl.java | 232 +++++ .../impl/acl/ZKAccessControlManager.java | 374 ++++++++ .../impl/logsegment/BKLogSegmentAllocator.java | 85 ++ .../logsegment/BKLogSegmentEntryReader.java | 27 +- .../impl/logsegment/BKLogSegmentEntryStore.java | 63 +- .../impl/metadata/BKDLConfig.java | 400 +++++++++ .../impl/metadata/ZkMetadataResolver.java | 72 ++ .../subscription/ZKSubscriptionStateStore.java | 121 +++ .../impl/subscription/ZKSubscriptionsStore.java | 164 ++++ .../logsegment/LogSegmentEntryReader.java | 25 + .../logsegment/LogSegmentEntryStore.java | 16 +- .../distributedlog/metadata/BKDLConfig.java | 399 --------- .../distributedlog/metadata/DLMetadata.java | 1 + .../metadata/ZkMetadataResolver.java | 70 -- .../namespace/DistributedLogNamespace.java | 7 + .../DistributedLogNamespaceBuilder.java | 143 +++- .../namespace/NamespaceDriver.java | 138 +++ .../namespace/NamespaceDriverManager.java | 180 ++++ .../subscription/ZKSubscriptionStateStore.java | 120 --- .../subscription/ZKSubscriptionsStore.java | 165 ---- .../tools/DistributedLogTool.java | 177 ++-- .../twitter/distributedlog/util/DLUtils.java | 124 ++- .../distributedlog/util/FutureUtils.java | 6 +- .../com/twitter/distributedlog/DLMTestUtil.java | 120 +-- .../distributedlog/TestAsyncReaderLock.java | 29 +- .../distributedlog/TestAsyncReaderWriter.java | 14 +- .../TestBKDistributedLogManager.java | 99 +-- .../TestBKDistributedLogNamespace.java | 68 +- .../distributedlog/TestBKLogReadHandler.java | 23 +- .../distributedlog/TestBKLogSegmentWriter.java | 6 +- .../distributedlog/TestBKLogWriteHandler.java | 4 +- .../distributedlog/TestDistributedLogBase.java | 117 ++- .../distributedlog/TestFailureAndRecovery.java | 257 ------ .../distributedlog/TestInterleavedReaders.java | 70 -- .../distributedlog/TestLogSegmentsZK.java | 24 +- .../distributedlog/TestNonBlockingReads.java | 3 + .../TestReadAheadEntryReader.java | 52 +- .../distributedlog/TestRollLogSegments.java | 5 +- .../distributedlog/acl/TestZKAccessControl.java | 5 +- .../acl/TestZKAccessControlManager.java | 2 + .../twitter/distributedlog/admin/TestDLCK.java | 34 +- .../admin/TestDistributedLogAdmin.java | 32 +- .../impl/TestZKLogSegmentMetadataStore.java | 2 +- .../impl/TestZKNamespaceWatcher.java | 2 +- .../TestFederatedZKLogMetadataStore.java | 3 +- .../logsegment/TestBKLogSegmentEntryReader.java | 22 +- .../metadata/TestZKLogStreamMetadataStore.java | 14 +- .../impl/metadata/TestZkMetadataResolver.java | 203 +++++ .../distributedlog/metadata/TestDLMetadata.java | 1 + .../metadata/TestZkMetadataResolver.java | 200 ----- .../service/DistributedLogCluster.java | 2 +- .../service/balancer/BalancerTool.java | 6 +- .../placement/ZKPlacementStateManager.java | 11 +- .../service/TestDistributedLogServerBase.java | 4 +- .../mapreduce/DistributedLogInputFormat.java | 14 +- 78 files changed, 3949 insertions(+), 4304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java index 0f4d3d4..072c3ef 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java @@ -25,7 +25,7 @@ import com.twitter.distributedlog.DistributedLogManager; import com.twitter.distributedlog.LogSegmentMetadata; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import java.io.IOException; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java index b9d0365..cebbc33 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java @@ -594,8 +594,6 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } lastProcessTime.reset().start(); - lastProcessTime.reset().start(); - // If the oldest pending promise is interrupted then we must mark // the reader in error and abort all pending reads since we dont // know the last consumed read http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java index a6b5fd2..9432e8a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java @@ -24,7 +24,6 @@ import com.twitter.distributedlog.exceptions.StreamNotReadyException; import com.twitter.distributedlog.exceptions.WriteCancelledException; import com.twitter.distributedlog.exceptions.WriteException; import com.twitter.distributedlog.feature.CoreFeatureKeys; -import com.twitter.distributedlog.stats.OpStatsListener; import com.twitter.distributedlog.util.FailpointUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.util.Future; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index d20cc6a..a3959b0 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -20,12 +20,6 @@ package com.twitter.distributedlog; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; -import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider; -import com.twitter.distributedlog.bk.LedgerAllocator; -import com.twitter.distributedlog.bk.LedgerAllocatorDelegator; -import com.twitter.distributedlog.bk.QuorumConfigProvider; -import com.twitter.distributedlog.bk.SimpleLedgerAllocator; import com.twitter.distributedlog.callback.LogSegmentListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.AlreadyClosedException; @@ -34,27 +28,22 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.CloseAsyncCloseableFunction; import com.twitter.distributedlog.function.GetVersionedValueFunction; -import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; import com.twitter.distributedlog.metadata.LogMetadataForReader; import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.lock.NopDistributedLock; import com.twitter.distributedlog.lock.ZKDistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.LogStreamMetadataStore; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.stats.BroadCastStatsLogger; -import com.twitter.distributedlog.subscription.SubscriptionStateStore; import com.twitter.distributedlog.subscription.SubscriptionsStore; -import com.twitter.distributedlog.subscription.ZKSubscriptionStateStore; -import com.twitter.distributedlog.subscription.ZKSubscriptionsStore; -import com.twitter.distributedlog.util.ConfUtils; +import com.twitter.distributedlog.util.Allocator; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.MonitoredFuturePool; @@ -65,24 +54,20 @@ import com.twitter.distributedlog.util.SchedulerUtils; import com.twitter.distributedlog.util.Utils; import com.twitter.util.ExceptionalFunction; import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.ExecutorServiceFuturePool; import com.twitter.util.Function; import com.twitter.util.Future; -import com.twitter.util.FuturePool; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction0; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; +import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.util.HashSet; @@ -91,6 +76,9 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.READER; +import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.WRITER; + /** * <h3>Metrics</h3> * <ul> @@ -112,7 +100,7 @@ import java.util.concurrent.TimeUnit; * for details. * </ul> */ -class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager { +class BKDistributedLogManager implements DistributedLogManager { static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class); static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = @@ -131,44 +119,28 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } }; + private final URI uri; + private final String name; private final String clientId; private final int regionId; private final String streamIdentifier; private final DistributedLogConfiguration conf; private final DynamicDistributedLogConfiguration dynConf; + private final NamespaceDriver driver; private Promise<Void> closePromise; private final OrderedScheduler scheduler; - private final OrderedScheduler readAheadScheduler; - private boolean ownExecutor; private final FeatureProvider featureProvider; + private final AsyncFailureInjector failureInjector; private final StatsLogger statsLogger; private final StatsLogger perLogStatsLogger; final AlertStatsLogger alertStatsLogger; - // log stream metadata stores - private final LogStreamMetadataStore writerMetadataStore; - private final LogStreamMetadataStore readerMetadataStore; // log segment metadata cache private final LogSegmentMetadataCache logSegmentMetadataCache; - // bookkeeper clients - // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - private final BookKeeperClientBuilder writerBKCBuilder; - private final BookKeeperClient writerBKC; - private final LogSegmentEntryStore writerEntryStore; - private final boolean ownWriterBKC; - private final BookKeeperClientBuilder readerBKCBuilder; - private final BookKeeperClient readerBKC; - private final LogSegmentEntryStore readerEntryStore; - private final boolean ownReaderBKC; - // // Writer Related Variables // - private final LedgerAllocator ledgerAllocator; private final PermitLimiter writeLimiter; // @@ -176,92 +148,26 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL /// // read handler for listener. private BKLogReadHandler readHandlerForListener = null; - private FuturePool readerFuturePool = null; private final PendingReaders pendingReaders; - // Failure Injector - private final AsyncFailureInjector failureInjector; - - /** - * Create a DLM for testing. - * - * @param name log name - * @param conf distributedlog configuration - * @param uri uri location for the log - * @param writerZKCBuilder zookeeper builder for writers - * @param readerZKCBuilder zookeeper builder for readers - * @param zkcForWriterBKC zookeeper builder for bookkeeper shared by writers - * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by readers - * @param writerBKCBuilder bookkeeper builder for writers - * @param readerBKCBuilder bookkeeper builder for readers - * @param featureProvider provider to offer features - * @param writeLimiter write limiter - * @param statsLogger stats logger to receive stats - * @throws IOException - */ - BKDistributedLogManager(String name, - DistributedLogConfiguration conf, - URI uri, - ZooKeeperClientBuilder writerZKCBuilder, - ZooKeeperClientBuilder readerZKCBuilder, - ZooKeeperClient zkcForWriterBKC, - ZooKeeperClient zkcForReaderBKC, - BookKeeperClientBuilder writerBKCBuilder, - BookKeeperClientBuilder readerBKCBuilder, - FeatureProvider featureProvider, - PermitLimiter writeLimiter, - StatsLogger statsLogger) throws IOException { - this(name, - conf, - ConfUtils.getConstDynConf(conf), - uri, - writerZKCBuilder, - readerZKCBuilder, - zkcForWriterBKC, - zkcForReaderBKC, - writerBKCBuilder, - readerBKCBuilder, - null, - null, - new LogSegmentMetadataCache(conf, Ticker.systemTicker()), - OrderedScheduler.newBuilder().name("BKDL-" + name).corePoolSize(1).build(), - null, - null, - null, - DistributedLogConstants.UNKNOWN_CLIENT_ID, - DistributedLogConstants.LOCAL_REGION_ID, - null, - writeLimiter, - featureProvider, - statsLogger, - NullStatsLogger.INSTANCE); - this.ownExecutor = true; - } + // resource to close + private final Optional<AsyncCloseable> resourcesCloseable; /** * Create a {@link DistributedLogManager} with supplied resources. * * @param name log name * @param conf distributedlog configuration + * @param dynConf dynamic distributedlog configuration * @param uri uri location for the log - * @param writerZKCBuilder zookeeper builder for writers - * @param readerZKCBuilder zookeeper builder for readers - * @param zkcForWriterBKC zookeeper builder for bookkeeper shared by writers - * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by readers - * @param writerBKCBuilder bookkeeper builder for writers - * @param readerBKCBuilder bookkeeper builder for readers - * @param writerMetadataStore writer metadata store - * @param readerMetadataStore reader metadata store + * @param driver namespace driver + * @param logSegmentMetadataCache log segment metadata cache * @param scheduler ordered scheduled used by readers and writers - * @param readAheadScheduler readAhead scheduler used by readers - * @param channelFactory client socket channel factory to build bookkeeper clients - * @param requestTimer request timer to build bookkeeper clients * @param clientId client id that used to initiate the locks * @param regionId region id that would be encrypted as part of log segment metadata * to indicate which region that the log segment will be created - * @param ledgerAllocator ledger allocator to allocate ledgers - * @param featureProvider provider to offer features * @param writeLimiter write limiter + * @param featureProvider provider to offer features * @param statsLogger stats logger to receive stats * @param perLogStatsLogger stats logger to receive per log stats * @throws IOException @@ -270,148 +176,48 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL DistributedLogConfiguration conf, DynamicDistributedLogConfiguration dynConf, URI uri, - ZooKeeperClientBuilder writerZKCBuilder, - ZooKeeperClientBuilder readerZKCBuilder, - ZooKeeperClient zkcForWriterBKC, - ZooKeeperClient zkcForReaderBKC, - BookKeeperClientBuilder writerBKCBuilder, - BookKeeperClientBuilder readerBKCBuilder, - LogStreamMetadataStore writerMetadataStore, - LogStreamMetadataStore readerMetadataStore, + NamespaceDriver driver, LogSegmentMetadataCache logSegmentMetadataCache, OrderedScheduler scheduler, - OrderedScheduler readAheadScheduler, - ClientSocketChannelFactory channelFactory, - HashedWheelTimer requestTimer, String clientId, Integer regionId, - LedgerAllocator ledgerAllocator, PermitLimiter writeLimiter, FeatureProvider featureProvider, + AsyncFailureInjector failureInjector, StatsLogger statsLogger, - StatsLogger perLogStatsLogger) throws IOException { - super(name, conf, uri, writerZKCBuilder, readerZKCBuilder, statsLogger); + StatsLogger perLogStatsLogger, + Optional<AsyncCloseable> resourcesCloseable) { + this.name = name; this.conf = conf; this.dynConf = dynConf; + this.uri = uri; + this.driver = driver; + this.logSegmentMetadataCache = logSegmentMetadataCache; this.scheduler = scheduler; - this.readAheadScheduler = null == readAheadScheduler ? scheduler : readAheadScheduler; this.statsLogger = statsLogger; this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger); - this.ownExecutor = false; this.pendingReaders = new PendingReaders(scheduler); this.regionId = regionId; this.clientId = clientId; this.streamIdentifier = conf.getUnpartitionedStreamName(); - this.ledgerAllocator = ledgerAllocator; this.writeLimiter = writeLimiter; - - // Failure Injection - this.failureInjector = AsyncRandomFailureInjector.newBuilder() - .injectDelays(conf.getEIInjectReadAheadDelay(), - conf.getEIInjectReadAheadDelayPercent(), - conf.getEIInjectMaxReadAheadDelayMs()) - .injectErrors(false, 10) - .injectStops(conf.getEIInjectReadAheadStall(), 10) - .injectCorruption(conf.getEIInjectReadAheadBrokenEntries()) - .build(); - - if (null == writerMetadataStore) { - this.writerMetadataStore = new ZKLogStreamMetadataStore( - clientId, - conf, - writerZKC, - scheduler, - statsLogger); - } else { - this.writerMetadataStore = writerMetadataStore; - } - if (null == readerMetadataStore) { - this.readerMetadataStore = new ZKLogStreamMetadataStore( - clientId, - conf, - readerZKC, - scheduler, - statsLogger); - } else { - this.readerMetadataStore = readerMetadataStore; - } - this.logSegmentMetadataCache = logSegmentMetadataCache; - - // create the bkc for writers - if (null == writerBKCBuilder) { - // resolve uri - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, uri); - BKDLConfig.propagateConfiguration(bkdlConfig, conf); - this.writerBKCBuilder = BookKeeperClientBuilder.newBuilder() - .dlConfig(conf) - .name(String.format("bk:%s:dlm_writer_shared", name)) - .ledgersPath(bkdlConfig.getBkLedgersPath()) - .channelFactory(channelFactory) - .requestTimer(requestTimer) - .statsLogger(statsLogger); - if (null == zkcForWriterBKC) { - this.writerBKCBuilder.zkServers(bkdlConfig.getBkZkServersForWriter()); - } else { - this.writerBKCBuilder.zkc(zkcForWriterBKC); - } - this.ownWriterBKC = true; - } else { - this.writerBKCBuilder = writerBKCBuilder; - this.ownWriterBKC = false; - } - this.writerBKC = this.writerBKCBuilder.build(); - this.writerEntryStore = new BKLogSegmentEntryStore( - conf, - writerBKC, - scheduler, - statsLogger, - failureInjector); - - // create the bkc for readers - if (null == readerBKCBuilder) { - // resolve uri - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, uri); - BKDLConfig.propagateConfiguration(bkdlConfig, conf); - if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) { - this.readerBKCBuilder = this.writerBKCBuilder; - this.ownReaderBKC = false; - } else { - this.readerBKCBuilder = BookKeeperClientBuilder.newBuilder() - .dlConfig(conf) - .name(String.format("bk:%s:dlm_reader_shared", name)) - .ledgersPath(bkdlConfig.getBkLedgersPath()) - .channelFactory(channelFactory) - .requestTimer(requestTimer) - .statsLogger(statsLogger); - if (null == zkcForReaderBKC) { - this.readerBKCBuilder.zkServers(bkdlConfig.getBkZkServersForReader()); - } else { - this.readerBKCBuilder.zkc(zkcForReaderBKC); - } - this.ownReaderBKC = true; - } - } else { - this.readerBKCBuilder = readerBKCBuilder; - this.ownReaderBKC = false; - } - this.readerBKC = this.readerBKCBuilder.build(); - this.readerEntryStore = new BKLogSegmentEntryStore( - conf, - readerBKC, - scheduler, - statsLogger, - failureInjector); - // Feature Provider this.featureProvider = featureProvider; - + // Failure Injector + this.failureInjector = failureInjector; // Stats this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert"); + this.resourcesCloseable = resourcesCloseable; } - @VisibleForTesting - LogStreamMetadataStore getWriterMetadataStore() { - return writerMetadataStore; + @Override + public String getStreamName() { + return name; + } + + @Override + public NamespaceDriver getNamespaceDriver() { + return driver; } URI getUri() { @@ -426,23 +232,22 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return scheduler; } - @VisibleForTesting - BookKeeperClient getWriterBKC() { - return this.writerBKC; + AsyncFailureInjector getFailureInjector() { + return failureInjector; } - @VisibleForTesting - BookKeeperClient getReaderBKC() { - return this.readerBKC; - } + // + // Test Methods + // - LogSegmentEntryStore getReaderEntryStore() { - return this.readerEntryStore; + @VisibleForTesting + LogStreamMetadataStore getWriterMetadataStore() { + return driver.getLogStreamMetadataStore(WRITER); } @VisibleForTesting - FuturePool getReaderFuturePool() { - return this.readerFuturePool; + LogSegmentEntryStore getReaderEntryStore() { + return driver.getLogSegmentEntryStore(READER); } @VisibleForTesting @@ -450,10 +255,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return this.featureProvider; } - AsyncFailureInjector getFailureInjector() { - return this.failureInjector; - } - private synchronized BKLogReadHandler getReadHandlerAndRegisterListener( boolean create, LogSegmentListener listener) { if (null == readHandlerForListener && create) { @@ -502,12 +303,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager"); } } - if (null != writerBKC) { - writerBKC.checkClosedOrInError(); - } - if (null != readerBKC) { - readerBKC.checkClosedOrInError(); - } } // Create Read Handler @@ -538,9 +333,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL subscriberId, conf, dynConf, - readerMetadataStore, + driver.getLogStreamMetadataStore(READER), logSegmentMetadataCache, - readerEntryStore, + driver.getLogSegmentEntryStore(READER), scheduler, alertStatsLogger, statsLogger, @@ -552,23 +347,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL // Create Ledger Allocator - LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata) throws IOException { - LedgerAllocator ledgerAllocatorDelegator; - if (!dynConf.getEnableLedgerAllocatorPool()) { - QuorumConfigProvider quorumConfigProvider = - new DynamicQuorumConfigProvider(dynConf); - LedgerAllocator allocator = new SimpleLedgerAllocator( - logMetadata.getAllocationPath(), - logMetadata.getAllocationData(), - quorumConfigProvider, - writerZKC, - writerBKC); - ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, true); - } else { - ledgerAllocatorDelegator = ledgerAllocator; - } - return ledgerAllocatorDelegator; - } + // Create Write Handler @@ -578,14 +357,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) { - boolean ownAllocator = null == ledgerAllocator; - // Fetching Log Metadata (create if not exists) - return writerMetadataStore.getLog( + return driver.getLogStreamMetadataStore(WRITER).getLog( uri, name, - ownAllocator, - conf.getCreateStreamIfNotExists() || ownAllocator + true, + conf.getCreateStreamIfNotExists() ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() { @Override public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) { @@ -602,16 +379,17 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL // Build the locks DistributedLock lock; if (conf.isWriteLockEnabled()) { - lock = writerMetadataStore.createWriteLock(logMetadata); + lock = driver.getLogStreamMetadataStore(WRITER).createWriteLock(logMetadata); } else { lock = NopDistributedLock.INSTANCE; } - // Build the ledger allocator - LedgerAllocator allocator; + + Allocator<LogSegmentEntryWriter, Object> segmentAllocator; try { - allocator = createLedgerAllocator(logMetadata); - } catch (IOException e) { - FutureUtils.setException(createPromise, e); + segmentAllocator = driver.getLogSegmentEntryStore(WRITER) + .newLogSegmentAllocator(logMetadata, dynConf); + } catch (IOException ioe) { + FutureUtils.setException(createPromise, ioe); return; } @@ -619,11 +397,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL final BKLogWriteHandler writeHandler = new BKLogWriteHandler( logMetadata, conf, - writerMetadataStore, + driver.getLogStreamMetadataStore(WRITER), logSegmentMetadataCache, - writerEntryStore, + driver.getLogSegmentEntryStore(WRITER), scheduler, - allocator, + segmentAllocator, statsLogger, perLogStatsLogger, alertStatsLogger, @@ -657,12 +435,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } PermitManager getLogSegmentRollingPermitManager() { - return writerMetadataStore.getPermitManager(); + return driver.getLogStreamMetadataStore(WRITER).getPermitManager(); } <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) { - initializeFuturePool(false); - return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() { + return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() { @Override public BKLogReadHandler applyE() throws Throwable { return getReadHandlerAndRegisterListener(true, null); @@ -822,7 +599,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL fromTxnId, segmentIdx, segments, - readerEntryStore + driver.getLogSegmentEntryStore(READER) ); } @@ -1010,8 +787,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", subscriberId.get(), name); // we acquired lock - final SubscriptionStateStore stateStore = getSubscriptionStateStore(subscriberId.get()); - return stateStore.getLastCommitPosition().map(new ExceptionalFunction<DLSN, AsyncLogReader>() { + final SubscriptionsStore subscriptionsStore = driver.getSubscriptionsStore(getStreamName()); + return subscriptionsStore.getLastCommitPosition(subscriberId.get()) + .map(new ExceptionalFunction<DLSN, AsyncLogReader>() { @Override public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException { LOG.info("Reader {} @ {} positioned to last commit position {}.", @@ -1226,7 +1004,8 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL */ @Override public void delete() throws IOException { - FutureUtils.result(writerMetadataStore.deleteLog(uri, getStreamName())); + FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) + .deleteLog(uri, getStreamName())); } /** @@ -1297,43 +1076,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL readHandlerToClose = readHandlerForListener; } - // NOTE: the resources {scheduler, writerBKC, readerBKC} are mostly from namespace instance. - // so they are not blocking call except tests. - AsyncCloseable resourcesCloseable = new AsyncCloseable() { - @Override - public Future<Void> asyncClose() { - int schedTimeout = conf.getSchedulerShutdownTimeoutMs(); - - // Clean up executor state. - if (ownExecutor) { - SchedulerUtils.shutdownScheduler(scheduler, schedTimeout, TimeUnit.MILLISECONDS); - LOG.info("Stopped BKDL executor service for {}.", name); - - if (scheduler != readAheadScheduler) { - SchedulerUtils.shutdownScheduler(readAheadScheduler, schedTimeout, TimeUnit.MILLISECONDS); - LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", name); - } - } - if (ownWriterBKC) { - writerBKC.close(); - } - if (ownReaderBKC) { - readerBKC.close(); - } - return Future.Void(); - } - }; - Future<Void> closeResult = Utils.closeSequence(null, true, readHandlerToClose, pendingReaders, - resourcesCloseable, - new AsyncCloseable() { - @Override - public Future<Void> asyncClose() { - return BKDistributedLogManager.super.asyncClose(); - } - }); + resourcesCloseable.or(AsyncCloseable.NULL)); closeResult.proxyTo(closeFuture); return closeFuture; } @@ -1343,70 +1089,18 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL FutureUtils.result(asyncClose()); } - private FuturePool buildFuturePool(ExecutorService executorService, - StatsLogger statsLogger) { - FuturePool futurePool = new ExecutorServiceFuturePool(executorService); - return new MonitoredFuturePool( - futurePool, - statsLogger, - conf.getEnableTaskExecutionStats(), - conf.getTaskExecutionWarnTimeMicros()); - } - - private void initializeFuturePool(boolean ordered) { - // ownExecutor is a single threaded thread pool - if (null == readerFuturePool) { - readerFuturePool = buildFuturePool( - scheduler, statsLogger.scope("reader_future_pool")); - } - } - @Override public String toString() { - return String.format("DLM:%s:%s", getZKPath(), getStreamName()); + return String.format("DLM:%s:%s", getUri(), getStreamName()); } public void raiseAlert(String msg, Object... args) { alertStatsLogger.raise(msg, args); } - /** - * Get the subscription state storage provided by the distributed log manager - * - * @param subscriberId - Application specific Id associated with the subscriber - * @return Subscription state store - */ - @Override - @Deprecated - public SubscriptionStateStore getSubscriptionStateStore(String subscriberId) { - return getSubscriptionStateStoreInternal(conf.getUnpartitionedStreamName(), subscriberId); - } - - /** - * Get the subscription state storage provided by the distributed log manager - * - * @param streamIdentifier - Identifier associated with the stream - * @param subscriberId - Application specific Id associated with the subscriber - * @return Subscription state store - */ - private SubscriptionStateStore getSubscriptionStateStoreInternal(String streamIdentifier, String subscriberId) { - return new ZKSubscriptionStateStore(writerZKC, - LogMetadataForReader.getSubscriberPath(uri, name, streamIdentifier, subscriberId)); - } - @Override public SubscriptionsStore getSubscriptionsStore() { - return getSubscriptionsStoreInternal(conf.getUnpartitionedStreamName()); + return driver.getSubscriptionsStore(getStreamName()); } - /** - * Get the subscription state storage provided by the distributed log manager - * - * @param streamIdentifier - Identifier associated with the stream - * @return Subscriptions store - */ - private SubscriptionsStore getSubscriptionsStoreInternal(String streamIdentifier) { - return new ZKSubscriptionsStore(writerZKC, - LogMetadataForReader.getSubscribersPath(uri, name, streamIdentifier)); - } }