DL-163: clean up direct zookeeper and bookkeeper usage and use metadata/data 
store abstraction

- introduce NamespaceDriver class to manage metadata/data store abstractions
- clean up direct zookeeper and bookkeeper usage and use metadata/data store 
abstraction
- separate MetadataAccessor from DistributedLogManager
- remove deprecated classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/cfc049cd
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/cfc049cd
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/cfc049cd

Branch: refs/heads/master
Commit: cfc049cd045b73ca96fb328877b0acf4e75c95ab
Parents: d871e65
Author: Sijie Guo <sij...@twitter.com>
Authored: Wed Dec 28 17:47:12 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:13:30 2016 -0800

----------------------------------------------------------------------
 .../benchmark/stream/LedgerReadBenchmark.java   |   2 +-
 .../distributedlog/BKAsyncLogReader.java        |   2 -
 .../distributedlog/BKAsyncLogWriter.java        |   1 -
 .../distributedlog/BKDistributedLogManager.java | 454 ++--------
 .../BKDistributedLogNamespace.java              | 846 ++-----------------
 .../distributedlog/BKLogWriteHandler.java       |  58 +-
 .../distributedlog/BookKeeperClient.java        |  22 +-
 .../DistributedLogConfiguration.java            |   9 +-
 .../distributedlog/DistributedLogManager.java   |  28 +-
 .../DistributedLogManagerFactory.java           | 202 -----
 .../distributedlog/LocalDLMEmulator.java        |   2 +-
 .../distributedlog/ReadAheadEntryReader.java    |  18 +-
 .../distributedlog/ZKMetadataAccessor.java      | 259 ------
 .../distributedlog/ZooKeeperClientBuilder.java  |   4 +-
 .../distributedlog/acl/ZKAccessControl.java     | 229 -----
 .../acl/ZKAccessControlManager.java             | 373 --------
 .../admin/DistributedLogAdmin.java              |  85 +-
 .../distributedlog/auditor/DLAuditor.java       | 103 +--
 .../twitter/distributedlog/impl/BKDLUtils.java  | 100 ---
 .../distributedlog/impl/BKNamespaceDriver.java  | 631 ++++++++++++++
 .../distributedlog/impl/ZKLogMetadataStore.java |   2 +-
 .../distributedlog/impl/ZKMetadataAccessor.java | 264 ++++++
 .../distributedlog/impl/ZKNamespaceWatcher.java |   2 +-
 .../impl/acl/ZKAccessControl.java               | 232 +++++
 .../impl/acl/ZKAccessControlManager.java        | 374 ++++++++
 .../impl/logsegment/BKLogSegmentAllocator.java  |  85 ++
 .../logsegment/BKLogSegmentEntryReader.java     |  27 +-
 .../impl/logsegment/BKLogSegmentEntryStore.java |  63 +-
 .../impl/metadata/BKDLConfig.java               | 400 +++++++++
 .../impl/metadata/ZkMetadataResolver.java       |  72 ++
 .../subscription/ZKSubscriptionStateStore.java  | 121 +++
 .../impl/subscription/ZKSubscriptionsStore.java | 164 ++++
 .../logsegment/LogSegmentEntryReader.java       |  25 +
 .../logsegment/LogSegmentEntryStore.java        |  16 +-
 .../distributedlog/metadata/BKDLConfig.java     | 399 ---------
 .../distributedlog/metadata/DLMetadata.java     |   1 +
 .../metadata/ZkMetadataResolver.java            |  70 --
 .../namespace/DistributedLogNamespace.java      |   7 +
 .../DistributedLogNamespaceBuilder.java         | 143 +++-
 .../namespace/NamespaceDriver.java              | 138 +++
 .../namespace/NamespaceDriverManager.java       | 180 ++++
 .../subscription/ZKSubscriptionStateStore.java  | 120 ---
 .../subscription/ZKSubscriptionsStore.java      | 165 ----
 .../tools/DistributedLogTool.java               | 177 ++--
 .../twitter/distributedlog/util/DLUtils.java    | 124 ++-
 .../distributedlog/util/FutureUtils.java        |   6 +-
 .../com/twitter/distributedlog/DLMTestUtil.java | 120 +--
 .../distributedlog/TestAsyncReaderLock.java     |  29 +-
 .../distributedlog/TestAsyncReaderWriter.java   |  14 +-
 .../TestBKDistributedLogManager.java            |  99 +--
 .../TestBKDistributedLogNamespace.java          |  68 +-
 .../distributedlog/TestBKLogReadHandler.java    |  23 +-
 .../distributedlog/TestBKLogSegmentWriter.java  |   6 +-
 .../distributedlog/TestBKLogWriteHandler.java   |   4 +-
 .../distributedlog/TestDistributedLogBase.java  | 117 ++-
 .../distributedlog/TestFailureAndRecovery.java  | 257 ------
 .../distributedlog/TestInterleavedReaders.java  |  70 --
 .../distributedlog/TestLogSegmentsZK.java       |  24 +-
 .../distributedlog/TestNonBlockingReads.java    |   3 +
 .../TestReadAheadEntryReader.java               |  52 +-
 .../distributedlog/TestRollLogSegments.java     |   5 +-
 .../distributedlog/acl/TestZKAccessControl.java |   5 +-
 .../acl/TestZKAccessControlManager.java         |   2 +
 .../twitter/distributedlog/admin/TestDLCK.java  |  34 +-
 .../admin/TestDistributedLogAdmin.java          |  32 +-
 .../impl/TestZKLogSegmentMetadataStore.java     |   2 +-
 .../impl/TestZKNamespaceWatcher.java            |   2 +-
 .../TestFederatedZKLogMetadataStore.java        |   3 +-
 .../logsegment/TestBKLogSegmentEntryReader.java |  22 +-
 .../metadata/TestZKLogStreamMetadataStore.java  |  14 +-
 .../impl/metadata/TestZkMetadataResolver.java   | 203 +++++
 .../distributedlog/metadata/TestDLMetadata.java |   1 +
 .../metadata/TestZkMetadataResolver.java        | 200 -----
 .../service/DistributedLogCluster.java          |   2 +-
 .../service/balancer/BalancerTool.java          |   6 +-
 .../placement/ZKPlacementStateManager.java      |  11 +-
 .../service/TestDistributedLogServerBase.java   |   4 +-
 .../mapreduce/DistributedLogInputFormat.java    |  14 +-
 78 files changed, 3949 insertions(+), 4304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
index 0f4d3d4..072c3ef 100644
--- 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
+++ 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java
@@ -25,7 +25,7 @@ import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientBuilder;
-import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import java.io.IOException;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
index b9d0365..cebbc33 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
@@ -594,8 +594,6 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, 
AsyncNotification {
                 }
                 lastProcessTime.reset().start();
 
-                lastProcessTime.reset().start();
-
                 // If the oldest pending promise is interrupted then we must 
mark
                 // the reader in error and abort all pending reads since we 
dont
                 // know the last consumed read

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
index a6b5fd2..9432e8a 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
@@ -24,7 +24,6 @@ import 
com.twitter.distributedlog.exceptions.StreamNotReadyException;
 import com.twitter.distributedlog.exceptions.WriteCancelledException;
 import com.twitter.distributedlog.exceptions.WriteException;
 import com.twitter.distributedlog.feature.CoreFeatureKeys;
-import com.twitter.distributedlog.stats.OpStatsListener;
 import com.twitter.distributedlog.util.FailpointUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.util.Future;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index d20cc6a..a3959b0 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -20,12 +20,6 @@ package com.twitter.distributedlog;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
-import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider;
-import com.twitter.distributedlog.bk.LedgerAllocator;
-import com.twitter.distributedlog.bk.LedgerAllocatorDelegator;
-import com.twitter.distributedlog.bk.QuorumConfigProvider;
-import com.twitter.distributedlog.bk.SimpleLedgerAllocator;
 import com.twitter.distributedlog.callback.LogSegmentListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
@@ -34,27 +28,22 @@ import 
com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.function.CloseAsyncCloseableFunction;
 import com.twitter.distributedlog.function.GetVersionedValueFunction;
-import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
 import com.twitter.distributedlog.injector.AsyncFailureInjector;
-import com.twitter.distributedlog.injector.AsyncRandomFailureInjector;
 import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
+import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
 import com.twitter.distributedlog.metadata.LogMetadataForReader;
 import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
 import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.lock.DistributedLock;
 import com.twitter.distributedlog.lock.NopDistributedLock;
 import com.twitter.distributedlog.lock.ZKDistributedLock;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
+import com.twitter.distributedlog.namespace.NamespaceDriver;
 import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
 import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.distributedlog.subscription.ZKSubscriptionStateStore;
-import com.twitter.distributedlog.subscription.ZKSubscriptionsStore;
-import com.twitter.distributedlog.util.ConfUtils;
+import com.twitter.distributedlog.util.Allocator;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.MonitoredFuturePool;
@@ -65,24 +54,20 @@ import com.twitter.distributedlog.util.SchedulerUtils;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.ExceptionalFunction;
 import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.ExecutorServiceFuturePool;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
-import com.twitter.util.FuturePool;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction0;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashSet;
@@ -91,6 +76,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.READER;
+import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.WRITER;
+
 /**
  * <h3>Metrics</h3>
  * <ul>
@@ -112,7 +100,7 @@ import java.util.concurrent.TimeUnit;
  * for details.
  * </ul>
  */
-class BKDistributedLogManager extends ZKMetadataAccessor implements 
DistributedLogManager {
+class BKDistributedLogManager implements DistributedLogManager {
     static final Logger LOG = 
LoggerFactory.getLogger(BKDistributedLogManager.class);
 
     static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
@@ -131,44 +119,28 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 }
             };
 
+    private final URI uri;
+    private final String name;
     private final String clientId;
     private final int regionId;
     private final String streamIdentifier;
     private final DistributedLogConfiguration conf;
     private final DynamicDistributedLogConfiguration dynConf;
+    private final NamespaceDriver driver;
     private Promise<Void> closePromise;
     private final OrderedScheduler scheduler;
-    private final OrderedScheduler readAheadScheduler;
-    private boolean ownExecutor;
     private final FeatureProvider featureProvider;
+    private final AsyncFailureInjector failureInjector;
     private final StatsLogger statsLogger;
     private final StatsLogger perLogStatsLogger;
     final AlertStatsLogger alertStatsLogger;
 
-    // log stream metadata stores
-    private final LogStreamMetadataStore writerMetadataStore;
-    private final LogStreamMetadataStore readerMetadataStore;
     // log segment metadata cache
     private final LogSegmentMetadataCache logSegmentMetadataCache;
 
-    // bookkeeper clients
-    // NOTE: The actual bookkeeper client is initialized lazily when it is 
referenced by
-    //       {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it 
is safe to
-    //       keep builders and their client wrappers here, as they will be 
used when
-    //       instantiating readers or writers.
-    private final BookKeeperClientBuilder writerBKCBuilder;
-    private final BookKeeperClient writerBKC;
-    private final LogSegmentEntryStore writerEntryStore;
-    private final boolean ownWriterBKC;
-    private final BookKeeperClientBuilder readerBKCBuilder;
-    private final BookKeeperClient readerBKC;
-    private final LogSegmentEntryStore readerEntryStore;
-    private final boolean ownReaderBKC;
-
     //
     // Writer Related Variables
     //
-    private final LedgerAllocator ledgerAllocator;
     private final PermitLimiter writeLimiter;
 
     //
@@ -176,92 +148,26 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     ///
     // read handler for listener.
     private BKLogReadHandler readHandlerForListener = null;
-    private FuturePool readerFuturePool = null;
     private final PendingReaders pendingReaders;
 
