DL-117: Stream metadata store

This change abstracts the ZooKeeper operations into a stream metadata store, so that ZooKeeper can be replaced with other metadata stores easily.
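For orientation, below is a rough sketch of the new LogStreamMetadataStore abstraction, inferred from the call sites that appear in the diff in this mail (getLog, logExists, deleteLog, createWriteLock, createReadLock, getLogSegmentMetadataStore, getPermitManager); the exact signatures and javadoc in metadata/LogStreamMetadataStore.java may differ.

```java
package com.twitter.distributedlog.metadata;

// Sketch only: method names and signatures are inferred from the call sites in the
// diff below and may not match the committed interface exactly.
import java.net.URI;

import com.google.common.base.Optional;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.util.PermitManager;
import com.twitter.util.Future;

public interface LogStreamMetadataStore {

    // Fetch the metadata of a log stream under the namespace <uri>, optionally creating it.
    Future<ZKLogMetadataForWriter> getLog(URI uri,
                                          String logName,
                                          boolean ownAllocator,
                                          boolean createIfNotExists);

    // Check whether the log stream exists; fails with LogNotFoundException if it does not.
    Future<Void> logExists(URI uri, String logName);

    // Delete the metadata of a log stream.
    Future<Void> deleteLog(URI uri, String logName);

    // Create the write lock used by the log stream's write handler.
    DistributedLock createWriteLock(ZKLogMetadataForWriter logMetadata);

    // Create the (elective) read lock used by a reader / subscriber.
    Future<DistributedLock> createReadLock(ZKLogMetadataForReader logMetadata,
                                           Optional<String> subscriberId);

    // The log segment metadata store backing this stream metadata store.
    LogSegmentMetadataStore getLogSegmentMetadataStore();

    // The permit manager that limits log segment rolling.
    PermitManager getPermitManager();
}
```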
The metadata operations in distributedlog are now managed by three classes:

- LogMetadataStore: the namespace metadata store; it manages the location (URI) mapping for streams and handles namespace operations.
- LogStreamMetadataStore: the stream metadata store; it manages the metadata for a single stream, such as the read/write locks, retrieving/creating stream metadata, and deleting metadata.
- LogSegmentMetadataStore: the segment metadata store; it manages the log segment metadata for individual log segments.

LogMetadataStore and LogSegmentMetadataStore already exist. This change focuses on LogStreamMetadataStore.

Changed:
* abstract all the ZooKeeper metadata operations in the log handlers into LogStreamMetadataStore
* remove the disabling of the max tx id sanity check, as the maxTxId update is now part of the metadata update transaction

Not changed: ZKLogMetadataForReader and ZKLogMetadataForWriter keep their names. I will send out a follow-up change to rename these two classes, as they are no longer ZooKeeper-specific.

Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b91d49a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b91d49a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b91d49a8

Branch: refs/heads/master
Commit: b91d49a89e7c9133eb2807ced2e7dbb8aba72e02
Parents: 9d467a6
Author: Sijie Guo <sij...@twitter.com>
Authored: Wed Nov 30 17:14:05 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:08:33 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java | 6 +-
 .../distributedlog/BKDistributedLogManager.java | 180 ++----
 .../BKDistributedLogNamespace.java | 137 ++--
 .../twitter/distributedlog/BKLogHandler.java | 74 +--
 .../distributedlog/BKLogReadHandler.java | 126 +---
 .../distributedlog/BKLogWriteHandler.java | 211 +++----
 .../distributedlog/BKSyncLogReaderDLSN.java | 1 -
 .../DistributedLogConfiguration.java | 2 +
 .../distributedlog/MaxLogSegmentSequenceNo.java | 40 +-
 .../com/twitter/distributedlog/MaxTxId.java | 76 +--
 .../impl/ZKLogSegmentMetadataStore.java | 63 +-
 .../impl/metadata/ZKLogMetadata.java | 23 +-
 .../impl/metadata/ZKLogMetadataForWriter.java | 309 +--------
 .../impl/metadata/ZKLogStreamMetadataStore.java | 630 +++++++++++++++++++
 .../logsegment/LogSegmentMetadataStore.java | 24 +-
 .../distributedlog/metadata/BKDLConfig.java | 5 +-
 .../LogSegmentMetadataStoreUpdater.java | 4 +-
 .../metadata/LogStreamMetadataStore.java | 116 ++++
 .../tools/DistributedLogTool.java | 3 +-
 .../util/LimitedPermitManager.java | 194 ------
 .../twitter/distributedlog/zk/DefaultZKOp.java | 20 +-
 .../distributedlog/zk/LimitedPermitManager.java | 195 ++++++
 .../distributedlog/zk/ZKVersionedSetOp.java | 11 +-
 .../com/twitter/distributedlog/DLMTestUtil.java | 5 +-
 .../TestAppendOnlyStreamWriter.java | 5 +-
 .../TestBKDistributedLogManager.java | 65 +-
 .../TestBKDistributedLogNamespace.java | 6 +-
 .../distributedlog/TestDistributedLogBase.java | 3 +-
 .../distributedlog/TestRollLogSegments.java | 1 +
 .../distributedlog/bk/TestLedgerAllocator.java | 4 +-
 .../impl/TestZKLogSegmentMetadataStore.java | 73 ++-
 .../metadata/TestZKLogMetadataForWriter.java | 327 ----------
 ...TestZKLogMetadataForWriterUtilFunctions.java | 204 ------
 .../metadata/TestZKLogStreamMetadataStore.java | 326 ++++++++++
 .../TestZKLogStreamMetadataStoreUtils.java |
206 ++++++ .../distributedlog/util/TestPermitManager.java | 1 + 36 files changed, 1875 insertions(+), 1801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java index cf792e3..2ca064c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java @@ -30,7 +30,6 @@ import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; @@ -210,7 +209,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio BKAsyncLogReaderDLSN(BKDistributedLogManager bkdlm, ScheduledExecutorService executorService, - OrderedScheduler lockStateExecutor, DLSN startDLSN, Optional<String> subscriberId, boolean returnEndOfStreamRecord, @@ -219,7 +217,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio this.bkDistributedLogManager = bkdlm; this.executorService = executorService; this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId, - lockStateExecutor, this, deserializeRecordSet, true); + this, deserializeRecordSet, true); LOG.debug("Starting async reader at {}", startDLSN); this.startDLSN = startDLSN; this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); @@ -414,7 +412,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); if (!readAheadStarted) { - bkLedgerManager.checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() { + bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { try { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index ac37f3a..0a34caa 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -29,25 +29,22 @@ import com.twitter.distributedlog.bk.SimpleLedgerAllocator; import com.twitter.distributedlog.callback.LogSegmentListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import 
com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.LogEmptyException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.CloseAsyncCloseableFunction; import com.twitter.distributedlog.function.GetVersionedValueFunction; -import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; +import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.lock.NopDistributedLock; -import com.twitter.distributedlog.lock.SessionLockFactory; import com.twitter.distributedlog.lock.ZKDistributedLock; -import com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.stats.BroadCastStatsLogger; import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.subscription.SubscriptionStateStore; @@ -75,10 +72,6 @@ import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZooKeeper; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; @@ -93,7 +86,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; /** @@ -120,13 +112,6 @@ import java.util.concurrent.TimeUnit; class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedLogManager { static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class); - static void createLog(DistributedLogConfiguration conf, ZooKeeperClient zkc, URI uri, String streamName) - throws IOException, InterruptedException { - Future<ZKLogMetadataForWriter> createFuture = ZKLogMetadataForWriter.of( - uri, streamName, conf.getUnpartitionedStreamName(), zkc.get(), zkc.getDefaultACL(), true, true); - FutureUtils.result(createFuture); - } - static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = new Function<LogRecordWithDLSN, Long>() { @Override @@ -158,12 +143,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL private final StatsLogger perLogStatsLogger; private final AlertStatsLogger alertStatsLogger; - // lock factory - private SessionLockFactory lockFactory = null; - - // log segment metadata stores - private final LogSegmentMetadataStore writerMetadataStore; - private final LogSegmentMetadataStore readerMetadataStore; + // log stream metadata stores + private final LogStreamMetadataStore 
writerMetadataStore; + private final LogStreamMetadataStore readerMetadataStore; + // log segment metadata cache private final LogSegmentMetadataCache logSegmentMetadataCache; // bookkeeper clients @@ -183,9 +166,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL // private final LedgerAllocator ledgerAllocator; private final PermitLimiter writeLimiter; - // Log Segment Rolling Manager to control rolling speed - private final PermitManager logSegmentRollingPermitManager; - private OrderedScheduler lockStateExecutor = null; // // Reader Related Variables @@ -237,19 +217,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL readerBKCBuilder, null, null, - null, new LogSegmentMetadataCache(conf, Ticker.systemTicker()), OrderedScheduler.newBuilder().name("BKDL-" + name).corePoolSize(1).build(), null, null, null, - null, new ReadAheadExceptionsLogger(statsLogger), DistributedLogConstants.UNKNOWN_CLIENT_ID, DistributedLogConstants.LOCAL_REGION_ID, null, writeLimiter, - PermitManager.UNLIMITED_PERMIT_MANAGER, featureProvider, statsLogger, NullStatsLogger.INSTANCE); @@ -268,12 +245,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL * @param zkcForReaderBKC zookeeper builder for bookkeeper shared by readers * @param writerBKCBuilder bookkeeper builder for writers * @param readerBKCBuilder bookkeeper builder for readers - * @param lockFactory distributed lock factory * @param writerMetadataStore writer metadata store * @param readerMetadataStore reader metadata store * @param scheduler ordered scheduled used by readers and writers * @param readAheadScheduler readAhead scheduler used by readers - * @param lockStateExecutor ordered scheduled used by locks to execute lock actions * @param channelFactory client socket channel factory to build bookkeeper clients * @param requestTimer request timer to build bookkeeper clients * @param readAheadExceptionsLogger stats logger to record readahead exceptions @@ -297,13 +272,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL ZooKeeperClient zkcForReaderBKC, BookKeeperClientBuilder writerBKCBuilder, BookKeeperClientBuilder readerBKCBuilder, - SessionLockFactory lockFactory, - LogSegmentMetadataStore writerMetadataStore, - LogSegmentMetadataStore readerMetadataStore, + LogStreamMetadataStore writerMetadataStore, + LogStreamMetadataStore readerMetadataStore, LogSegmentMetadataCache logSegmentMetadataCache, OrderedScheduler scheduler, OrderedScheduler readAheadScheduler, - OrderedScheduler lockStateExecutor, ClientSocketChannelFactory channelFactory, HashedWheelTimer requestTimer, ReadAheadExceptionsLogger readAheadExceptionsLogger, @@ -311,7 +284,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL Integer regionId, LedgerAllocator ledgerAllocator, PermitLimiter writeLimiter, - PermitManager logSegmentRollingPermitManager, FeatureProvider featureProvider, StatsLogger statsLogger, StatsLogger perLogStatsLogger) throws IOException { @@ -320,8 +292,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL this.conf = conf; this.dynConf = dynConf; this.scheduler = scheduler; - this.lockFactory = lockFactory; - this.lockStateExecutor = lockStateExecutor; this.readAheadScheduler = null == readAheadScheduler ? 
scheduler : readAheadScheduler; this.statsLogger = statsLogger; this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger); @@ -332,15 +302,24 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL this.streamIdentifier = conf.getUnpartitionedStreamName(); this.ledgerAllocator = ledgerAllocator; this.writeLimiter = writeLimiter; - this.logSegmentRollingPermitManager = logSegmentRollingPermitManager; if (null == writerMetadataStore) { - this.writerMetadataStore = new ZKLogSegmentMetadataStore(conf, writerZKC, scheduler); + this.writerMetadataStore = new ZKLogStreamMetadataStore( + clientId, + conf, + writerZKC, + scheduler, + statsLogger); } else { this.writerMetadataStore = writerMetadataStore; } if (null == readerMetadataStore) { - this.readerMetadataStore = new ZKLogSegmentMetadataStore(conf, readerZKC, scheduler); + this.readerMetadataStore = new ZKLogStreamMetadataStore( + clientId, + conf, + readerZKC, + scheduler, + statsLogger); } else { this.readerMetadataStore = readerMetadataStore; } @@ -407,26 +386,13 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL this.readAheadExceptionsLogger = readAheadExceptionsLogger; } - synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) { - if (createIfNull && null == lockStateExecutor && ownExecutor) { - lockStateExecutor = OrderedScheduler.newBuilder() - .corePoolSize(1).name("BKDL-LockState").build(); - } - return lockStateExecutor; + @VisibleForTesting + LogStreamMetadataStore getWriterMetadataStore() { + return writerMetadataStore; } - private synchronized SessionLockFactory getLockFactory(boolean createIfNull) { - if (createIfNull && null == lockFactory) { - lockFactory = new ZKSessionLockFactory( - writerZKC, - clientId, - getLockStateExecutor(createIfNull), - conf.getZKNumRetries(), - conf.getLockTimeoutMilliSeconds(), - conf.getZKRetryBackoffStartMillis(), - statsLogger); - } - return lockFactory; + URI getUri() { + return uri; } DistributedLogConfiguration getConf() { @@ -457,12 +423,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return this.featureProvider; } - private synchronized BKLogReadHandler getReadHandlerForListener(boolean create) { + private synchronized BKLogReadHandler getReadHandlerAndRegisterListener( + boolean create, LogSegmentListener listener) { if (null == readHandlerForListener && create) { readHandlerForListener = createReadHandler(); - // start fetch the log segments + readHandlerForListener.registerListener(listener); + // start fetch the log segments after created the listener readHandlerForListener.asyncStartFetchLogSegments(); + return readHandlerForListener; } + readHandlerForListener.registerListener(listener); return readHandlerForListener; } @@ -483,8 +453,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL @Override public void registerListener(LogSegmentListener listener) throws IOException { - BKLogReadHandler readHandler = getReadHandlerForListener(true); - readHandler.registerListener(listener); + getReadHandlerAndRegisterListener(true, listener); } @Override @@ -523,14 +492,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL boolean isHandleForReading) { return createReadHandler( subscriberId, - getLockStateExecutor(true), null, true, /* deserialize record set */ isHandleForReading); } synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, - OrderedScheduler 
lockExecutor, AsyncNotification notification, boolean deserializeRecordSet, boolean isHandleForReading) { @@ -540,12 +507,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL subscriberId, conf, dynConf, - readerZKCBuilder, readerBKCBuilder, readerMetadataStore, logSegmentMetadataCache, scheduler, - lockExecutor, readAheadScheduler, alertStatsLogger, readAheadExceptionsLogger, @@ -585,24 +550,15 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) { - final ZooKeeper zk; - try { - zk = writerZKC.get(); - } catch (InterruptedException e) { - LOG.error("Failed to initialize zookeeper client : ", e); - return Future.exception(new DLInterruptedException("Failed to initialize zookeeper client", e)); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, uri.getPath())); - } - boolean ownAllocator = null == ledgerAllocator; - // Fetching Log Metadata - Future<ZKLogMetadataForWriter> metadataFuture = - ZKLogMetadataForWriter.of(uri, name, streamIdentifier, - zk, writerZKC.getDefaultACL(), - ownAllocator, conf.getCreateStreamIfNotExists() || ownAllocator); - return metadataFuture.flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() { + // Fetching Log Metadata (create if not exists) + return writerMetadataStore.getLog( + uri, + name, + ownAllocator, + conf.getCreateStreamIfNotExists() || ownAllocator + ).flatMap(new AbstractFunction1<ZKLogMetadataForWriter, Future<BKLogWriteHandler>>() { @Override public Future<BKLogWriteHandler> apply(ZKLogMetadataForWriter logMetadata) { Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>(); @@ -615,16 +571,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL private void createWriteHandler(ZKLogMetadataForWriter logMetadata, boolean lockHandler, final Promise<BKLogWriteHandler> createPromise) { - OrderedScheduler lockStateExecutor = getLockStateExecutor(true); // Build the locks DistributedLock lock; if (conf.isWriteLockEnabled()) { - lock = new ZKDistributedLock( - lockStateExecutor, - getLockFactory(true), - logMetadata.getLockPath(), - conf.getLockTimeoutMilliSeconds(), - statsLogger); + lock = writerMetadataStore.createWriteLock(logMetadata); } else { lock = NopDistributedLock.INSTANCE; } @@ -641,7 +591,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL final BKLogWriteHandler writeHandler = new BKLogWriteHandler( logMetadata, conf, - writerZKCBuilder, writerBKCBuilder, writerMetadataStore, logSegmentMetadataCache, @@ -656,10 +605,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL featureProvider, dynConf, lock); - PermitManager manager = getLogSegmentRollingPermitManager(); - if (manager instanceof Watcher) { - writeHandler.register((Watcher) manager); - } if (lockHandler) { writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() { @Override @@ -684,7 +629,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } PermitManager getLogSegmentRollingPermitManager() { - return logSegmentRollingPermitManager; + return writerMetadataStore.getPermitManager(); } <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) { @@ -692,7 +637,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL 
return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() { @Override public BKLogReadHandler applyE() throws Throwable { - return getReadHandlerForListener(true); + return getReadHandlerAndRegisterListener(true, null); } }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() { @Override @@ -982,7 +927,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL AsyncLogReader reader = new BKAsyncLogReaderDLSN( this, scheduler, - getLockStateExecutor(true), fromDLSN, subscriberId, false, @@ -1022,7 +966,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL final BKAsyncLogReaderDLSN reader = new BKAsyncLogReaderDLSN( BKDistributedLogManager.this, scheduler, - getLockStateExecutor(true), fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN, subscriberId, false, @@ -1266,33 +1209,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL */ @Override public void delete() throws IOException { - BKLogWriteHandler ledgerHandler = createWriteHandler(true); - try { - ledgerHandler.deleteLog(); - } finally { - Utils.closeQuietly(ledgerHandler); - } - - // Delete the ZK path associated with the log stream - String zkPath = getZKPath(); - // Safety check when we are using the shared zookeeper - if (zkPath.toLowerCase().contains("distributedlog")) { - try { - LOG.info("Delete the path associated with the log {}, ZK Path {}", name, zkPath); - ZKUtil.deleteRecursive(writerZKC.get(), zkPath); - } catch (InterruptedException ie) { - LOG.error("Interrupted while accessing ZK", ie); - throw new DLInterruptedException("Error initializing zk", ie); - } catch (KeeperException ke) { - LOG.error("Error accessing entry in zookeeper", ke); - throw new IOException("Error initializing zk", ke); - } - } else { - LOG.warn("Skip deletion of unrecognized ZK Path {}", zkPath); - } + FutureUtils.result(writerMetadataStore.deleteLog(uri, getStreamName())); } - /** * The DistributedLogManager may archive/purge any logs for transactionId * less than or equal to minImageTxId. 
@@ -1377,9 +1296,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL SchedulerUtils.shutdownScheduler(readAheadScheduler, schedTimeout, TimeUnit.MILLISECONDS); LOG.info("Stopped BKDL ReadAhead Executor Service for {}.", name); } - - SchedulerUtils.shutdownScheduler(getLockStateExecutor(false), schedTimeout, TimeUnit.MILLISECONDS); - LOG.info("Stopped BKDL Lock State Executor for {}.", name); } if (ownWriterBKC) { writerBKC.close(); @@ -1410,16 +1326,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL FutureUtils.result(asyncClose()); } - public boolean scheduleTask(Runnable task) { - try { - scheduler.submit(task); - return true; - } catch (RejectedExecutionException ree) { - LOG.error("Task {} is rejected : ", task, ree); - return false; - } - } - private FuturePool buildFuturePool(ExecutorService executorService, StatsLogger statsLogger) { FuturePool futurePool = new ExecutorServiceFuturePool(executorService); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index f8d347a..2c9fe44 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption; @@ -33,30 +32,24 @@ import com.twitter.distributedlog.bk.LedgerAllocatorUtils; import com.twitter.distributedlog.callback.NamespaceListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.feature.CoreFeatureKeys; import com.twitter.distributedlog.impl.ZKLogMetadataStore; -import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; -import com.twitter.distributedlog.lock.SessionLockFactory; -import com.twitter.distributedlog.lock.ZKSessionLockFactory; +import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.LogMetadataStore; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import 
com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.LimitedPermitManager; import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.PermitLimiter; -import com.twitter.distributedlog.util.PermitManager; import com.twitter.distributedlog.util.SchedulerUtils; import com.twitter.distributedlog.util.SimplePermitLimiter; import com.twitter.distributedlog.util.Utils; @@ -272,7 +265,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { private final BKDLConfig bkdlConfig; private final OrderedScheduler scheduler; private final OrderedScheduler readAheadExecutor; - private final OrderedScheduler lockStateExecutor; private final ClientSocketChannelFactory channelFactory; private final HashedWheelTimer requestTimer; // zookeeper clients @@ -300,16 +292,12 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { private final LedgerAllocator allocator; // access control manager private AccessControlManager accessControlManager; - // log segment rolling permit manager - private final PermitManager logSegmentRollingPermitManager; // log metadata store private final LogMetadataStore metadataStore; // log segment metadata store private final LogSegmentMetadataCache logSegmentMetadataCache; - private final LogSegmentMetadataStore writerSegmentMetadataStore; - private final LogSegmentMetadataStore readerSegmentMetadataStore; - // lock factory - private final SessionLockFactory lockFactory; + private final LogStreamMetadataStore writerStreamMetadataStore; + private final LogStreamMetadataStore readerStreamMetadataStore; // feature provider private final FeatureProvider featureProvider; @@ -371,15 +359,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { this.readAheadExecutor = this.scheduler; LOG.info("Used shared executor for readahead."); } - StatsLogger lockStateStatsLogger = statsLogger.scope("factory").scope("lock_scheduler"); - this.lockStateExecutor = OrderedScheduler.newBuilder() - .name("DLM-LockState") - .corePoolSize(conf.getNumLockStateThreads()) - .statsLogger(lockStateStatsLogger) - .perExecutorStatsLogger(lockStateStatsLogger) - .traceTaskExecution(conf.getEnableTaskExecutionStats()) - .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros()) - .build(); + this.channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()), @@ -427,9 +407,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { } this.readerBKC = this.sharedReaderBKCBuilder.build(); - this.logSegmentRollingPermitManager = new LimitedPermitManager( - conf.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, scheduler); - if (conf.getGlobalOutstandingWriteLimit() < 0) { this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER; } else { @@ -458,15 +435,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { } else { allocator = null; } - // Build the lock factory - this.lockFactory = new ZKSessionLockFactory( - sharedWriterZKCForDL, - clientId, - lockStateExecutor, - conf.getZKNumRetries(), - 
conf.getLockTimeoutMilliSeconds(), - conf.getZKRetryBackoffStartMillis(), - statsLogger); // Stats Loggers this.readAheadExceptionsLogger = new ReadAheadExceptionsLogger(statsLogger); @@ -478,11 +446,22 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { this.metadataStore = new ZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler); } - // create log segment metadata store - this.writerSegmentMetadataStore = - new ZKLogSegmentMetadataStore(conf, sharedWriterZKCForDL, scheduler); - this.readerSegmentMetadataStore = - new ZKLogSegmentMetadataStore(conf, sharedReaderZKCForDL, scheduler); + // create log stream metadata store + this.writerStreamMetadataStore = + new ZKLogStreamMetadataStore( + clientId, + conf, + sharedWriterZKCForDL, + scheduler, + statsLogger); + this.readerStreamMetadataStore = + new ZKLogStreamMetadataStore( + clientId, + conf, + sharedReaderZKCForDL, + scheduler, + statsLogger); + // create a log segment metadata cache this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker()); LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, regionId = {}, federated = {}.", @@ -499,7 +478,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { checkState(); validateName(logName); URI uri = FutureUtils.result(metadataStore.createLog(logName)); - createUnpartitionedStreams(conf, uri, Lists.newArrayList(logName)); + FutureUtils.result(writerStreamMetadataStore.getLog(uri, logName, true, true)); } @Override @@ -556,7 +535,16 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws IOException, IllegalArgumentException { checkState(); Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); - return uri.isPresent() && checkIfLogExists(conf, uri.get(), logName); + if (uri.isPresent()) { + try { + FutureUtils.result(writerStreamMetadataStore.logExists(uri.get(), logName)); + return true; + } catch (LogNotFoundException lnfe) { + return false; + } + } else { + return false; + } } @Override @@ -701,8 +689,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { } @VisibleForTesting - public LogSegmentMetadataStore getWriterSegmentMetadataStore() { - return writerSegmentMetadataStore; + public LogStreamMetadataStore getWriterStreamMetadataStore() { + return writerStreamMetadataStore; } @VisibleForTesting @@ -883,10 +871,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { } LedgerAllocator dlmLedgerAlloctor = null; - PermitManager dlmLogSegmentRollingPermitManager = PermitManager.UNLIMITED_PERMIT_MANAGER; if (ClientSharingOption.SharedClients == clientSharingOption) { dlmLedgerAlloctor = this.allocator; - dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager; } // if there's a specified perStreamStatsLogger, user it, otherwise use the default one. 
StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger); @@ -902,13 +888,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { readerZKCForBK, /* ZKC for BookKeeper for DL Readers */ writerBKCBuilder, /* BookKeeper Builder for DL Writers */ readerBKCBuilder, /* BookKeeper Builder for DL Readers */ - lockFactory, /* Lock Factory */ - writerSegmentMetadataStore, /* Log Segment Metadata Store for DL Writers */ - readerSegmentMetadataStore, /* Log Segment Metadata Store for DL Readers */ + writerStreamMetadataStore, /* Log Segment Metadata Store for DL Writers */ + readerStreamMetadataStore, /* Log Segment Metadata Store for DL Readers */ logSegmentMetadataCache, /* Log Segment Metadata Cache */ scheduler, /* DL scheduler */ readAheadExecutor, /* Read Aheader Executor */ - lockStateExecutor, /* Lock State Executor */ channelFactory, /* Netty Channel Factory */ requestTimer, /* Request Timer */ readAheadExceptionsLogger, /* ReadAhead Exceptions Logger */ @@ -916,7 +900,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { regionId, /* Region Id */ dlmLedgerAlloctor, /* Ledger Allocator */ writeLimiter, /* Write Limiter */ - dlmLogSegmentRollingPermitManager, /* Log segment rolling limiter */ featureProvider.scope("dl"), /* Feature Provider */ statsLogger, /* Stats Logger */ perLogStatsLogger /* Per Log Stats Logger */ @@ -961,25 +944,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { validateName(nameOfStream); } - private static boolean checkIfLogExists(DistributedLogConfiguration conf, URI uri, String name) - throws IOException, IllegalArgumentException { - validateInput(conf, uri, name); - final String logRootPath = uri.getPath() + String.format("/%s", name); - return withZooKeeperClient(new ZooKeeperClientHandler<Boolean>() { - @Override - public Boolean handle(ZooKeeperClient zkc) throws IOException { - // check existence after syncing - try { - return null != Utils.sync(zkc, logRootPath).exists(logRootPath, false); - } catch (KeeperException e) { - throw new ZKException("Error on checking if log " + logRootPath + " exists", e.code()); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on checking if log " + logRootPath + " exists", e); - } - } - }, conf, uri); - } - public static Map<String, byte[]> enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration conf, final URI uri) throws IOException, IllegalArgumentException { return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() { @@ -1025,27 +989,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { return result; } - private static void createUnpartitionedStreams( - final DistributedLogConfiguration conf, - final URI uri, - final List<String> streamNames) - throws IOException, IllegalArgumentException { - withZooKeeperClient(new ZooKeeperClientHandler<Void>() { - @Override - public Void handle(ZooKeeperClient zkc) throws IOException { - for (String s : streamNames) { - try { - BKDistributedLogManager.createLog(conf, zkc, uri, s); - } catch (InterruptedException e) { - LOG.error("Interrupted on creating unpartitioned stream {} : ", s, e); - return null; - } - } - return null; - } - }, conf, uri); - } - private void checkState() throws IOException { if (closed.get()) { LOG.error("BKDistributedLogNamespace {} is already closed", namespace); @@ -1079,13 +1022,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { 
LOG.info("Ledger Allocator stopped."); } - // Unregister gauge to avoid GC spiral - this.logSegmentRollingPermitManager.close(); this.writeLimiter.close(); // Shutdown log segment metadata stores - Utils.close(writerSegmentMetadataStore); - Utils.close(readerSegmentMetadataStore); + Utils.close(writerStreamMetadataStore); + Utils.close(readerStreamMetadataStore); // Shutdown the schedulers SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(), @@ -1113,7 +1054,5 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { LOG.info("Release external resources used by channel factory."); requestTimer.stop(); LOG.info("Stopped request timer"); - SchedulerUtils.shutdownScheduler(lockStateExecutor, 5000, TimeUnit.MILLISECONDS); - LOG.info("Stopped lock state executor"); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java index 2a6e85b..4f138f2 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java @@ -20,12 +20,9 @@ package com.twitter.distributedlog; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.twitter.distributedlog.callback.LogSegmentNamesListener; -import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.LogEmptyException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; import com.twitter.distributedlog.io.AsyncAbortable; import com.twitter.distributedlog.io.AsyncCloseable; @@ -33,6 +30,7 @@ import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.util.Function; @@ -45,10 +43,6 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.runtime.AbstractFunction0; @@ -95,8 +89,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { protected final ZKLogMetadata logMetadata; protected final DistributedLogConfiguration conf; - protected final ZooKeeperClient zooKeeperClient; protected final BookKeeperClient bookKeeperClient; + protected final LogStreamMetadataStore streamMetadataStore; protected 
final LogSegmentMetadataStore metadataStore; protected final LogSegmentMetadataCache metadataCache; protected final int firstNumEntriesPerReadLastRecordScan; @@ -112,8 +106,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { // Maintain the list of log segments per stream protected final PerStreamLogSegmentCache logSegmentCache; - - // trace protected final long metadataLatencyWarnThresholdMillis; @@ -130,15 +122,13 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { */ BKLogHandler(ZKLogMetadata metadata, DistributedLogConfiguration conf, - ZooKeeperClientBuilder zkcBuilder, BookKeeperClientBuilder bkcBuilder, - LogSegmentMetadataStore metadataStore, + LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, OrderedScheduler scheduler, StatsLogger statsLogger, AlertStatsLogger alertStatsLogger, String lockClientId) { - Preconditions.checkNotNull(zkcBuilder); Preconditions.checkNotNull(bkcBuilder); this.logMetadata = metadata; this.conf = conf; @@ -148,13 +138,11 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { this.logSegmentCache = new PerStreamLogSegmentCache( metadata.getLogName(), conf.isLogSegmentSequenceNumberValidationEnabled()); - firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan(); maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan(); - this.zooKeeperClient = zkcBuilder.build(); - LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath()); this.bookKeeperClient = bkcBuilder.build(); - this.metadataStore = metadataStore; + this.streamMetadataStore = streamMetadataStore; + this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore(); this.metadataCache = metadataCache; this.lockClientId = lockClientId; @@ -188,7 +176,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() { final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() { + streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) + .addEventListener(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { readLogSegmentsFromStore( @@ -234,7 +223,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) { final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() { + streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) + .addEventListener(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { readLogSegmentsFromStore( @@ -381,8 +371,8 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { * @return the count of records present in the range */ public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) { - - return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() { + return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) + .flatMap(new Function<Void, Future<Long>>() { public Future<Long> apply(Void done) { return readLogSegmentsFromStore( @@ -417,48 +407,6 @@ public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { return 
sum; } - Future<Void> checkLogStreamExistsAsync() { - final Promise<Void> promise = new Promise<Void>(); - try { - final ZooKeeper zk = zooKeeperClient.get(); - zk.sync(logMetadata.getLogSegmentsPath(), new AsyncCallback.VoidCallback() { - @Override - public void processResult(int syncRc, String path, Object syncCtx) { - if (KeeperException.Code.NONODE.intValue() == syncRc) { - promise.setException(new LogNotFoundException( - String.format("Log %s does not exist or has been deleted", getFullyQualifiedName()))); - return; - } else if (KeeperException.Code.OK.intValue() != syncRc){ - promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(), - KeeperException.create(KeeperException.Code.get(syncRc)))); - return; - } - zk.exists(logMetadata.getLogSegmentsPath(), false, new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - promise.setValue(null); - } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setException(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName()))); - } else { - promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(), - KeeperException.create(KeeperException.Code.get(rc)))); - } - } - }, null); - } - }, null); - - } catch (InterruptedException ie) { - LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie); - promise.setException(new DLInterruptedException("Interrupted while checking " - + logMetadata.getLogSegmentsPath(), ie)); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } - return promise; - } - @Override public Future<Void> asyncAbort() { return asyncClose(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 30a96ff..1963172 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -31,8 +31,6 @@ import com.twitter.distributedlog.callback.LogSegmentListener; import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.LockCancelledException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; @@ -40,12 +38,9 @@ import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.lock.DistributedLock; -import com.twitter.distributedlog.lock.SessionLockFactory; -import com.twitter.distributedlog.lock.ZKDistributedLock; -import 
com.twitter.distributedlog.lock.ZKSessionLockFactory; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.readahead.ReadAheadWorker; import com.twitter.distributedlog.stats.BroadCastStatsLogger; import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; @@ -53,7 +48,6 @@ import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; import com.twitter.util.ExceptionalFunction; -import com.twitter.util.ExceptionalFunction0; import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; @@ -67,14 +61,13 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function0; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; +import javax.annotation.Nullable; + /** * Log Handler for Readers. * <h3>Metrics</h3> @@ -111,7 +104,7 @@ import scala.runtime.BoxedUnit; * becoming idle. * </ul> * <h4>Read Lock</h4> - * All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock} + * All read lock related stats are exposed under scope `read_lock`. * for detail stats. */ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { @@ -126,10 +119,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { protected ReadAheadWorker readAheadWorker = null; private final boolean isHandleForReading; - private final SessionLockFactory lockFactory; - private final OrderedScheduler lockStateExecutor; private final Optional<String> subscriberId; - private final String readLockPath; private DistributedLock readLock; private Future<Void> lockAcquireFuture; @@ -156,12 +146,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { Optional<String> subscriberId, DistributedLogConfiguration conf, DynamicDistributedLogConfiguration dynConf, - ZooKeeperClientBuilder zkcBuilder, BookKeeperClientBuilder bkcBuilder, - LogSegmentMetadataStore metadataStore, + LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, OrderedScheduler scheduler, - OrderedScheduler lockStateExecutor, OrderedScheduler readAheadExecutor, AlertStatsLogger alertStatsLogger, ReadAheadExceptionsLogger readAheadExceptionsLogger, @@ -173,9 +161,8 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { boolean deserializeRecordSet) { super(logMetadata, conf, - zkcBuilder, bkcBuilder, - metadataStore, + streamMetadataStore, metadataCache, scheduler, statsLogger, @@ -206,23 +193,12 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { Ticker.systemTicker()); this.subscriberId = subscriberId; - this.readLockPath = logMetadata.getReadLockPath(subscriberId); - this.lockStateExecutor = lockStateExecutor; - this.lockFactory = new ZKSessionLockFactory( - zooKeeperClient, - getLockClientId(), - lockStateExecutor, - conf.getZKNumRetries(), - 
conf.getLockTimeoutMilliSeconds(), - conf.getZKRetryBackoffStartMillis(), - statsLogger.scope("read_lock")); - this.isHandleForReading = isHandleForReading; } @VisibleForTesting String getReadLockPath() { - return readLockPath; + return logMetadataForReader.getReadLockPath(subscriberId); } <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) { @@ -234,38 +210,24 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { }); } + Future<Void> checkLogStreamExists() { + return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()); + } + /** * Elective stream lock--readers are not required to acquire the lock before using the stream. */ synchronized Future<Void> lockStream() { if (null == lockAcquireFuture) { - final Function0<DistributedLock> lockFunction = new ExceptionalFunction0<DistributedLock>() { - @Override - public DistributedLock applyE() throws IOException { - // Unfortunately this has a blocking call which we should not execute on the - // ZK completion thread - BKLogReadHandler.this.readLock = new ZKDistributedLock( - lockStateExecutor, - lockFactory, - readLockPath, - conf.getLockTimeoutMilliSeconds(), - statsLogger.scope("read_lock")); - - LOG.info("acquiring readlock {} at {}", getLockClientId(), readLockPath); - return BKLogReadHandler.this.readLock; - } - }; - lockAcquireFuture = ensureReadLockPathExist().flatMap(new ExceptionalFunction<Void, Future<Void>>() { - @Override - public Future<Void> applyE(Void in) throws Throwable { - return scheduler.apply(lockFunction).flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() { + lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId) + .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() { @Override - public Future<Void> applyE(DistributedLock lock) throws IOException { + public Future<Void> applyE(DistributedLock lock) throws Throwable { + BKLogReadHandler.this.readLock = lock; + LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath()); return acquireLockOnExecutorThread(lock); } }); - } - }); } return lockAcquireFuture; } @@ -292,14 +254,14 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() { @Override public void onSuccess(DistributedLock lock) { - LOG.info("acquired readlock {} at {}", getLockClientId(), readLockPath); + LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath()); satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null)); } @Override public void onFailure(Throwable cause) { LOG.info("failed to acquire readlock {} at {}", - new Object[]{getLockClientId(), readLockPath, cause}); + new Object[]{ getLockClientId(), getReadLockPath(), cause }); satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause)); } }); @@ -438,46 +400,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { return handleCache; } - private Future<Void> ensureReadLockPathExist() { - final Promise<Void> promise = new Promise<Void>(); - promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - FutureUtils.setException(promise, new LockCancelledException(readLockPath, "Could not ensure read lock path", t)); - return null; - } - }); - Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath()); - 
Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate, - new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT, - new org.apache.zookeeper.AsyncCallback.StringCallback() { - @Override - public void processResult(final int rc, final String path, Object ctx, String name) { - scheduler.submit(new Runnable() { - @Override - public void run() { - if (KeeperException.Code.NONODE.intValue() == rc) { - FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName()))); - } else if (KeeperException.Code.OK.intValue() == rc) { - FutureUtils.setValue(promise, null); - LOG.trace("Created path {}.", path); - } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - FutureUtils.setValue(promise, null); - LOG.trace("Path {} is already existed.", path); - } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) { - FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path)); - } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { - FutureUtils.setException(promise, new DLInterruptedException(path)); - } else { - FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc))); - } - } - }); - } - }, null); - return promise; - } - public Entry.Reader getNextReadAheadEntry() throws IOException { return readAheadCache.getNextReadAheadEntry(); } @@ -560,12 +482,16 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { // Listener for log segments // - protected void registerListener(LogSegmentListener listener) { - listeners.add(listener); + protected void registerListener(@Nullable LogSegmentListener listener) { + if (null != listener) { + listeners.add(listener); + } } - protected void unregisterListener(LogSegmentListener listener) { - listeners.remove(listener); + protected void unregisterListener(@Nullable LogSegmentListener listener) { + if (null != listener) { + listeners.remove(listener); + } } protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {