http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index 1a23228..a8b1f77 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -17,71 +17,39 @@ */ package com.twitter.distributedlog; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Ticker; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.DistributedLogManagerFactory.ClientSharingOption; import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.acl.DefaultAccessControlManager; -import com.twitter.distributedlog.acl.ZKAccessControlManager; -import com.twitter.distributedlog.bk.LedgerAllocator; -import com.twitter.distributedlog.bk.LedgerAllocatorUtils; import com.twitter.distributedlog.callback.NamespaceListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.AlreadyClosedException; import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.feature.CoreFeatureKeys; -import com.twitter.distributedlog.impl.ZKLogMetadataStore; -import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; -import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.io.AsyncCloseable; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.LogMetadataStore; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.util.ConfUtils; -import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.PermitLimiter; import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.distributedlog.util.SimplePermitLimiter; import com.twitter.distributedlog.util.Utils; import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.SettableFeatureProvider; -import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.Stat; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.InetAddress; import java.net.URI; -import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.twitter.distributedlog.impl.BKDLUtils.*; +import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.WRITER; +import static com.twitter.distributedlog.util.DLUtils.validateName; /** * BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses @@ -119,344 +87,57 @@ import static com.twitter.distributedlog.impl.BKDLUtils.*; public class BKDistributedLogNamespace implements DistributedLogNamespace { static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class); - public static Builder newBuilder() { - return new Builder(); - } - - public static class Builder { - private DistributedLogConfiguration _conf = null; - private URI _uri = null; - private StatsLogger _statsLogger = NullStatsLogger.INSTANCE; - private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE; - private FeatureProvider _featureProvider = new SettableFeatureProvider("", 0); - private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID; - private int _regionId = DistributedLogConstants.LOCAL_REGION_ID; - - private Builder() {} - - public Builder conf(DistributedLogConfiguration conf) { - this._conf = conf; - return this; - } - - public Builder uri(URI uri) { - this._uri = uri; - return this; - } - - public Builder statsLogger(StatsLogger statsLogger) { - this._statsLogger = statsLogger; - return this; - } - - public Builder perLogStatsLogger(StatsLogger perLogStatsLogger) { - this._perLogStatsLogger = perLogStatsLogger; - return this; - } - - public Builder featureProvider(FeatureProvider featureProvider) { - this._featureProvider = featureProvider; - return this; - } - - public Builder clientId(String clientId) { - this._clientId = clientId; - return this; - } - - public Builder regionId(int regionId) { - this._regionId = regionId; - return this; - } - - @SuppressWarnings("deprecation") - public BKDistributedLogNamespace build() - throws IOException, NullPointerException, IllegalArgumentException { - Preconditions.checkNotNull(_conf, "No DistributedLog Configuration"); - Preconditions.checkNotNull(_uri, "No DistributedLog URI"); - Preconditions.checkNotNull(_featureProvider, "No Feature Provider"); - Preconditions.checkNotNull(_statsLogger, "No Stats Logger"); - Preconditions.checkNotNull(_featureProvider, "No Feature Provider"); - Preconditions.checkNotNull(_clientId, "No Client ID"); - // validate conf and uri - validateConfAndURI(_conf, _uri); - - // Build the namespace zookeeper client - ZooKeeperClientBuilder nsZkcBuilder = createDLZKClientBuilder( - String.format("dlzk:%s:factory_writer_shared", _uri), - _conf, - DLUtils.getZKServersFromDLUri(_uri), - _statsLogger.scope("dlzk_factory_writer_shared")); - ZooKeeperClient nsZkc = nsZkcBuilder.build(); - - // Resolve namespace binding - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(nsZkc, _uri); - - // Backward Compatible to enable per log stats by configuration settings - StatsLogger perLogStatsLogger = _perLogStatsLogger; - if (perLogStatsLogger == NullStatsLogger.INSTANCE && - _conf.getEnablePerStreamStat()) { - perLogStatsLogger = _statsLogger.scope("stream"); - } - - return new BKDistributedLogNamespace( - _conf, - _uri, - _featureProvider, - _statsLogger, - perLogStatsLogger, - _clientId, - _regionId, - nsZkcBuilder, - nsZkc, - bkdlConfig); - } - } - - static interface ZooKeeperClientHandler<T> { - T handle(ZooKeeperClient zkc) throws IOException; - } - - /** - * Run given <i>handler</i> by providing an available new zookeeper client - * - * @param handler - * Handler to process with provided zookeeper client. - * @param conf - * Distributedlog Configuration. - * @param namespace - * Distributedlog Namespace. - */ - private static <T> T withZooKeeperClient(ZooKeeperClientHandler<T> handler, - DistributedLogConfiguration conf, - URI namespace) throws IOException { - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() - .name(String.format("dlzk:%s:factory_static", namespace)) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .uri(namespace) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkAclId(conf.getZkAclId()) - .build(); - try { - return handler.handle(zkc); - } finally { - zkc.close(); - } - } - - private static String getHostIpLockClientId() { - try { - return InetAddress.getLocalHost().toString(); - } catch(Exception ex) { - return DistributedLogConstants.UNKNOWN_CLIENT_ID; - } - } - private final String clientId; private final int regionId; private final DistributedLogConfiguration conf; private final URI namespace; - private final BKDLConfig bkdlConfig; + // namespace driver + private final NamespaceDriver driver; + // resources private final OrderedScheduler scheduler; - private final OrderedScheduler readAheadExecutor; - private final ClientSocketChannelFactory channelFactory; - private final HashedWheelTimer requestTimer; - // zookeeper clients - // NOTE: The actual zookeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - private final ZooKeeperClientBuilder sharedWriterZKCBuilderForDL; - private final ZooKeeperClient sharedWriterZKCForDL; - private final ZooKeeperClientBuilder sharedReaderZKCBuilderForDL; - private final ZooKeeperClient sharedReaderZKCForDL; - private ZooKeeperClientBuilder sharedWriterZKCBuilderForBK = null; - private ZooKeeperClient sharedWriterZKCForBK = null; - private ZooKeeperClientBuilder sharedReaderZKCBuilderForBK = null; - private ZooKeeperClient sharedReaderZKCForBK = null; - // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - private final BookKeeperClientBuilder sharedWriterBKCBuilder; - private final BookKeeperClient writerBKC; - private final BookKeeperClientBuilder sharedReaderBKCBuilder; - private final BookKeeperClient readerBKC; - // ledger allocator - private final LedgerAllocator allocator; - // access control manager - private AccessControlManager accessControlManager; - // log metadata store - private final LogMetadataStore metadataStore; + private final PermitLimiter writeLimiter; + private final AsyncFailureInjector failureInjector; // log segment metadata store private final LogSegmentMetadataCache logSegmentMetadataCache; - private final LogStreamMetadataStore writerStreamMetadataStore; - private final LogStreamMetadataStore readerStreamMetadataStore; - // feature provider private final FeatureProvider featureProvider; - // Stats Loggers private final StatsLogger statsLogger; private final StatsLogger perLogStatsLogger; - protected AtomicBoolean closed = new AtomicBoolean(false); - - private final PermitLimiter writeLimiter; + protected final AtomicBoolean closed = new AtomicBoolean(false); - private BKDistributedLogNamespace( + public BKDistributedLogNamespace( DistributedLogConfiguration conf, URI uri, + NamespaceDriver driver, + OrderedScheduler scheduler, FeatureProvider featureProvider, + PermitLimiter writeLimiter, + AsyncFailureInjector failureInjector, StatsLogger statsLogger, StatsLogger perLogStatsLogger, String clientId, - int regionId, - ZooKeeperClientBuilder nsZkcBuilder, - ZooKeeperClient nsZkc, - BKDLConfig bkdlConfig) - throws IOException, IllegalArgumentException { + int regionId) { this.conf = conf; this.namespace = uri; + this.driver = driver; + this.scheduler = scheduler; this.featureProvider = featureProvider; + this.writeLimiter = writeLimiter; + this.failureInjector = failureInjector; this.statsLogger = statsLogger; this.perLogStatsLogger = perLogStatsLogger; + this.clientId = clientId; this.regionId = regionId; - this.bkdlConfig = bkdlConfig; - if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) { - this.clientId = getHostIpLockClientId(); - } else { - this.clientId = clientId; - } - - // Build resources - StatsLogger schedulerStatsLogger = statsLogger.scope("factory").scope("thread_pool"); - this.scheduler = OrderedScheduler.newBuilder() - .name("DLM-" + uri.getPath()) - .corePoolSize(conf.getNumWorkerThreads()) - .statsLogger(schedulerStatsLogger) - .perExecutorStatsLogger(schedulerStatsLogger) - .traceTaskExecution(conf.getEnableTaskExecutionStats()) - .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros()) - .build(); - if (conf.getNumReadAheadWorkerThreads() > 0) { - this.readAheadExecutor = OrderedScheduler.newBuilder() - .name("DLM-" + uri.getPath() + "-readahead-executor") - .corePoolSize(conf.getNumReadAheadWorkerThreads()) - .statsLogger(statsLogger.scope("factory").scope("readahead_thread_pool")) - .traceTaskExecution(conf.getTraceReadAheadDeliveryLatency()) - .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros()) - .build(); - LOG.info("Created dedicated readahead executor : threads = {}", conf.getNumReadAheadWorkerThreads()); - } else { - this.readAheadExecutor = this.scheduler; - LOG.info("Used shared executor for readahead."); - } - - this.channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()), - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()), - conf.getBKClientNumberIOThreads()); - this.requestTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(), - conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, - conf.getTimeoutTimerNumTicks()); - // Build zookeeper client for writers - this.sharedWriterZKCBuilderForDL = nsZkcBuilder; - this.sharedWriterZKCForDL = nsZkc; - - // Build zookeeper client for readers - if (bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader())) { - this.sharedReaderZKCBuilderForDL = this.sharedWriterZKCBuilderForDL; - } else { - this.sharedReaderZKCBuilderForDL = createDLZKClientBuilder( - String.format("dlzk:%s:factory_reader_shared", namespace), - conf, - bkdlConfig.getDlZkServersForReader(), - statsLogger.scope("dlzk_factory_reader_shared")); - } - this.sharedReaderZKCForDL = this.sharedReaderZKCBuilderForDL.build(); - - // Build bookkeeper client for writers - this.sharedWriterBKCBuilder = createBKCBuilder( - String.format("bk:%s:factory_writer_shared", namespace), - conf, - bkdlConfig.getBkZkServersForWriter(), - bkdlConfig.getBkLedgersPath(), - Optional.of(featureProvider.scope("bkc"))); - this.writerBKC = this.sharedWriterBKCBuilder.build(); - - // Build bookkeeper client for readers - if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) { - this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder; - } else { - this.sharedReaderBKCBuilder = createBKCBuilder( - String.format("bk:%s:factory_reader_shared", namespace), - conf, - bkdlConfig.getBkZkServersForReader(), - bkdlConfig.getBkLedgersPath(), - Optional.<FeatureProvider>absent()); - } - this.readerBKC = this.sharedReaderBKCBuilder.build(); - - if (conf.getGlobalOutstandingWriteLimit() < 0) { - this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER; - } else { - Feature disableWriteLimitFeature = featureProvider.getFeature( - CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase()); - this.writeLimiter = new SimplePermitLimiter( - conf.getOutstandingWriteLimitDarkmode(), - conf.getGlobalOutstandingWriteLimit(), - statsLogger.scope("writeLimiter"), - true /* singleton */, - disableWriteLimitFeature); - } - - // propagate bkdlConfig to configuration - BKDLConfig.propagateConfiguration(bkdlConfig, conf); - - // Build the allocator - if (conf.getEnableLedgerAllocatorPool()) { - String allocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, uri); - allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize(), - conf, sharedWriterZKCForDL, writerBKC, scheduler); - if (null != allocator) { - allocator.start(); - } - LOG.info("Created ledger allocator pool under {} with size {}.", allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize()); - } else { - allocator = null; - } - - // log metadata store - if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) { - this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler); - } else { - this.metadataStore = new ZKLogMetadataStore(conf, namespace, sharedReaderZKCForDL, scheduler); - } - - // create log stream metadata store - this.writerStreamMetadataStore = - new ZKLogStreamMetadataStore( - clientId, - conf, - sharedWriterZKCForDL, - scheduler, - statsLogger); - this.readerStreamMetadataStore = - new ZKLogStreamMetadataStore( - clientId, - conf, - sharedReaderZKCForDL, - scheduler, - statsLogger); // create a log segment metadata cache this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker()); + } - LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, regionId = {}, federated = {}.", - new Object[] { clientId, regionId, bkdlConfig.isFederatedNamespace() }); + @Override + public NamespaceDriver getNamespaceDriver() { + return driver; } // @@ -468,8 +149,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, IOException { checkState(); validateName(logName); - URI uri = FutureUtils.result(metadataStore.createLog(logName)); - FutureUtils.result(writerStreamMetadataStore.getLog(uri, logName, true, true)); + URI uri = FutureUtils.result(driver.getLogMetadataStore().createLog(logName)); + FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true)); } @Override @@ -477,17 +158,15 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, LogNotFoundException, IOException { checkState(); validateName(logName); - Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); + Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); if (!uri.isPresent()) { throw new LogNotFoundException("Log " + logName + " isn't found."); } - DistributedLogManager dlm = createDistributedLogManager( + DistributedLogManager dlm = openLogInternal( uri.get(), logName, - ClientSharingOption.SharedClients, Optional.<DistributedLogConfiguration>absent(), - Optional.<DynamicDistributedLogConfiguration>absent(), - Optional.<StatsLogger>absent()); + Optional.<DynamicDistributedLogConfiguration>absent()); dlm.delete(); } @@ -508,27 +187,26 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { throws InvalidStreamNameException, IOException { checkState(); validateName(logName); - Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); + Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); if (!uri.isPresent()) { throw new LogNotFoundException("Log " + logName + " isn't found."); } - return createDistributedLogManager( + return openLogInternal( uri.get(), logName, - ClientSharingOption.SharedClients, logConf, - dynamicLogConf, - perStreamStatsLogger); + dynamicLogConf); } @Override public boolean logExists(String logName) throws IOException, IllegalArgumentException { checkState(); - Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName)); + Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); if (uri.isPresent()) { try { - FutureUtils.result(writerStreamMetadataStore.logExists(uri.get(), logName)); + FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) + .logExists(uri.get(), logName)); return true; } catch (LogNotFoundException lnfe) { return false; @@ -541,240 +219,18 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { @Override public Iterator<String> getLogs() throws IOException { checkState(); - return FutureUtils.result(metadataStore.getLogs()); + return FutureUtils.result(driver.getLogMetadataStore().getLogs()); } @Override public void registerNamespaceListener(NamespaceListener listener) { - metadataStore.registerNamespaceListener(listener); + driver.getLogMetadataStore().registerNamespaceListener(listener); } @Override public synchronized AccessControlManager createAccessControlManager() throws IOException { checkState(); - if (null == accessControlManager) { - String aclRootPath = bkdlConfig.getACLRootPath(); - // Build the access control manager - if (aclRootPath == null) { - accessControlManager = DefaultAccessControlManager.INSTANCE; - LOG.info("Created default access control manager for {}", namespace); - } else { - if (!isReservedStreamName(aclRootPath)) { - throw new IOException("Invalid Access Control List Root Path : " + aclRootPath); - } - String zkRootPath = namespace.getPath() + "/" + aclRootPath; - LOG.info("Creating zk based access control manager @ {} for {}", - zkRootPath, namespace); - accessControlManager = new ZKAccessControlManager(conf, sharedReaderZKCForDL, - zkRootPath, scheduler); - LOG.info("Created zk based access control manager @ {} for {}", - zkRootPath, namespace); - } - } - return accessControlManager; - } - - // - // Legacy methods - // - - static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException { - String poolPath = conf.getLedgerAllocatorPoolPath(); - LOG.info("PoolPath is {}", poolPath); - if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) { - LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath); - throw new IOException("Invalid ledger allocator pool path specified : " + poolPath); - } - String poolName = conf.getLedgerAllocatorPoolName(); - if (null == poolName) { - LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool."); - throw new IOException("No ledger allocator name specified when enabling ledger allocator pool."); - } - String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName; - try { - PathUtils.validatePath(rootPath); - } catch (IllegalArgumentException iae) { - LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath); - throw new IOException("Invalid ledger allocator pool path specified : " + poolPath); - } - return rootPath; - } - - public static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName, - DistributedLogConfiguration conf, - String zkServers, - StatsLogger statsLogger) { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); - } - ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder() - .name(zkcName) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkServers(zkServers) - .retryPolicy(retryPolicy) - .statsLogger(statsLogger) - .zkAclId(conf.getZkAclId()); - LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}," - + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(), - conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() }); - return builder; - } - - private static ZooKeeperClientBuilder createBKZKClientBuilder(String zkcName, - DistributedLogConfiguration conf, - String zkServers, - StatsLogger statsLogger) { - RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getBKClientZKRetryBackoffStartMillis(), - conf.getBKClientZKRetryBackoffMaxMillis(), - conf.getBKClientZKNumRetries()); - ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder() - .name(zkcName) - .sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getBKClientZKRequestRateLimit()) - .zkServers(zkServers) - .retryPolicy(retryPolicy) - .statsLogger(statsLogger) - .zkAclId(conf.getZkAclId()); - LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}," - + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getBKClientZKNumRetries(), - conf.getBKClientZKSessionTimeoutMilliSeconds(), conf.getBKClientZKRetryBackoffStartMillis(), - conf.getBKClientZKRetryBackoffMaxMillis(), conf.getZkAclId() }); - return builder; - } - - private BookKeeperClientBuilder createBKCBuilder(String bkcName, - DistributedLogConfiguration conf, - String zkServers, - String ledgersPath, - Optional<FeatureProvider> featureProviderOptional) { - BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder() - .name(bkcName) - .dlConfig(conf) - .zkServers(zkServers) - .ledgersPath(ledgersPath) - .channelFactory(channelFactory) - .requestTimer(requestTimer) - .featureProvider(featureProviderOptional) - .statsLogger(statsLogger); - LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}", - new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() }); - return builder; - } - - @VisibleForTesting - public ZooKeeperClient getSharedWriterZKCForDL() { - return sharedWriterZKCForDL; - } - - @VisibleForTesting - public BookKeeperClient getReaderBKC() { - return readerBKC; - } - - @VisibleForTesting - public LogStreamMetadataStore getWriterStreamMetadataStore() { - return writerStreamMetadataStore; - } - - @VisibleForTesting - public LedgerAllocator getLedgerAllocator() { - return allocator; - } - - /** - * Run given <i>handler</i> by providing an available zookeeper client. - * - * @param handler - * Handler to process with provided zookeeper client. - * @return result processed by handler. - * @throws IOException - */ - private <T> T withZooKeeperClient(ZooKeeperClientHandler<T> handler) throws IOException { - checkState(); - return handler.handle(sharedWriterZKCForDL); - } - - /** - * Create a DistributedLogManager for <i>nameOfLogStream</i>, with default shared clients. - * - * @param nameOfLogStream - * name of log stream - * @return distributedlog manager - * @throws com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name is invalid - * @throws IOException - */ - public DistributedLogManager createDistributedLogManagerWithSharedClients(String nameOfLogStream) - throws InvalidStreamNameException, IOException { - return createDistributedLogManager(nameOfLogStream, ClientSharingOption.SharedClients); - } - - /** - * Create a DistributedLogManager for <i>nameOfLogStream</i>, with specified client sharing options. - * - * @param nameOfLogStream - * name of log stream. - * @param clientSharingOption - * specifies if the ZK/BK clients are shared - * @return distributedlog manager instance. - * @throws com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name is invalid - * @throws IOException - */ - public DistributedLogManager createDistributedLogManager( - String nameOfLogStream, - ClientSharingOption clientSharingOption) - throws InvalidStreamNameException, IOException { - Optional<DistributedLogConfiguration> logConfiguration = Optional.absent(); - Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration = Optional.absent(); - return createDistributedLogManager( - nameOfLogStream, - clientSharingOption, - logConfiguration, - dynamicLogConfiguration); - } - - /** - * Create a DistributedLogManager for <i>nameOfLogStream</i>, with specified client sharing options. - * Override whitelisted stream-level configuration settings with settings found in - * <i>logConfiguration</i>. - * - * - * @param nameOfLogStream - * name of log stream. - * @param clientSharingOption - * specifies if the ZK/BK clients are shared - * @param logConfiguration - * stream configuration overrides. - * @param dynamicLogConfiguration - * dynamic stream configuration overrides. - * @return distributedlog manager instance. - * @throws com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name is invalid - * @throws IOException - */ - public DistributedLogManager createDistributedLogManager( - String nameOfLogStream, - ClientSharingOption clientSharingOption, - Optional<DistributedLogConfiguration> logConfiguration, - Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration) - throws InvalidStreamNameException, IOException { - if (bkdlConfig.isFederatedNamespace()) { - throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace"); - } - return createDistributedLogManager( - namespace, - nameOfLogStream, - clientSharingOption, - logConfiguration, - dynamicLogConfiguration, - Optional.<StatsLogger>absent() - ); + return driver.getAccessControlManager(); } /** @@ -784,8 +240,6 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { * location to store the log * @param nameOfLogStream * name of the log - * @param clientSharingOption - * client sharing option * @param logConfiguration * optional stream configuration * @param dynamicLogConfiguration @@ -794,13 +248,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { * @throws InvalidStreamNameException if the stream name is invalid * @throws IOException */ - protected DistributedLogManager createDistributedLogManager( + protected DistributedLogManager openLogInternal( URI uri, String nameOfLogStream, - ClientSharingOption clientSharingOption, Optional<DistributedLogConfiguration> logConfiguration, - Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration, - Optional<StatsLogger> perStreamStatsLogger) + Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration) throws InvalidStreamNameException, IOException { // Make sure the name is well formed checkState(); @@ -817,172 +269,34 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { dynConf = ConfUtils.getConstDynConf(mergedConfiguration); } - ZooKeeperClientBuilder writerZKCBuilderForDL = null; - ZooKeeperClientBuilder readerZKCBuilderForDL = null; - ZooKeeperClient writerZKCForBK = null; - ZooKeeperClient readerZKCForBK = null; - BookKeeperClientBuilder writerBKCBuilder = null; - BookKeeperClientBuilder readerBKCBuilder = null; - - switch(clientSharingOption) { - case SharedClients: - writerZKCBuilderForDL = sharedWriterZKCBuilderForDL; - readerZKCBuilderForDL = sharedReaderZKCBuilderForDL; - writerBKCBuilder = sharedWriterBKCBuilder; - readerBKCBuilder = sharedReaderBKCBuilder; - break; - case SharedZKClientPerStreamBKClient: - writerZKCBuilderForDL = sharedWriterZKCBuilderForDL; - readerZKCBuilderForDL = sharedReaderZKCBuilderForDL; - synchronized (this) { - if (null == this.sharedWriterZKCForBK) { - this.sharedWriterZKCBuilderForBK = createBKZKClientBuilder( - String.format("bkzk:%s:factory_writer_shared", uri), - mergedConfiguration, - bkdlConfig.getBkZkServersForWriter(), - statsLogger.scope("bkzk_factory_writer_shared")); - this.sharedWriterZKCForBK = this.sharedWriterZKCBuilderForBK.build(); - } - if (null == this.sharedReaderZKCForBK) { - if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) { - this.sharedReaderZKCBuilderForBK = this.sharedWriterZKCBuilderForBK; - } else { - this.sharedReaderZKCBuilderForBK = createBKZKClientBuilder( - String.format("bkzk:%s:factory_reader_shared", uri), - mergedConfiguration, - bkdlConfig.getBkZkServersForReader(), - statsLogger.scope("bkzk_factory_reader_shared")); - } - this.sharedReaderZKCForBK = this.sharedReaderZKCBuilderForBK.build(); - } - writerZKCForBK = this.sharedWriterZKCForBK; - readerZKCForBK = this.sharedReaderZKCForBK; - } - break; - } - - LedgerAllocator dlmLedgerAlloctor = null; - if (ClientSharingOption.SharedClients == clientSharingOption) { - dlmLedgerAlloctor = this.allocator; - } - // if there's a specified perStreamStatsLogger, user it, otherwise use the default one. - StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger); - return new BKDistributedLogManager( nameOfLogStream, /* Log Name */ mergedConfiguration, /* Configuration */ dynConf, /* Dynamic Configuration */ - uri, /* Namespace */ - writerZKCBuilderForDL, /* ZKC Builder for DL Writer */ - readerZKCBuilderForDL, /* ZKC Builder for DL Reader */ - writerZKCForBK, /* ZKC for BookKeeper for DL Writers */ - readerZKCForBK, /* ZKC for BookKeeper for DL Readers */ - writerBKCBuilder, /* BookKeeper Builder for DL Writers */ - readerBKCBuilder, /* BookKeeper Builder for DL Readers */ - writerStreamMetadataStore, /* Log Segment Metadata Store for DL Writers */ - readerStreamMetadataStore, /* Log Segment Metadata Store for DL Readers */ + uri, /* Namespace URI */ + driver, /* Namespace Driver */ logSegmentMetadataCache, /* Log Segment Metadata Cache */ scheduler, /* DL scheduler */ - readAheadExecutor, /* Read Aheader Executor */ - channelFactory, /* Netty Channel Factory */ - requestTimer, /* Request Timer */ clientId, /* Client Id */ regionId, /* Region Id */ - dlmLedgerAlloctor, /* Ledger Allocator */ writeLimiter, /* Write Limiter */ featureProvider.scope("dl"), /* Feature Provider */ + failureInjector, /* Failure Injector */ statsLogger, /* Stats Logger */ - perLogStatsLogger /* Per Log Stats Logger */ + perLogStatsLogger, /* Per Log Stats Logger */ + Optional.<AsyncCloseable>absent() /* shared resources, we don't need to close any resources in dlm */ ); } - public MetadataAccessor createMetadataAccessor(String nameOfMetadataNode) - throws InvalidStreamNameException, IOException { - if (bkdlConfig.isFederatedNamespace()) { - throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace"); - } - checkState(); - validateName(nameOfMetadataNode); - return new ZKMetadataAccessor(nameOfMetadataNode, conf, namespace, - sharedWriterZKCBuilderForDL, sharedReaderZKCBuilderForDL, statsLogger); - } - - public Collection<String> enumerateAllLogsInNamespace() - throws IOException, IllegalArgumentException { - if (bkdlConfig.isFederatedNamespace()) { - throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace"); - } - return Sets.newHashSet(getLogs()); - } - - public Map<String, byte[]> enumerateLogsWithMetadataInNamespace() - throws IOException, IllegalArgumentException { - if (bkdlConfig.isFederatedNamespace()) { - throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace"); - } - return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() { - @Override - public Map<String, byte[]> handle(ZooKeeperClient zkc) throws IOException { - return enumerateLogsWithMetadataInternal(zkc, conf, namespace); - } - }); - } - - private static void validateInput(DistributedLogConfiguration conf, URI uri, String nameOfStream) - throws IllegalArgumentException, InvalidStreamNameException { - validateConfAndURI(conf, uri); - validateName(nameOfStream); - } - - public static Map<String, byte[]> enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration conf, final URI uri) - throws IOException, IllegalArgumentException { - return withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() { - @Override - public Map<String, byte[]> handle(ZooKeeperClient zkc) throws IOException { - return enumerateLogsWithMetadataInternal(zkc, conf, uri); - } - }, conf, uri); - } - - private static Map<String, byte[]> enumerateLogsWithMetadataInternal(ZooKeeperClient zkc, - DistributedLogConfiguration conf, URI uri) - throws IOException, IllegalArgumentException { - validateConfAndURI(conf, uri); - String namespaceRootPath = uri.getPath(); - HashMap<String, byte[]> result = new HashMap<String, byte[]>(); - try { - ZooKeeper zk = Utils.sync(zkc, namespaceRootPath); - Stat currentStat = zk.exists(namespaceRootPath, false); - if (currentStat == null) { - return result; - } - List<String> children = zk.getChildren(namespaceRootPath, false); - for(String child: children) { - if (isReservedStreamName(child)) { - continue; - } - String zkPath = String.format("%s/%s", namespaceRootPath, child); - currentStat = zk.exists(zkPath, false); - if (currentStat == null) { - result.put(child, new byte[0]); - } else { - result.put(child, zk.getData(zkPath, false, currentStat)); - } - } - } catch (InterruptedException ie) { - LOG.error("Interrupted while deleting " + namespaceRootPath, ie); - throw new IOException("Interrupted while reading " + namespaceRootPath, ie); - } catch (KeeperException ke) { - LOG.error("Error reading" + namespaceRootPath + "entry in zookeeper", ke); - throw new IOException("Error reading" + namespaceRootPath + "entry in zookeeper", ke); - } - return result; - } - + /** + * Check the namespace state. + * + * @throws IOException + */ private void checkState() throws IOException { if (closed.get()) { - LOG.error("BKDistributedLogNamespace {} is already closed", namespace); - throw new AlreadyClosedException("Namespace " + namespace + " is already closed"); + LOG.error("BK namespace {} is already closed", namespace); + throw new AlreadyClosedException("BK namespace " + namespace + " is already closed"); } } @@ -991,58 +305,16 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { */ @Override public void close() { - ZooKeeperClient writerZKC; - ZooKeeperClient readerZKC; - AccessControlManager acm; - if (closed.compareAndSet(false, true)) { - writerZKC = sharedWriterZKCForBK; - readerZKC = sharedReaderZKCForBK; - acm = accessControlManager; - } else { + if (!closed.compareAndSet(false, true)) { return; } - if (null != acm) { - acm.close(); - LOG.info("Access Control Manager Stopped."); - } - - // Close the allocator - if (null != allocator) { - Utils.closeQuietly(allocator); - LOG.info("Ledger Allocator stopped."); - } - + // shutdown the driver + Utils.close(driver); + // close the write limiter this.writeLimiter.close(); - - // Shutdown log segment metadata stores - Utils.close(writerStreamMetadataStore); - Utils.close(readerStreamMetadataStore); - // Shutdown the schedulers SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(), - TimeUnit.MILLISECONDS); - LOG.info("Executor Service Stopped."); - if (scheduler != readAheadExecutor) { - SchedulerUtils.shutdownScheduler(readAheadExecutor, conf.getSchedulerShutdownTimeoutMs(), TimeUnit.MILLISECONDS); - LOG.info("ReadAhead Executor Service Stopped."); - } - - writerBKC.close(); - readerBKC.close(); - sharedWriterZKCForDL.close(); - sharedReaderZKCForDL.close(); - - // Close shared zookeeper clients for bk - if (null != writerZKC) { - writerZKC.close(); - } - if (null != readerZKC) { - readerZKC.close(); - } - channelFactory.releaseExternalResources(); - LOG.info("Release external resources used by channel factory."); - requestTimer.stop(); - LOG.info("Stopped request timer"); + LOG.info("Executor Service Stopped."); } }
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 25b25e2..2486297 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -20,7 +20,6 @@ package com.twitter.distributedlog; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; -import com.twitter.distributedlog.bk.LedgerAllocator; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.exceptions.EndOfStreamException; @@ -29,8 +28,8 @@ import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.GetLastTxIdFunction; -import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryWriter; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; @@ -41,6 +40,7 @@ import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy; import com.twitter.distributedlog.metadata.LogStreamMetadataStore; import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; +import com.twitter.distributedlog.util.Allocator; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FailpointUtils; import com.twitter.distributedlog.util.FutureUtils; @@ -53,7 +53,6 @@ import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; -import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -88,9 +87,22 @@ import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_F class BKLogWriteHandler extends BKLogHandler { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); + private static Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER = + new Transaction.OpListener<LogSegmentEntryWriter>() { + @Override + public void onCommit(LogSegmentEntryWriter r) { + // no-op + } + + @Override + public void onAbort(Throwable t) { + // no-op + } + }; + protected final LogMetadataForWriter logMetadataForWriter; + protected final Allocator<LogSegmentEntryWriter, Object> logSegmentAllocator; protected final DistributedLock lock; - protected final LedgerAllocator ledgerAllocator; protected final MaxTxId maxTxId; protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo; protected final boolean validateLogSegmentSequenceNumber; @@ -154,7 +166,7 @@ class BKLogWriteHandler extends BKLogHandler { LogSegmentMetadataCache metadataCache, LogSegmentEntryStore entryStore, OrderedScheduler scheduler, - LedgerAllocator allocator, + Allocator<LogSegmentEntryWriter, Object> segmentAllocator, StatsLogger statsLogger, StatsLogger perLogStatsLogger, AlertStatsLogger alertStatsLogger, @@ -174,11 +186,11 @@ class BKLogWriteHandler extends BKLogHandler { alertStatsLogger, clientId); this.logMetadataForWriter = logMetadata; + this.logSegmentAllocator = segmentAllocator; this.perLogStatsLogger = perLogStatsLogger; this.writeLimiter = writeLimiter; this.featureProvider = featureProvider; this.dynConf = dynConf; - this.ledgerAllocator = allocator; this.lock = lock; this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); @@ -523,7 +535,7 @@ class BKLogWriteHandler extends BKLogHandler { } try { - ledgerAllocator.allocate(); + logSegmentAllocator.allocate(); } catch (IOException e) { // failed to issue an allocation request failStartLogSegment(promise, bestEffort, e); @@ -541,25 +553,16 @@ class BKLogWriteHandler extends BKLogHandler { return; } - ledgerAllocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() { - @Override - public void onCommit(LedgerHandle lh) { - // no-op - } - - @Override - public void onAbort(Throwable t) { - // no-op - } - }).addEventListener(new FutureEventListener<LedgerHandle>() { + logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER) + .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() { @Override - public void onSuccess(LedgerHandle lh) { + public void onSuccess(LogSegmentEntryWriter entryWriter) { // try-obtain succeed createInprogressLogSegment( txn, txId, - lh, + entryWriter, bestEffort, promise); } @@ -586,7 +589,7 @@ class BKLogWriteHandler extends BKLogHandler { // just leak from the allocation pool - hence cause "No Ledger Allocator" private void createInprogressLogSegment(Transaction<Object> txn, final long txId, - final LedgerHandle lh, + final LogSegmentEntryWriter entryWriter, boolean bestEffort, final Promise<BKLogSegmentWriter> promise) { final long logSegmentSeqNo; @@ -601,13 +604,15 @@ class BKLogWriteHandler extends BKLogHandler { return; } - final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, logSegmentSeqNo); + final String inprogressZnodePath = inprogressZNode( + entryWriter.getLogSegmentId(), txId, logSegmentSeqNo); final LogSegmentMetadata l = new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath, - conf.getDLLedgerMetadataLayoutVersion(), lh.getId(), txId) + conf.getDLLedgerMetadataLayoutVersion(), entryWriter.getLogSegmentId(), txId) .setLogSegmentSequenceNo(logSegmentSeqNo) .setRegionId(regionId) - .setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion())) + .setEnvelopeEntries( + LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion())) .build(); // Create an inprogress segment @@ -631,7 +636,7 @@ class BKLogWriteHandler extends BKLogHandler { l.getSegmentName(), conf, conf.getDLLedgerMetadataLayoutVersion(), - new BKLogSegmentEntryWriter(lh), + entryWriter, lock, txId, logSegmentSeqNo, @@ -1268,8 +1273,7 @@ class BKLogWriteHandler extends BKLogHandler { public Future<Void> asyncClose() { return Utils.closeSequence(scheduler, lock, - ledgerAllocator - ); + logSegmentAllocator); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java index c39ae4c..8d3c418 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java @@ -250,15 +250,22 @@ public class BookKeeperClient { return promise; } - public synchronized void close() { - if (closed) { - return; + public void close() { + BookKeeper bkcToClose; + ZooKeeperClient zkcToClose; + synchronized (this) { + if (closed) { + return; + } + closed = true; + bkcToClose = bkc; + zkcToClose = zkc; } LOG.info("BookKeeper Client closed {}", name); - if (null != bkc) { + if (null != bkcToClose) { try { - bkc.close(); + bkcToClose.close(); } catch (InterruptedException e) { LOG.warn("Interrupted on closing bookkeeper client {} : ", name, e); Thread.currentThread().interrupt(); @@ -266,12 +273,11 @@ public class BookKeeperClient { LOG.warn("Error on closing bookkeeper client {} : ", name, e); } } - if (null != zkc) { + if (null != zkcToClose) { if (ownZK) { - zkc.close(); + zkcToClose.close(); } } - closed = true; } public synchronized void checkClosedOrInError() throws AlreadyClosedException { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java index 6c6017e..6da4b8d 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java @@ -1301,6 +1301,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * @return number of dedicated readahead worker threads. * @see #getNumWorkerThreads() */ + @Deprecated public int getNumReadAheadWorkerThreads() { return getInt(BKDL_NUM_READAHEAD_WORKER_THREADS, 0); } @@ -1313,6 +1314,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration { * @return configuration * @see #getNumReadAheadWorkerThreads() */ + @Deprecated public DistributedLogConfiguration setNumReadAheadWorkerThreads(int numWorkerThreads) { setProperty(BKDL_NUM_READAHEAD_WORKER_THREADS, numWorkerThreads); return this; @@ -3515,8 +3517,11 @@ public class DistributedLogConfiguration extends CompositeConfiguration { Preconditions.checkArgument(getBKClientReadTimeout() * 1000 >= getReadLACLongPollTimeout(), "Invalid timeout configuration: bkcReadTimeoutSeconds ("+getBKClientReadTimeout()+ ") should be longer than readLACLongPollTimeout ("+getReadLACLongPollTimeout()+")"); - Preconditions.checkArgument(getReaderIdleWarnThresholdMillis() > 2 * getReadLACLongPollTimeout(), - "Invalid configuration: ReaderIdleWarnThreshold should be 2x larget than readLACLongPollTimeout"); + long readerIdleWarnThresholdMs = getReaderIdleWarnThresholdMillis(); + if (readerIdleWarnThresholdMs > 0) { // NOTE: some test cases set the idle warn threshold to 0 + Preconditions.checkArgument(readerIdleWarnThresholdMs > 2 * getReadLACLongPollTimeout(), + "Invalid configuration: ReaderIdleWarnThreshold should be 2x larget than readLACLongPollTimeout"); + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java index ccb0778..34cfb65 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java @@ -18,9 +18,13 @@ package com.twitter.distributedlog; import com.twitter.distributedlog.callback.LogSegmentListener; +import com.twitter.distributedlog.io.AsyncCloseable; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.subscription.SubscriptionStateStore; import com.twitter.distributedlog.subscription.SubscriptionsStore; import com.twitter.util.Future; + +import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -31,7 +35,20 @@ import java.util.List; * each conceptual place of storage corresponds to exactly one instance of * this class, which is created when the EditLog is first opened. */ -public interface DistributedLogManager extends MetadataAccessor { +public interface DistributedLogManager extends AsyncCloseable, Closeable { + + /** + * Get the name of the stream managed by this log manager + * @return streamName + */ + public String getStreamName(); + + /** + * Get the namespace driver used by this manager. + * + * @return the namespace driver + */ + public NamespaceDriver getNamespaceDriver(); /** * Get log segments. @@ -282,15 +299,6 @@ public interface DistributedLogManager extends MetadataAccessor { public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; /** - * Get the subscription state storage provided by the distributed log manager - * - * @param subscriberId - Application specific Id associated with the subscriber - * @return Subscription state store - */ - @Deprecated - public SubscriptionStateStore getSubscriptionStateStore(String subscriberId); - - /** * Get the subscriptions store provided by the distributedlog manager. * * @return subscriptions store manages subscriptions for current stream. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java deleted file mode 100644 index 4caeeba..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManagerFactory.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Optional; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.Collection; -import java.util.Map; - -/** - * This is the legacy way to access bookkeeper based distributedlog namespace. - * Use {@link DistributedLogNamespace} to manage logs instead if you could. - */ -@Deprecated -public class DistributedLogManagerFactory { - static final Logger LOG = LoggerFactory.getLogger(DistributedLogManagerFactory.class); - - public static enum ClientSharingOption { - PerStreamClients, - SharedZKClientPerStreamBKClient, - SharedClients - } - - private final BKDistributedLogNamespace namespace; - - public DistributedLogManagerFactory(DistributedLogConfiguration conf, URI uri) - throws IOException, IllegalArgumentException { - this(conf, uri, NullStatsLogger.INSTANCE); - } - - public DistributedLogManagerFactory(DistributedLogConfiguration conf, URI uri, - StatsLogger statsLogger) - throws IOException, IllegalArgumentException { - this(conf, - uri, - statsLogger, - DistributedLogConstants.UNKNOWN_CLIENT_ID, - DistributedLogConstants.LOCAL_REGION_ID); - } - - public DistributedLogManagerFactory(DistributedLogConfiguration conf, - URI uri, - StatsLogger statsLogger, - String clientId, - int regionId) - throws IOException, IllegalArgumentException { - this.namespace = BKDistributedLogNamespace.newBuilder() - .conf(conf) - .uri(uri) - .statsLogger(statsLogger) - .clientId(clientId) - .regionId(regionId) - .build(); - } - - public DistributedLogNamespace getNamespace() { - return namespace; - } - - public void registerNamespaceListener(NamespaceListener listener) { - namespace.registerNamespaceListener(listener); - } - - /** - * Create a DistributedLogManager for <i>nameOfLogStream</i>, with default shared clients. - * - * @param nameOfLogStream - * name of log stream - * @return distributedlog manager - * @throws com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name is invalid - * @throws IOException - */ - public DistributedLogManager createDistributedLogManagerWithSharedClients(String nameOfLogStream) - throws InvalidStreamNameException, IOException { - return createDistributedLogManager(nameOfLogStream, ClientSharingOption.SharedClients); - } - - /** - * Create a DistributedLogManager for <i>nameOfLogStream</i>, with specified client sharing options. - * - * @param nameOfLogStream - * name of log stream. - * @param clientSharingOption - * specifies if the ZK/BK clients are shared - * @return distributedlog manager instance. - * @throws com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name is invalid - * @throws IOException - */ - public DistributedLogManager createDistributedLogManager( - String nameOfLogStream, - ClientSharingOption clientSharingOption) - throws InvalidStreamNameException, IOException { - Optional<DistributedLogConfiguration> streamConfiguration = Optional.absent(); - Optional<DynamicDistributedLogConfiguration> dynamicStreamConfiguration = Optional.absent(); - return createDistributedLogManager(nameOfLogStream, - clientSharingOption, - streamConfiguration, - dynamicStreamConfiguration); - } - - /** - * Create a DistributedLogManager for <i>nameOfLogStream</i>, with specified client sharing options. - * This method allows the caller to override global configuration options by supplying stream - * configuration overrides. Stream config overrides come in two flavors, static and dynamic. Static - * config never changes, and DynamicDistributedLogConfiguration is a) reloaded periodically and - * b) safe to access from any context. - * - * @param nameOfLogStream - * name of log stream. - * @param clientSharingOption - * specifies if the ZK/BK clients are shared - * @param streamConfiguration - * stream configuration overrides. - * @param dynamicStreamConfiguration - * dynamic stream configuration overrides. - * @return distributedlog manager instance. - * @throws com.twitter.distributedlog.exceptions.InvalidStreamNameException if stream name is invalid - * @throws IOException - */ - public DistributedLogManager createDistributedLogManager( - String nameOfLogStream, - ClientSharingOption clientSharingOption, - Optional<DistributedLogConfiguration> streamConfiguration, - Optional<DynamicDistributedLogConfiguration> dynamicStreamConfiguration) - throws InvalidStreamNameException, IOException { - return namespace.createDistributedLogManager( - nameOfLogStream, - clientSharingOption, - streamConfiguration, - dynamicStreamConfiguration); - } - - public MetadataAccessor createMetadataAccessor(String nameOfMetadataNode) - throws InvalidStreamNameException, IOException { - return namespace.createMetadataAccessor(nameOfMetadataNode); - } - - public synchronized AccessControlManager createAccessControlManager() throws IOException { - return namespace.createAccessControlManager(); - } - - public boolean checkIfLogExists(String nameOfLogStream) - throws IOException, IllegalArgumentException { - return namespace.logExists(nameOfLogStream); - } - - public Collection<String> enumerateAllLogsInNamespace() - throws IOException, IllegalArgumentException { - return namespace.enumerateAllLogsInNamespace(); - } - - public Map<String, byte[]> enumerateLogsWithMetadataInNamespace() - throws IOException, IllegalArgumentException { - return namespace.enumerateLogsWithMetadataInNamespace(); - } - - /** - * This method is to initialize the metadata for a unpartitioned stream with name <i>streamName</i>. - * - * TODO: after 0.2 is upgraded to 0.3, remove this. - * - * @param streamName - * stream name. - * @throws IOException - */ - public void createUnpartitionedStream(final String streamName) throws IOException { - namespace.createLog(streamName); - } - - /** - * Close the distributed log manager factory, freeing any resources it may hold. - */ - public void close() { - namespace.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java index 85a370f..f4a1e41 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java @@ -18,7 +18,7 @@ package com.twitter.distributedlog; import com.google.common.base.Optional; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookieServer; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java index 94e618a..19f4497 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference; public class ReadAheadEntryReader implements AsyncCloseable, LogSegmentListener, + LogSegmentEntryReader.StateChangeListener, FutureEventListener<List<Entry.Reader>> { private static final Logger logger = LoggerFactory.getLogger(ReadAheadEntryReader.class); @@ -169,6 +170,9 @@ public class ReadAheadEntryReader implements @Override synchronized public void onSuccess(LogSegmentEntryReader reader) { this.reader = reader; + if (reader.getSegment().isInProgress()) { + reader.registerListener(ReadAheadEntryReader.this); + } } @Override @@ -271,7 +275,7 @@ public class ReadAheadEntryReader implements // State of the reader // - private boolean isInitialized; + private boolean isInitialized = false; private boolean readAheadPaused = false; private Promise<Void> closePromise = null; // segment readers @@ -549,10 +553,22 @@ public class ReadAheadEntryReader implements } } + void markCaughtup() { + if (isCatchingUp) { + isCatchingUp = false; + logger.info("ReadAhead for {} is caught up", readHandler.getFullyQualifiedName()); + } + } + public boolean isReadAheadCaughtUp() { return !isCatchingUp; } + @Override + public void onCaughtupOnInprogress() { + markCaughtup(); + } + // // ReadAhead State Machine // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java deleted file mode 100644 index 4d7a0e1..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZKMetadataAccessor.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.IOException; -import java.net.URI; - -import com.google.common.annotations.VisibleForTesting; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.metadata.BKDLConfig; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ZKMetadataAccessor implements MetadataAccessor { - static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class); - protected final String name; - protected Promise<Void> closePromise; - protected final URI uri; - // zookeeper clients - // NOTE: The actual zookeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - protected final ZooKeeperClientBuilder writerZKCBuilder; - protected final ZooKeeperClient writerZKC; - protected final boolean ownWriterZKC; - protected final ZooKeeperClientBuilder readerZKCBuilder; - protected final ZooKeeperClient readerZKC; - protected final boolean ownReaderZKC; - - ZKMetadataAccessor(String name, - DistributedLogConfiguration conf, - URI uri, - ZooKeeperClientBuilder writerZKCBuilder, - ZooKeeperClientBuilder readerZKCBuilder, - StatsLogger statsLogger) { - this.name = name; - this.uri = uri; - - if (null == writerZKCBuilder) { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); - } - this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder() - .name(String.format("dlzk:%s:dlm_writer_shared", name)) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkAclId(conf.getZkAclId()) - .uri(uri) - .retryPolicy(retryPolicy) - .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared")); - this.ownWriterZKC = true; - } else { - this.writerZKCBuilder = writerZKCBuilder; - this.ownWriterZKC = false; - } - this.writerZKC = this.writerZKCBuilder.build(); - - if (null == readerZKCBuilder) { - String zkServersForWriter = DLUtils.getZKServersFromDLUri(uri); - String zkServersForReader; - try { - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri); - zkServersForReader = bkdlConfig.getDlZkServersForReader(); - } catch (IOException e) { - LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e); - zkServersForReader = zkServersForWriter; - } - if (zkServersForReader.equals(zkServersForWriter)) { - LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.", - zkServersForWriter, name); - this.readerZKCBuilder = this.writerZKCBuilder; - this.ownReaderZKC = false; - } else { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); - } - this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder() - .name(String.format("dlzk:%s:dlm_reader_shared", name)) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkServers(zkServersForReader) - .retryPolicy(retryPolicy) - .zkAclId(conf.getZkAclId()) - .statsLogger(statsLogger.scope("dlzk_dlm_reader_shared")); - this.ownReaderZKC = true; - } - } else { - this.readerZKCBuilder = readerZKCBuilder; - this.ownReaderZKC = false; - } - this.readerZKC = this.readerZKCBuilder.build(); - } - - /** - * Get the name of the stream managed by this log manager - * - * @return streamName - */ - @Override - public String getStreamName() { - return name; - } - - /** - * Creates or update the metadata stored at the node associated with the - * name and URI - * @param metadata opaque metadata to be stored for the node - * @throws IOException - */ - @Override - public void createOrUpdateMetadata(byte[] metadata) throws IOException { - checkClosedOrInError("createOrUpdateMetadata"); - - String zkPath = getZKPath(); - LOG.debug("Setting application specific metadata on {}", zkPath); - try { - Stat currentStat = writerZKC.get().exists(zkPath, false); - if (currentStat == null) { - if (metadata.length > 0) { - Utils.zkCreateFullPathOptimistic(writerZKC, - zkPath, - metadata, - writerZKC.getDefaultACL(), - CreateMode.PERSISTENT); - } - } else { - writerZKC.get().setData(zkPath, metadata, currentStat.getVersion()); - } - } catch (InterruptedException ie) { - throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie); - } catch (Exception exc) { - throw new IOException("Exception creating or updating container metadata", exc); - } - } - - /** - * Delete the metadata stored at the associated node. This only deletes the metadata - * and not the node itself - * @throws IOException - */ - @Override - public void deleteMetadata() throws IOException { - checkClosedOrInError("createOrUpdateMetadata"); - createOrUpdateMetadata(null); - } - - /** - * Retrieve the metadata stored at the node - * @return byte array containing the metadata - * @throws IOException - */ - @Override - public byte[] getMetadata() throws IOException { - checkClosedOrInError("createOrUpdateMetadata"); - String zkPath = getZKPath(); - LOG.debug("Getting application specific metadata from {}", zkPath); - try { - Stat currentStat = readerZKC.get().exists(zkPath, false); - if (currentStat == null) { - return null; - } else { - return readerZKC.get().getData(zkPath, false, currentStat); - } - } catch (InterruptedException ie) { - throw new DLInterruptedException("Error reading the max tx id from zk", ie); - } catch (Exception e) { - throw new IOException("Error reading the max tx id from zk", e); - } - } - - /** - * Close the metadata accessor, freeing any resources it may hold. - * @return future represents the close result. - */ - @Override - public Future<Void> asyncClose() { - Promise<Void> closeFuture; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - } - // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests - // the managers created by the namespace - whose zkc will be closed by namespace - try { - if (ownWriterZKC) { - writerZKC.close(); - } - if (ownReaderZKC) { - readerZKC.close(); - } - } catch (Exception e) { - LOG.warn("Exception while closing distributed log manager", e); - } - FutureUtils.setValue(closeFuture, null); - return closeFuture; - } - - @Override - public void close() throws IOException { - FutureUtils.result(asyncClose()); - } - - public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { - if (null != closePromise) { - throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor"); - } - } - - protected String getZKPath() { - return String.format("%s/%s", uri.getPath(), name); - } - - @VisibleForTesting - protected ZooKeeperClient getReaderZKC() { - return readerZKC; - } - - @VisibleForTesting - protected ZooKeeperClient getWriterZKC() { - return writerZKC; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java index 90807b0..15f1805 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java @@ -20,7 +20,7 @@ package com.twitter.distributedlog; import com.google.common.base.Preconditions; import com.twitter.distributedlog.ZooKeeperClient.Credentials; import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials; -import com.twitter.distributedlog.util.DLUtils; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.zookeeper.RetryPolicy; @@ -139,7 +139,7 @@ public class ZooKeeperClientBuilder { * @return builder. */ public synchronized ZooKeeperClientBuilder uri(URI uri) { - this.zkServers = DLUtils.getZKServersFromDLUri(uri); + this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); return this; }