-    // Failure Injector
-    private final AsyncFailureInjector failureInjector;
-
-    /**
-     * Create a DLM for testing.
-     *
-     * @param name log name
-     * @param conf distributedlog configuration
-     * @param uri uri location for the log
-     * @param writerZKCBuilder zookeeper builder for writers
-     * @param readerZKCBuilder zookeeper builder for readers
-     * @param zkcForWriterBKC zookeeper builder for bookkeeper shared by 
writers
-     * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by 
readers
-     * @param writerBKCBuilder bookkeeper builder for writers
-     * @param readerBKCBuilder bookkeeper builder for readers
-     * @param featureProvider provider to offer features
-     * @param writeLimiter write limiter
-     * @param statsLogger stats logger to receive stats
-     * @throws IOException
-     */
-    BKDistributedLogManager(String name,
-                            DistributedLogConfiguration conf,
-                            URI uri,
-                            ZooKeeperClientBuilder writerZKCBuilder,
-                            ZooKeeperClientBuilder readerZKCBuilder,
-                            ZooKeeperClient zkcForWriterBKC,
-                            ZooKeeperClient zkcForReaderBKC,
-                            BookKeeperClientBuilder writerBKCBuilder,
-                            BookKeeperClientBuilder readerBKCBuilder,
-                            FeatureProvider featureProvider,
-                            PermitLimiter writeLimiter,
-                            StatsLogger statsLogger) throws IOException {
-        this(name,
-             conf,
-             ConfUtils.getConstDynConf(conf),
-             uri,
-             writerZKCBuilder,
-             readerZKCBuilder,
-             zkcForWriterBKC,
-             zkcForReaderBKC,
-             writerBKCBuilder,
-             readerBKCBuilder,
-             null,
-             null,
-             new LogSegmentMetadataCache(conf, Ticker.systemTicker()),
-             OrderedScheduler.newBuilder().name("BKDL-" + 
name).corePoolSize(1).build(),
-             null,
-             null,
-             null,
-             DistributedLogConstants.UNKNOWN_CLIENT_ID,
-             DistributedLogConstants.LOCAL_REGION_ID,
-             null,
-             writeLimiter,
-             featureProvider,
-             statsLogger,
-             NullStatsLogger.INSTANCE);
-        this.ownExecutor = true;
-    }
+    // resource to close
+    private final Optional<AsyncCloseable> resourcesCloseable;
 
     /**
      * Create a {@link DistributedLogManager} with supplied resources.
      *
      * @param name log name
      * @param conf distributedlog configuration
+     * @param dynConf dynamic distributedlog configuration
      * @param uri uri location for the log
-     * @param writerZKCBuilder zookeeper builder for writers
-     * @param readerZKCBuilder zookeeper builder for readers
-     * @param zkcForWriterBKC zookeeper builder for bookkeeper shared by 
writers
-     * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by 
readers
-     * @param writerBKCBuilder bookkeeper builder for writers
-     * @param readerBKCBuilder bookkeeper builder for readers
-     * @param writerMetadataStore writer metadata store
-     * @param readerMetadataStore reader metadata store
+     * @param driver namespace driver
+     * @param logSegmentMetadataCache log segment metadata cache
      * @param scheduler ordered scheduled used by readers and writers
-     * @param readAheadScheduler readAhead scheduler used by readers
-     * @param channelFactory client socket channel factory to build bookkeeper 
clients
-     * @param requestTimer request timer to build bookkeeper clients
      * @param clientId client id that used to initiate the locks
      * @param regionId region id that would be encrypted as part of log 
segment metadata
      *                 to indicate which region that the log segment will be 
created
-     * @param ledgerAllocator ledger allocator to allocate ledgers
-     * @param featureProvider provider to offer features
      * @param writeLimiter write limiter
+     * @param featureProvider provider to offer features
      * @param statsLogger stats logger to receive stats
      * @param perLogStatsLogger stats logger to receive per log stats
      * @throws IOException
@@ -270,148 +176,48 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                             DistributedLogConfiguration conf,
                             DynamicDistributedLogConfiguration dynConf,
                             URI uri,
-                            ZooKeeperClientBuilder writerZKCBuilder,
-                            ZooKeeperClientBuilder readerZKCBuilder,
-                            ZooKeeperClient zkcForWriterBKC,
-                            ZooKeeperClient zkcForReaderBKC,
-                            BookKeeperClientBuilder writerBKCBuilder,
-                            BookKeeperClientBuilder readerBKCBuilder,
-                            LogStreamMetadataStore writerMetadataStore,
-                            LogStreamMetadataStore readerMetadataStore,
+                            NamespaceDriver driver,
                             LogSegmentMetadataCache logSegmentMetadataCache,
                             OrderedScheduler scheduler,
-                            OrderedScheduler readAheadScheduler,
-                            ClientSocketChannelFactory channelFactory,
-                            HashedWheelTimer requestTimer,
                             String clientId,
                             Integer regionId,
-                            LedgerAllocator ledgerAllocator,
                             PermitLimiter writeLimiter,
                             FeatureProvider featureProvider,
+                            AsyncFailureInjector failureInjector,
                             StatsLogger statsLogger,
-                            StatsLogger perLogStatsLogger) throws IOException {
-        super(name, conf, uri, writerZKCBuilder, readerZKCBuilder, 
statsLogger);
+                            StatsLogger perLogStatsLogger,
+                            Optional<AsyncCloseable> resourcesCloseable) {
+        this.name = name;
         this.conf = conf;
         this.dynConf = dynConf;
+        this.uri = uri;
+        this.driver = driver;
+        this.logSegmentMetadataCache = logSegmentMetadataCache;
         this.scheduler = scheduler;
-        this.readAheadScheduler = null == readAheadScheduler ? scheduler : 
readAheadScheduler;
         this.statsLogger = statsLogger;
         this.perLogStatsLogger = 
BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
-        this.ownExecutor = false;
         this.pendingReaders = new PendingReaders(scheduler);
         this.regionId = regionId;
         this.clientId = clientId;
         this.streamIdentifier = conf.getUnpartitionedStreamName();
-        this.ledgerAllocator = ledgerAllocator;
         this.writeLimiter = writeLimiter;
-
-        // Failure Injection
-        this.failureInjector = AsyncRandomFailureInjector.newBuilder()
-                .injectDelays(conf.getEIInjectReadAheadDelay(),
-                              conf.getEIInjectReadAheadDelayPercent(),
-                              conf.getEIInjectMaxReadAheadDelayMs())
-                .injectErrors(false, 10)
-                .injectStops(conf.getEIInjectReadAheadStall(), 10)
-                .injectCorruption(conf.getEIInjectReadAheadBrokenEntries())
-                .build();
-
-        if (null == writerMetadataStore) {
-            this.writerMetadataStore = new ZKLogStreamMetadataStore(
-                    clientId,
-                    conf,
-                    writerZKC,
-                    scheduler,
-                    statsLogger);
-        } else {
-            this.writerMetadataStore = writerMetadataStore;
-        }
-        if (null == readerMetadataStore) {
-            this.readerMetadataStore = new ZKLogStreamMetadataStore(
-                    clientId,
-                    conf,
-                    readerZKC,
-                    scheduler,
-                    statsLogger);
-        } else {
-            this.readerMetadataStore = readerMetadataStore;
-        }
-        this.logSegmentMetadataCache = logSegmentMetadataCache;
-
-        // create the bkc for writers
-        if (null == writerBKCBuilder) {
-            // resolve uri
-            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, uri);
-            BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-            this.writerBKCBuilder = BookKeeperClientBuilder.newBuilder()
-                    .dlConfig(conf)
-                    .name(String.format("bk:%s:dlm_writer_shared", name))
-                    .ledgersPath(bkdlConfig.getBkLedgersPath())
-                    .channelFactory(channelFactory)
-                    .requestTimer(requestTimer)
-                    .statsLogger(statsLogger);
-            if (null == zkcForWriterBKC) {
-                
this.writerBKCBuilder.zkServers(bkdlConfig.getBkZkServersForWriter());
-            } else {
-                this.writerBKCBuilder.zkc(zkcForWriterBKC);
-            }
-            this.ownWriterBKC = true;
-        } else {
-            this.writerBKCBuilder = writerBKCBuilder;
-            this.ownWriterBKC = false;
-        }
-        this.writerBKC = this.writerBKCBuilder.build();
-        this.writerEntryStore = new BKLogSegmentEntryStore(
-                conf,
-                writerBKC,
-                scheduler,
-                statsLogger,
-                failureInjector);
-
-        // create the bkc for readers
-        if (null == readerBKCBuilder) {
-            // resolve uri
-            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, uri);
-            BKDLConfig.propagateConfiguration(bkdlConfig, conf);
-            if 
(bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader()))
 {
-                this.readerBKCBuilder = this.writerBKCBuilder;
-                this.ownReaderBKC = false;
-            } else {
-                this.readerBKCBuilder = BookKeeperClientBuilder.newBuilder()
-                        .dlConfig(conf)
-                        .name(String.format("bk:%s:dlm_reader_shared", name))
-                        .ledgersPath(bkdlConfig.getBkLedgersPath())
-                        .channelFactory(channelFactory)
-                        .requestTimer(requestTimer)
-                        .statsLogger(statsLogger);
-                if (null == zkcForReaderBKC) {
-                    
this.readerBKCBuilder.zkServers(bkdlConfig.getBkZkServersForReader());
-                } else {
-                    this.readerBKCBuilder.zkc(zkcForReaderBKC);
-                }
-                this.ownReaderBKC = true;
-            }
-        } else {
-            this.readerBKCBuilder = readerBKCBuilder;
-            this.ownReaderBKC = false;
-        }
-        this.readerBKC = this.readerBKCBuilder.build();
-        this.readerEntryStore = new BKLogSegmentEntryStore(
-                conf,
-                readerBKC,
-                scheduler,
-                statsLogger,
-                failureInjector);
-
         // Feature Provider
         this.featureProvider = featureProvider;
-
+        // Failure Injector
+        this.failureInjector = failureInjector;
         // Stats
         this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, 
"dl_alert");
+        this.resourcesCloseable = resourcesCloseable;
     }
 
-    @VisibleForTesting
-    LogStreamMetadataStore getWriterMetadataStore() {
-        return writerMetadataStore;
+    @Override
+    public String getStreamName() {
+        return name;
+    }
+
+    @Override
+    public NamespaceDriver getNamespaceDriver() {
+        return driver;
     }
 
     URI getUri() {
@@ -426,23 +232,22 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         return scheduler;
     }
 
-    @VisibleForTesting
-    BookKeeperClient getWriterBKC() {
-        return this.writerBKC;
+    AsyncFailureInjector getFailureInjector() {
+        return failureInjector;
     }
 
-    @VisibleForTesting
-    BookKeeperClient getReaderBKC() {
-        return this.readerBKC;
-    }
+    //
+    // Test Methods
+    //
 
-    LogSegmentEntryStore getReaderEntryStore() {
-        return this.readerEntryStore;
+    @VisibleForTesting
+    LogStreamMetadataStore getWriterMetadataStore() {
+        return driver.getLogStreamMetadataStore(WRITER);
     }
 
     @VisibleForTesting
-    FuturePool getReaderFuturePool() {
-        return this.readerFuturePool;
+    LogSegmentEntryStore getReaderEntryStore() {
+        return driver.getLogSegmentEntryStore(READER);
     }
 
     @VisibleForTesting
@@ -450,10 +255,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         return this.featureProvider;
     }
 
-    AsyncFailureInjector getFailureInjector() {
-        return this.failureInjector;
-    }
-
     private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
             boolean create, LogSegmentListener listener) {
         if (null == readHandlerForListener && create) {
@@ -502,12 +303,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 throw new AlreadyClosedException("Executing " + operation + " 
on already closed DistributedLogManager");
             }
         }
-        if (null != writerBKC) {
-            writerBKC.checkClosedOrInError();
-        }
-        if (null != readerBKC) {
-            readerBKC.checkClosedOrInError();
-        }
     }
 
     // Create Read Handler
@@ -538,9 +333,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 subscriberId,
                 conf,
                 dynConf,
-                readerMetadataStore,
+                driver.getLogStreamMetadataStore(READER),
                 logSegmentMetadataCache,
-                readerEntryStore,
+                driver.getLogSegmentEntryStore(READER),
                 scheduler,
                 alertStatsLogger,
                 statsLogger,
@@ -552,23 +347,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
 
     // Create Ledger Allocator
 
-    LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata) 
throws IOException {
-        LedgerAllocator ledgerAllocatorDelegator;
-        if (!dynConf.getEnableLedgerAllocatorPool()) {
-            QuorumConfigProvider quorumConfigProvider =
-                    new DynamicQuorumConfigProvider(dynConf);
-            LedgerAllocator allocator = new SimpleLedgerAllocator(
-                    logMetadata.getAllocationPath(),
-                    logMetadata.getAllocationData(),
-                    quorumConfigProvider,
-                    writerZKC,
-                    writerBKC);
-            ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, 
true);
-        } else {
-            ledgerAllocatorDelegator = ledgerAllocator;
-        }
-        return ledgerAllocatorDelegator;
-    }
+
 
     // Create Write Handler
 
@@ -578,14 +357,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     }
 
     Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean 
lockHandler) {
-        boolean ownAllocator = null == ledgerAllocator;
-
         // Fetching Log Metadata (create if not exists)
-        return writerMetadataStore.getLog(
+        return driver.getLogStreamMetadataStore(WRITER).getLog(
                 uri,
                 name,
-                ownAllocator,
-                conf.getCreateStreamIfNotExists() || ownAllocator
+                true,
+                conf.getCreateStreamIfNotExists()
         ).flatMap(new AbstractFunction1<LogMetadataForWriter, 
Future<BKLogWriteHandler>>() {
             @Override
             public Future<BKLogWriteHandler> apply(LogMetadataForWriter 
logMetadata) {
@@ -602,16 +379,17 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         // Build the locks
         DistributedLock lock;
         if (conf.isWriteLockEnabled()) {
-            lock = writerMetadataStore.createWriteLock(logMetadata);
+            lock = 
driver.getLogStreamMetadataStore(WRITER).createWriteLock(logMetadata);
         } else {
             lock = NopDistributedLock.INSTANCE;
         }
-        // Build the ledger allocator
-        LedgerAllocator allocator;
+
+        Allocator<LogSegmentEntryWriter, Object> segmentAllocator;
         try {
-            allocator = createLedgerAllocator(logMetadata);
-        } catch (IOException e) {
-            FutureUtils.setException(createPromise, e);
+            segmentAllocator = driver.getLogSegmentEntryStore(WRITER)
+                    .newLogSegmentAllocator(logMetadata, dynConf);
+        } catch (IOException ioe) {
+            FutureUtils.setException(createPromise, ioe);
             return;
         }
 
@@ -619,11 +397,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
         final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
                 logMetadata,
                 conf,
-                writerMetadataStore,
+                driver.getLogStreamMetadataStore(WRITER),
                 logSegmentMetadataCache,
-                writerEntryStore,
+                driver.getLogSegmentEntryStore(WRITER),
                 scheduler,
-                allocator,
+                segmentAllocator,
                 statsLogger,
                 perLogStatsLogger,
                 alertStatsLogger,
@@ -657,12 +435,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
     }
 
     PermitManager getLogSegmentRollingPermitManager() {
-        return writerMetadataStore.getPermitManager();
+        return driver.getLogStreamMetadataStore(WRITER).getPermitManager();
     }
 
     <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, 
Future<T>> func) {
-        initializeFuturePool(false);
-        return readerFuturePool.apply(new 
ExceptionalFunction0<BKLogReadHandler>() {
+        return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() {
             @Override
             public BKLogReadHandler applyE() throws Throwable {
                 return getReadHandlerAndRegisterListener(true, null);
@@ -822,7 +599,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 fromTxnId,
                 segmentIdx,
                 segments,
-                readerEntryStore
+                driver.getLogSegmentEntryStore(READER)
         );
     }
 
@@ -1010,8 +787,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
                 LOG.info("Reader {} @ {} reading last commit position from 
subscription store after acquired lock.",
                         subscriberId.get(), name);
                 // we acquired lock
-                final SubscriptionStateStore stateStore = 
getSubscriptionStateStore(subscriberId.get());
-                return stateStore.getLastCommitPosition().map(new 
ExceptionalFunction<DLSN, AsyncLogReader>() {
+                final SubscriptionsStore subscriptionsStore = 
driver.getSubscriptionsStore(getStreamName());
+                return 
subscriptionsStore.getLastCommitPosition(subscriberId.get())
+                        .map(new ExceptionalFunction<DLSN, AsyncLogReader>() {
                     @Override
                     public AsyncLogReader applyE(DLSN lastCommitPosition) 
throws UnexpectedException {
                         LOG.info("Reader {} @ {} positioned to last commit 
position {}.",
@@ -1226,7 +1004,8 @@ class BKDistributedLogManager extends ZKMetadataAccessor 
implements DistributedL
      */
     @Override
     public void delete() throws IOException {
-        FutureUtils.result(writerMetadataStore.deleteLog(uri, 
getStreamName()));
+        FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+                .deleteLog(uri, getStreamName()));
     }
 
     /**
@@ -1297,43 +1076,10 @@ class BKDistributedLogManager extends 
ZKMetadataAccessor implements DistributedL
             readHandlerToClose = readHandlerForListener;
         }
 
-        // NOTE: the resources {scheduler, writerBKC, readerBKC} are mostly 
from namespace instance.
-        //       so they are not blocking call except tests.
-        AsyncCloseable resourcesCloseable = new AsyncCloseable() {
-            @Override
-            public Future<Void> asyncClose() {
-                int schedTimeout = conf.getSchedulerShutdownTimeoutMs();
-
-                // Clean up executor state.
-                if (ownExecutor) {
-                    SchedulerUtils.shutdownScheduler(scheduler, schedTimeout, 
TimeUnit.MILLISECONDS);
-                    LOG.info("Stopped BKDL executor service for {}.", name);
-
-                    if (scheduler != readAheadScheduler) {
-                        SchedulerUtils.shutdownScheduler(readAheadScheduler, 
schedTimeout, TimeUnit.MILLISECONDS);
-                        LOG.info("Stopped BKDL ReadAhead Executor Service for 
{}.", name);
-                    }
-                }
-                if (ownWriterBKC) {
-                    writerBKC.close();
-                }
-                if (ownReaderBKC) {
-                    readerBKC.close();
-                }
-                return Future.Void();
-            }
-        };
-
         Future<Void> closeResult = Utils.closeSequence(null, true,
                 readHandlerToClose,
                 pendingReaders,
-                resourcesCloseable,
-                new AsyncCloseable() {
-                    @Override
-                    public Future<Void> asyncClose() {
-                        return BKDistributedLogManager.super.asyncClose();
-                    }
-                });
+                resourcesCloseable.or(AsyncCloseable.NULL));
         closeResult.proxyTo(closeFuture);
         return closeFuture;
     }
@@ -1343,70 +1089,18 @@ class BKDistributedLogManager extends 
ZKMetadataAccessor implements DistributedL
         FutureUtils.result(asyncClose());
     }
 
-    private FuturePool buildFuturePool(ExecutorService executorService,
-                                       StatsLogger statsLogger) {
-        FuturePool futurePool = new ExecutorServiceFuturePool(executorService);
-        return new MonitoredFuturePool(
-                futurePool,
-                statsLogger,
-                conf.getEnableTaskExecutionStats(),
-                conf.getTaskExecutionWarnTimeMicros());
-    }
-
-    private void initializeFuturePool(boolean ordered) {
-        // ownExecutor is a single threaded thread pool
-        if (null == readerFuturePool) {
-            readerFuturePool = buildFuturePool(
-                    scheduler, statsLogger.scope("reader_future_pool"));
-        }
-    }
-
     @Override
     public String toString() {
-        return String.format("DLM:%s:%s", getZKPath(), getStreamName());
+        return String.format("DLM:%s:%s", getUri(), getStreamName());
     }
 
     public void raiseAlert(String msg, Object... args) {
         alertStatsLogger.raise(msg, args);
     }
 
-    /**
-     * Get the subscription state storage provided by the distributed log 
manager
-     *
-     * @param subscriberId - Application specific Id associated with the 
subscriber
-     * @return Subscription state store
-     */
-    @Override
-    @Deprecated
-    public SubscriptionStateStore getSubscriptionStateStore(String 
subscriberId) {
-        return 
getSubscriptionStateStoreInternal(conf.getUnpartitionedStreamName(), 
subscriberId);
-    }
-
-    /**
-     * Get the subscription state storage provided by the distributed log 
manager
-     *
-     * @param streamIdentifier - Identifier associated with the stream
-     * @param subscriberId - Application specific Id associated with the 
subscriber
-     * @return Subscription state store
-     */
-    private SubscriptionStateStore getSubscriptionStateStoreInternal(String 
streamIdentifier, String subscriberId) {
-        return new ZKSubscriptionStateStore(writerZKC,
-                LogMetadataForReader.getSubscriberPath(uri, name, 
streamIdentifier, subscriberId));
-    }
-
     @Override
     public SubscriptionsStore getSubscriptionsStore() {
-        return 
getSubscriptionsStoreInternal(conf.getUnpartitionedStreamName());
+        return driver.getSubscriptionsStore(getStreamName());
     }
 
-    /**
-     * Get the subscription state storage provided by the distributed log 
manager
-     *
-     * @param streamIdentifier - Identifier associated with the stream
-     * @return Subscriptions store
-     */
-    private SubscriptionsStore getSubscriptionsStoreInternal(String 
streamIdentifier) {
-        return new ZKSubscriptionsStore(writerZKC,
-                LogMetadataForReader.getSubscribersPath(uri, name, 
streamIdentifier));
-    }
 }


Reply via email to