http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java deleted file mode 100644 index 5921233..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKNamespaceDriver.java +++ /dev/null @@ -1,631 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.BookKeeperClientBuilder; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.MetadataAccessor; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.acl.DefaultAccessControlManager; -import com.twitter.distributedlog.impl.acl.ZKAccessControlManager; -import com.twitter.distributedlog.bk.LedgerAllocator; -import com.twitter.distributedlog.bk.LedgerAllocatorUtils; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; -import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; -import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; -import com.twitter.distributedlog.impl.subscription.ZKSubscriptionsStore; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.LogMetadataForReader; -import com.twitter.distributedlog.metadata.LogMetadataStore; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.namespace.NamespaceDriverManager; -import com.twitter.distributedlog.subscription.SubscriptionsStore; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Utils; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.Stat; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.twitter.distributedlog.util.DLUtils.isReservedStreamName; -import static com.twitter.distributedlog.util.DLUtils.validateName; - -/** - * Manager for ZooKeeper/BookKeeper based namespace - */ -public class BKNamespaceDriver implements NamespaceDriver { - - private static Logger LOG = LoggerFactory.getLogger(BKNamespaceDriver.class); - - // register itself - static { - NamespaceDriverManager.registerDriver(DistributedLogConstants.BACKEND_BK, BKNamespaceDriver.class); - } - - /** - * Extract zk servers fro dl <i>namespace</i>. - * - * @param uri dl namespace - * @return zk servers - */ - public static String getZKServersFromDLUri(URI uri) { - return uri.getAuthority().replace(";", ","); - } - - // resources (passed from initialization) - private DistributedLogConfiguration conf; - private DynamicDistributedLogConfiguration dynConf; - private URI namespace; - private OrderedScheduler scheduler; - private FeatureProvider featureProvider; - private AsyncFailureInjector failureInjector; - private StatsLogger statsLogger; - private StatsLogger perLogStatsLogger; - private String clientId; - private int regionId; - - // - // resources (created internally and initialized at #initialize()) - // - - // namespace binding - private BKDLConfig bkdlConfig; - - // zookeeper clients - // NOTE: The actual zookeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - private ZooKeeperClientBuilder sharedWriterZKCBuilder; - private ZooKeeperClient writerZKC; - private ZooKeeperClientBuilder sharedReaderZKCBuilder; - private ZooKeeperClient readerZKC; - // NOTE: The actual bookkeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.BookKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - private ClientSocketChannelFactory channelFactory; - private HashedWheelTimer requestTimer; - private BookKeeperClientBuilder sharedWriterBKCBuilder; - private BookKeeperClient writerBKC; - private BookKeeperClientBuilder sharedReaderBKCBuilder; - private BookKeeperClient readerBKC; - - // log stream metadata store - private LogMetadataStore metadataStore; - private LogStreamMetadataStore writerStreamMetadataStore; - private LogStreamMetadataStore readerStreamMetadataStore; - - // - // resources (lazily initialized) - // - - // ledger allocator - private LedgerAllocator allocator; - - // log segment entry stores - private LogSegmentEntryStore writerEntryStore; - private LogSegmentEntryStore readerEntryStore; - - // access control manager - private AccessControlManager accessControlManager; - - // - // states - // - protected boolean initialized = false; - protected AtomicBoolean closed = new AtomicBoolean(false); - - /** - * Public constructor for reflection. - */ - public BKNamespaceDriver() { - } - - @Override - public synchronized NamespaceDriver initialize(DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - URI namespace, - OrderedScheduler scheduler, - FeatureProvider featureProvider, - AsyncFailureInjector failureInjector, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - String clientId, - int regionId) throws IOException { - if (initialized) { - return this; - } - // validate the namespace - if ((null == namespace) || (null == namespace.getAuthority()) || (null == namespace.getPath())) { - throw new IOException("Incorrect distributedlog namespace : " + namespace); - } - - // initialize the resources - this.conf = conf; - this.dynConf = dynConf; - this.namespace = namespace; - this.scheduler = scheduler; - this.featureProvider = featureProvider; - this.failureInjector = failureInjector; - this.statsLogger = statsLogger; - this.perLogStatsLogger = perLogStatsLogger; - this.clientId = clientId; - this.regionId = regionId; - - // initialize the zookeeper clients - initializeZooKeeperClients(); - - // initialize the bookkeeper clients - initializeBookKeeperClients(); - - // propagate bkdlConfig to configuration - BKDLConfig.propagateConfiguration(bkdlConfig, conf); - - // initialize the log metadata & stream metadata store - initializeLogStreamMetadataStores(); - - // initialize other resources - initializeOtherResources(); - - initialized = true; - - LOG.info("Initialized BK namespace driver: clientId = {}, regionId = {}, federated = {}.", - new Object[]{clientId, regionId, bkdlConfig.isFederatedNamespace()}); - return this; - } - - private void initializeZooKeeperClients() throws IOException { - // Build the namespace zookeeper client - this.sharedWriterZKCBuilder = createZKClientBuilder( - String.format("dlzk:%s:factory_writer_shared", namespace), - conf, - getZKServersFromDLUri(namespace), - statsLogger.scope("dlzk_factory_writer_shared")); - this.writerZKC = sharedWriterZKCBuilder.build(); - - // Resolve namespace binding - this.bkdlConfig = BKDLConfig.resolveDLConfig(writerZKC, namespace); - - // Build zookeeper client for readers - if (bkdlConfig.getDlZkServersForWriter().equals(bkdlConfig.getDlZkServersForReader())) { - this.sharedReaderZKCBuilder = this.sharedWriterZKCBuilder; - } else { - this.sharedReaderZKCBuilder = createZKClientBuilder( - String.format("dlzk:%s:factory_reader_shared", namespace), - conf, - bkdlConfig.getDlZkServersForReader(), - statsLogger.scope("dlzk_factory_reader_shared")); - } - this.readerZKC = this.sharedReaderZKCBuilder.build(); - } - - private synchronized BKDLConfig getBkdlConfig() { - return bkdlConfig; - } - - private void initializeBookKeeperClients() throws IOException { - this.channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()), - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()), - conf.getBKClientNumberIOThreads()); - this.requestTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(), - conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, - conf.getTimeoutTimerNumTicks()); - // Build bookkeeper client for writers - this.sharedWriterBKCBuilder = createBKCBuilder( - String.format("bk:%s:factory_writer_shared", namespace), - conf, - bkdlConfig.getBkZkServersForWriter(), - bkdlConfig.getBkLedgersPath(), - channelFactory, - requestTimer, - Optional.of(featureProvider.scope("bkc")), - statsLogger); - this.writerBKC = this.sharedWriterBKCBuilder.build(); - - // Build bookkeeper client for readers - if (bkdlConfig.getBkZkServersForWriter().equals(bkdlConfig.getBkZkServersForReader())) { - this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder; - } else { - this.sharedReaderBKCBuilder = createBKCBuilder( - String.format("bk:%s:factory_reader_shared", namespace), - conf, - bkdlConfig.getBkZkServersForReader(), - bkdlConfig.getBkLedgersPath(), - channelFactory, - requestTimer, - Optional.<FeatureProvider>absent(), - statsLogger); - } - this.readerBKC = this.sharedReaderBKCBuilder.build(); - } - - private void initializeLogStreamMetadataStores() throws IOException { - // log metadata store - if (bkdlConfig.isFederatedNamespace() || conf.isFederatedNamespaceEnabled()) { - this.metadataStore = new FederatedZKLogMetadataStore(conf, namespace, readerZKC, scheduler); - } else { - this.metadataStore = new ZKLogMetadataStore(conf, namespace, readerZKC, scheduler); - } - - // create log stream metadata store - this.writerStreamMetadataStore = - new ZKLogStreamMetadataStore( - clientId, - conf, - writerZKC, - scheduler, - statsLogger); - this.readerStreamMetadataStore = - new ZKLogStreamMetadataStore( - clientId, - conf, - readerZKC, - scheduler, - statsLogger); - } - - @VisibleForTesting - public static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration conf, URI uri) throws IOException { - String poolPath = conf.getLedgerAllocatorPoolPath(); - LOG.info("PoolPath is {}", poolPath); - if (null == poolPath || !poolPath.startsWith(".") || poolPath.endsWith("/")) { - LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath); - throw new IOException("Invalid ledger allocator pool path specified : " + poolPath); - } - String poolName = conf.getLedgerAllocatorPoolName(); - if (null == poolName) { - LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool."); - throw new IOException("No ledger allocator name specified when enabling ledger allocator pool."); - } - String rootPath = uri.getPath() + "/" + poolPath + "/" + poolName; - try { - PathUtils.validatePath(rootPath); - } catch (IllegalArgumentException iae) { - LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", poolPath); - throw new IOException("Invalid ledger allocator pool path specified : " + poolPath); - } - return rootPath; - } - - private void initializeOtherResources() throws IOException { - // Ledger allocator - if (conf.getEnableLedgerAllocatorPool()) { - String allocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(conf, namespace); - allocator = LedgerAllocatorUtils.createLedgerAllocatorPool( - allocatorPoolPath, - conf.getLedgerAllocatorPoolCoreSize(), - conf, - writerZKC, - writerBKC, - scheduler); - if (null != allocator) { - allocator.start(); - } - LOG.info("Created ledger allocator pool under {} with size {}.", allocatorPoolPath, conf.getLedgerAllocatorPoolCoreSize()); - } else { - allocator = null; - } - - } - - private void checkState() throws IOException { - if (closed.get()) { - LOG.error("BK namespace driver {} is already closed", namespace); - throw new AlreadyClosedException("BK namespace driver " + namespace + " is already closed"); - } - } - - @Override - public void close() throws IOException { - if (!closed.compareAndSet(false, true)) { - return; - } - doClose(); - } - - private void doClose() { - if (null != accessControlManager) { - accessControlManager.close(); - LOG.info("Access Control Manager Stopped."); - } - - // Close the allocator - if (null != allocator) { - Utils.closeQuietly(allocator); - LOG.info("Ledger Allocator stopped."); - } - - // Shutdown log segment metadata stores - Utils.close(writerStreamMetadataStore); - Utils.close(readerStreamMetadataStore); - - writerBKC.close(); - readerBKC.close(); - writerZKC.close(); - readerZKC.close(); - // release bookkeeper resources - channelFactory.releaseExternalResources(); - LOG.info("Release external resources used by channel factory."); - requestTimer.stop(); - LOG.info("Stopped request timer"); - } - - @Override - public URI getUri() { - return namespace; - } - - @Override - public String getScheme() { - return DistributedLogConstants.BACKEND_BK; - } - - @Override - public LogMetadataStore getLogMetadataStore() { - return metadataStore; - } - - @Override - public LogStreamMetadataStore getLogStreamMetadataStore(Role role) { - if (Role.WRITER == role) { - return writerStreamMetadataStore; - } else { - return readerStreamMetadataStore; - } - } - - @Override - public LogSegmentEntryStore getLogSegmentEntryStore(Role role) { - if (Role.WRITER == role) { - return getWriterEntryStore(); - } else { - return getReaderEntryStore(); - } - } - - private LogSegmentEntryStore getWriterEntryStore() { - if (null == writerEntryStore) { - writerEntryStore = new BKLogSegmentEntryStore( - conf, - dynConf, - writerZKC, - writerBKC, - scheduler, - allocator, - statsLogger, - failureInjector); - } - return writerEntryStore; - } - - private LogSegmentEntryStore getReaderEntryStore() { - if (null == readerEntryStore) { - readerEntryStore = new BKLogSegmentEntryStore( - conf, - dynConf, - writerZKC, - readerBKC, - scheduler, - allocator, - statsLogger, - failureInjector); - } - return readerEntryStore; - } - - @Override - public AccessControlManager getAccessControlManager() throws IOException { - if (null == accessControlManager) { - String aclRootPath = getBkdlConfig().getACLRootPath(); - // Build the access control manager - if (aclRootPath == null) { - accessControlManager = DefaultAccessControlManager.INSTANCE; - LOG.info("Created default access control manager for {}", namespace); - } else { - if (!isReservedStreamName(aclRootPath)) { - throw new IOException("Invalid Access Control List Root Path : " + aclRootPath); - } - String zkRootPath = namespace.getPath() + "/" + aclRootPath; - LOG.info("Creating zk based access control manager @ {} for {}", - zkRootPath, namespace); - accessControlManager = new ZKAccessControlManager(conf, readerZKC, - zkRootPath, scheduler); - LOG.info("Created zk based access control manager @ {} for {}", - zkRootPath, namespace); - } - } - return accessControlManager; - } - - @Override - public SubscriptionsStore getSubscriptionsStore(String streamName) { - return new ZKSubscriptionsStore( - writerZKC, - LogMetadataForReader.getSubscribersPath(namespace, streamName, conf.getUnpartitionedStreamName())); - } - - // - // Legacy Intefaces - // - - @Override - public MetadataAccessor getMetadataAccessor(String streamName) - throws InvalidStreamNameException, IOException { - if (getBkdlConfig().isFederatedNamespace()) { - throw new UnsupportedOperationException(); - } - checkState(); - validateName(streamName); - return new ZKMetadataAccessor( - streamName, - conf, - namespace, - sharedWriterZKCBuilder, - sharedReaderZKCBuilder, - statsLogger); - } - - public Map<String, byte[]> enumerateLogsWithMetadataInNamespace() - throws IOException, IllegalArgumentException { - String namespaceRootPath = namespace.getPath(); - HashMap<String, byte[]> result = new HashMap<String, byte[]>(); - ZooKeeperClient zkc = writerZKC; - try { - ZooKeeper zk = Utils.sync(zkc, namespaceRootPath); - Stat currentStat = zk.exists(namespaceRootPath, false); - if (currentStat == null) { - return result; - } - List<String> children = zk.getChildren(namespaceRootPath, false); - for(String child: children) { - if (isReservedStreamName(child)) { - continue; - } - String zkPath = String.format("%s/%s", namespaceRootPath, child); - currentStat = zk.exists(zkPath, false); - if (currentStat == null) { - result.put(child, new byte[0]); - } else { - result.put(child, zk.getData(zkPath, false, currentStat)); - } - } - } catch (InterruptedException ie) { - LOG.error("Interrupted while deleting " + namespaceRootPath, ie); - throw new IOException("Interrupted while reading " + namespaceRootPath, ie); - } catch (KeeperException ke) { - LOG.error("Error reading" + namespaceRootPath + "entry in zookeeper", ke); - throw new IOException("Error reading" + namespaceRootPath + "entry in zookeeper", ke); - } - return result; - } - - // - // Zk & Bk Utils - // - - public static ZooKeeperClientBuilder createZKClientBuilder(String zkcName, - DistributedLogConfiguration conf, - String zkServers, - StatsLogger statsLogger) { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); - } - ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder() - .name(zkcName) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkServers(zkServers) - .retryPolicy(retryPolicy) - .statsLogger(statsLogger) - .zkAclId(conf.getZkAclId()); - LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}," - + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers, conf.getZKNumRetries(), - conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() }); - return builder; - } - - private BookKeeperClientBuilder createBKCBuilder(String bkcName, - DistributedLogConfiguration conf, - String zkServers, - String ledgersPath, - ClientSocketChannelFactory channelFactory, - HashedWheelTimer requestTimer, - Optional<FeatureProvider> featureProviderOptional, - StatsLogger statsLogger) { - BookKeeperClientBuilder builder = BookKeeperClientBuilder.newBuilder() - .name(bkcName) - .dlConfig(conf) - .zkServers(zkServers) - .ledgersPath(ledgersPath) - .channelFactory(channelFactory) - .requestTimer(requestTimer) - .featureProvider(featureProviderOptional) - .statsLogger(statsLogger); - LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}", - new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads() }); - return builder; - } - - // - // Test Methods - // - - @VisibleForTesting - public ZooKeeperClient getWriterZKC() { - return writerZKC; - } - - @VisibleForTesting - public BookKeeperClient getReaderBKC() { - return readerBKC; - } - - @VisibleForTesting - public AsyncFailureInjector getFailureInjector() { - return this.failureInjector; - } - - @VisibleForTesting - public LogStreamMetadataStore getWriterStreamMetadataStore() { - return writerStreamMetadataStore; - } - - @VisibleForTesting - public LedgerAllocator getLedgerAllocator() { - return allocator; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java deleted file mode 100644 index 50b1405..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogMetadataStore.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.metadata.LogMetadataStore; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -import java.net.URI; -import java.util.Iterator; -import java.util.List; - -import static com.twitter.distributedlog.util.DLUtils.*; - -/** - * ZooKeeper based log metadata store - */ -public class ZKLogMetadataStore implements LogMetadataStore { - - final URI namespace; - final Optional<URI> nsOptional; - final ZooKeeperClient zkc; - final ZKNamespaceWatcher nsWatcher; - - public ZKLogMetadataStore( - DistributedLogConfiguration conf, - URI namespace, - ZooKeeperClient zkc, - OrderedScheduler scheduler) { - this.namespace = namespace; - this.nsOptional = Optional.of(this.namespace); - this.zkc = zkc; - this.nsWatcher = new ZKNamespaceWatcher(conf, namespace, zkc, scheduler); - } - - @Override - public Future<URI> createLog(String logName) { - return Future.value(namespace); - } - - @Override - public Future<Optional<URI>> getLogLocation(String logName) { - return Future.value(nsOptional); - } - - @Override - public Future<Iterator<String>> getLogs() { - final Promise<Iterator<String>> promise = new Promise<Iterator<String>>(); - final String nsRootPath = namespace.getPath(); - try { - final ZooKeeper zk = zkc.get(); - zk.sync(nsRootPath, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int syncRc, String syncPath, Object ctx) { - if (KeeperException.Code.OK.intValue() == syncRc) { - zk.getChildren(nsRootPath, false, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - List<String> results = Lists.newArrayListWithExpectedSize(children.size()); - for (String child : children) { - if (!isReservedStreamName(child)) { - results.add(child); - } - } - promise.setValue(results.iterator()); - } else if (KeeperException.Code.NONODE.intValue() == rc) { - List<String> streams = Lists.newLinkedList(); - promise.setValue(streams.iterator()); - } else { - promise.setException(new ZKException("Error reading namespace " + nsRootPath, - KeeperException.Code.get(rc))); - } - } - }, null); - } else if (KeeperException.Code.NONODE.intValue() == syncRc) { - List<String> streams = Lists.newLinkedList(); - promise.setValue(streams.iterator()); - } else { - promise.setException(new ZKException("Error reading namespace " + nsRootPath, - KeeperException.Code.get(syncRc))); - } - } - }, null); - zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - return promise; - } - - @Override - public void registerNamespaceListener(NamespaceListener listener) { - this.nsWatcher.registerListener(listener); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java deleted file mode 100644 index e55b2f2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentFilters.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * Filters based on current zookeeper log segments. - */ -public class ZKLogSegmentFilters { - - static final Logger LOG = LoggerFactory.getLogger(ZKLogSegmentFilters.class); - - /** - * Write handler filter should return all inprogress log segments and the last completed log segment. - * Because sequence id & ledger sequence number assignment rely on previous log segments. - */ - public static final LogSegmentFilter WRITE_HANDLE_FILTER = new LogSegmentFilter() { - @Override - public Collection<String> filter(Collection<String> fullList) { - List<String> result = new ArrayList<String>(fullList.size()); - String lastCompletedLogSegmentName = null; - long lastLogSegmentSequenceNumber = -1L; - for (String s : fullList) { - if (s.startsWith(DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX)) { - result.add(s); - } else if (s.startsWith(DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX)) { - String[] parts = s.split("_"); - try { - if (2 == parts.length) { - // name: logrecs_<logsegment_sequence_number> - long logSegmentSequenceNumber = Long.parseLong(parts[1]); - if (logSegmentSequenceNumber > lastLogSegmentSequenceNumber) { - lastLogSegmentSequenceNumber = logSegmentSequenceNumber; - lastCompletedLogSegmentName = s; - } - } else if (6 == parts.length) { - // name: logrecs_<start_tx_id>_<end_tx_id>_<logsegment_sequence_number>_<ledger_id>_<region_id> - long logSegmentSequenceNumber = Long.parseLong(parts[3]); - if (logSegmentSequenceNumber > lastLogSegmentSequenceNumber) { - lastLogSegmentSequenceNumber = logSegmentSequenceNumber; - lastCompletedLogSegmentName = s; - } - } else { - // name: logrecs_<start_tx_id>_<end_tx_id> or any unknown names - // we don't know the ledger sequence from the name, so add it to the list - result.add(s); - } - } catch (NumberFormatException nfe) { - LOG.warn("Unexpected sequence number in log segment {} :", s, nfe); - result.add(s); - } - } else { - LOG.error("Unknown log segment name : {}", s); - } - } - if (null != lastCompletedLogSegmentName) { - result.add(lastCompletedLogSegmentName); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Filtered log segments {} from {}.", result, fullList); - } - return result; - } - }; - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java deleted file mode 100644 index 2076dd8..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ /dev/null @@ -1,503 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import com.google.common.collect.ImmutableList; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.callback.LogSegmentNamesListener; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.metadata.LogMetadata; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.distributedlog.util.Transaction.OpListener; -import com.twitter.distributedlog.zk.DefaultZKOp; -import com.twitter.distributedlog.zk.ZKOp; -import com.twitter.distributedlog.zk.ZKTransaction; -import com.twitter.distributedlog.zk.ZKVersionedSetOp; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.AsyncCallback.Children2Callback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * ZooKeeper based log segment metadata store. - */ -public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watcher, Children2Callback { - - private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class); - - private static final List<String> EMPTY_LIST = ImmutableList.of(); - - private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> { - - private final String logSegmentsPath; - private final ZKLogSegmentMetadataStore store; - private int currentZKBackOffMs; - - ReadLogSegmentsTask(String logSegmentsPath, - ZKLogSegmentMetadataStore metadataStore) { - this.logSegmentsPath = logSegmentsPath; - this.store = metadataStore; - this.currentZKBackOffMs = store.minZKBackoffMs; - } - - @Override - public void onSuccess(final Versioned<List<String>> segments) { - // reset the back off after a successful operation - currentZKBackOffMs = store.minZKBackoffMs; - store.notifyLogSegmentsUpdated( - logSegmentsPath, - store.listeners.get(logSegmentsPath), - segments); - } - - @Override - public void onFailure(Throwable cause) { - int backoffMs; - if (cause instanceof LogNotFoundException) { - // the log segment has been deleted, remove all the registered listeners - store.notifyLogStreamDeleted(logSegmentsPath, - store.listeners.remove(logSegmentsPath)); - return; - } else { - backoffMs = currentZKBackOffMs; - currentZKBackOffMs = Math.min(2 * currentZKBackOffMs, store.maxZKBackoffMs); - } - store.scheduleTask(logSegmentsPath, this, backoffMs); - } - - @Override - public void run() { - if (null != store.listeners.get(logSegmentsPath)) { - store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this); - } else { - logger.debug("Log segments listener for {} has been removed.", logSegmentsPath); - } - } - } - - /** - * A log segment names listener that keeps tracking the version of list of log segments that it has been notified. - * It only notify the newer log segments. - */ - static class VersionedLogSegmentNamesListener { - - private final LogSegmentNamesListener listener; - private Versioned<List<String>> lastNotifiedLogSegments; - - VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) { - this.listener = listener; - this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW); - } - - synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) { - if (lastNotifiedLogSegments.getVersion() == Version.NEW || - lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) { - lastNotifiedLogSegments = logSegments; - listener.onSegmentsUpdated(logSegments); - } - } - - @Override - public int hashCode() { - return listener.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof VersionedLogSegmentNamesListener)) { - return false; - } - VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj; - return listener.equals(other.listener); - } - - @Override - public String toString() { - return listener.toString(); - } - } - - final DistributedLogConfiguration conf; - // settings - final int minZKBackoffMs; - final int maxZKBackoffMs; - final boolean skipMinVersionCheck; - - final ZooKeeperClient zkc; - // log segment listeners - final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners; - // scheduler - final OrderedScheduler scheduler; - final ReentrantReadWriteLock closeLock; - boolean closed = false; - - public ZKLogSegmentMetadataStore(DistributedLogConfiguration conf, - ZooKeeperClient zkc, - OrderedScheduler scheduler) { - this.conf = conf; - this.zkc = zkc; - this.listeners = - new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>(); - this.scheduler = scheduler; - this.closeLock = new ReentrantReadWriteLock(); - // settings - this.minZKBackoffMs = conf.getZKRetryBackoffStartMillis(); - this.maxZKBackoffMs = conf.getZKRetryBackoffMaxMillis(); - this.skipMinVersionCheck = conf.getDLLedgerMetadataSkipMinVersionCheck(); - } - - protected void scheduleTask(Object key, Runnable r, long delayMs) { - closeLock.readLock().lock(); - try { - if (closed) { - return; - } - scheduler.schedule(key, r, delayMs, TimeUnit.MILLISECONDS); - } finally { - closeLock.readLock().unlock(); - } - } - - protected void submitTask(Object key, Runnable r) { - closeLock.readLock().lock(); - try { - if (closed) { - return; - } - scheduler.submit(key, r); - } finally { - closeLock.readLock().unlock(); - } - } - - // max sequence number and max transaction id - - @Override - public void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn, - LogMetadata logMetadata, - Versioned<Long> lssn, - Transaction.OpListener<Version> listener) { - Version version = lssn.getVersion(); - assert(version instanceof ZkVersion); - ZkVersion zkVersion = (ZkVersion) version; - byte[] data = DLUtils.serializeLogSegmentSequenceNumber(lssn.getValue()); - Op setDataOp = Op.setData(logMetadata.getLogSegmentsPath(), data, zkVersion.getZnodeVersion()); - ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener); - txn.addOp(zkOp); - } - - @Override - public void storeMaxTxnId(Transaction<Object> txn, - LogMetadataForWriter logMetadata, - Versioned<Long> transactionId, - Transaction.OpListener<Version> listener) { - Version version = transactionId.getVersion(); - assert(version instanceof ZkVersion); - ZkVersion zkVersion = (ZkVersion) version; - byte[] data = DLUtils.serializeTransactionId(transactionId.getValue()); - Op setDataOp = Op.setData(logMetadata.getMaxTxIdPath(), data, zkVersion.getZnodeVersion()); - ZKOp zkOp = new ZKVersionedSetOp(setDataOp, listener); - txn.addOp(zkOp); - } - - // updates - - @Override - public Transaction<Object> transaction() { - return new ZKTransaction(zkc); - } - - @Override - public void createLogSegment(Transaction<Object> txn, - LogSegmentMetadata segment, - OpListener<Void> listener) { - byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8); - Op createOp = Op.create( - segment.getZkPath(), - finalisedData, - zkc.getDefaultACL(), - CreateMode.PERSISTENT); - txn.addOp(DefaultZKOp.of(createOp, listener)); - } - - @Override - public void deleteLogSegment(Transaction<Object> txn, - final LogSegmentMetadata segment, - final OpListener<Void> listener) { - Op deleteOp = Op.delete( - segment.getZkPath(), - -1); - logger.info("Delete segment : {}", segment); - txn.addOp(DefaultZKOp.of(deleteOp, new OpListener<Void>() { - @Override - public void onCommit(Void r) { - if (null != listener) { - listener.onCommit(r); - } - } - - @Override - public void onAbort(Throwable t) { - logger.info("Aborted transaction on deleting segment {}", segment); - KeeperException.Code kc; - if (t instanceof KeeperException) { - kc = ((KeeperException) t).code(); - } else if (t instanceof ZKException) { - kc = ((ZKException) t).getKeeperExceptionCode(); - } else { - abortListener(t); - return; - } - if (KeeperException.Code.NONODE == kc) { - abortListener(new LogSegmentNotFoundException(segment.getZkPath())); - return; - } - abortListener(t); - } - - private void abortListener(Throwable t) { - if (null != listener) { - listener.onAbort(t); - } - } - })); - } - - @Override - public void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment) { - byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8); - Op updateOp = Op.setData(segment.getZkPath(), finalisedData, -1); - txn.addOp(DefaultZKOp.of(updateOp, null)); - } - - // reads - - /** - * Process the watched events for registered listeners - */ - @Override - public void process(WatchedEvent event) { - if (Event.EventType.None == event.getType() - && Event.KeeperState.Expired == event.getState()) { - Set<String> keySet = new HashSet<String>(listeners.keySet()); - for (String logSegmentsPath : keySet) { - scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L); - } - return; - } - String path = event.getPath(); - if (null == path) { - return; - } - switch (event.getType()) { - case NodeDeleted: - notifyLogStreamDeleted(path, listeners.remove(path)); - break; - case NodeChildrenChanged: - new ReadLogSegmentsTask(path, this).run(); - break; - default: - break; - } - } - - @Override - public Future<LogSegmentMetadata> getLogSegment(String logSegmentPath) { - return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck); - } - - Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) { - Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>(); - try { - zkc.get().getChildren(logSegmentsPath, watcher, this, result); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - result.setException(FutureUtils.zkException(e, logSegmentsPath)); - } catch (InterruptedException e) { - result.setException(FutureUtils.zkException(e, logSegmentsPath)); - } - return result; - } - - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx); - if (KeeperException.Code.OK.intValue() == rc) { - /** cversion: the number of changes to the children of this znode **/ - ZkVersion zkVersion = new ZkVersion(stat.getCversion()); - result.setValue(new Versioned(children, zkVersion)); - } else if (KeeperException.Code.NONODE.intValue() == rc) { - result.setException(new LogNotFoundException("Log " + path + " not found")); - } else { - result.setException(new ZKException("Failed to get log segments from " + path, - KeeperException.Code.get(rc))); - } - } - - @Override - public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, - LogSegmentNamesListener listener) { - Watcher zkWatcher; - if (null == listener) { - zkWatcher = null; - } else { - closeLock.readLock().lock(); - try { - if (closed) { - zkWatcher = null; - } else { - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = - listeners.get(logSegmentsPath); - if (null == listenerSet) { - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet = - new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>(); - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet = - listeners.putIfAbsent(logSegmentsPath, newListenerSet); - if (null != oldListenerSet) { - listenerSet = oldListenerSet; - } else { - listenerSet = newListenerSet; - } - } - synchronized (listenerSet) { - listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener)); - if (!listeners.containsKey(logSegmentsPath)) { - // listener set has been removed, add it back - if (null != listeners.putIfAbsent(logSegmentsPath, listenerSet)) { - logger.debug("Listener set is already found for log segments path {}", logSegmentsPath); - } - } - } - zkWatcher = ZKLogSegmentMetadataStore.this; - } - } finally { - closeLock.readLock().unlock(); - } - } - Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher); - if (null != listener) { - getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this)); - } - return zkGetLogSegmentNames(logSegmentsPath, zkWatcher); - } - - @Override - public void unregisterLogSegmentListener(String logSegmentsPath, - LogSegmentNamesListener listener) { - closeLock.readLock().lock(); - try { - if (closed) { - return; - } - Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet = - listeners.get(logSegmentsPath); - if (null == listenerSet) { - return; - } - synchronized (listenerSet) { - listenerSet.remove(listener); - if (listenerSet.isEmpty()) { - listeners.remove(logSegmentsPath, listenerSet); - } - } - } finally { - closeLock.readLock().unlock(); - } - } - - @Override - public void close() throws IOException { - closeLock.writeLock().lock(); - try { - if (closed) { - return; - } - closed = true; - } finally { - closeLock.writeLock().unlock(); - } - } - - // Notifications - - void notifyLogStreamDeleted(String logSegmentsPath, - final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners) { - if (null == listeners) { - return; - } - this.submitTask(logSegmentsPath, new Runnable() { - @Override - public void run() { - for (LogSegmentNamesListener listener : listeners.keySet()) { - listener.onLogStreamDeleted(); - } - } - }); - - } - - void notifyLogSegmentsUpdated(String logSegmentsPath, - final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listeners, - final Versioned<List<String>> segments) { - if (null == listeners) { - return; - } - this.submitTask(logSegmentsPath, new Runnable() { - @Override - public void run() { - for (VersionedLogSegmentNamesListener listener : listeners.values()) { - listener.onSegmentsUpdated(segments); - } - } - }); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java deleted file mode 100644 index eeda804..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import java.io.IOException; -import java.net.URI; - -import com.google.common.annotations.VisibleForTesting; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.MetadataAccessor; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.twitter.distributedlog.impl.BKNamespaceDriver.getZKServersFromDLUri; - -public class ZKMetadataAccessor implements MetadataAccessor { - static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class); - protected final String name; - protected Promise<Void> closePromise; - protected final URI uri; - // zookeeper clients - // NOTE: The actual zookeeper client is initialized lazily when it is referenced by - // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to - // keep builders and their client wrappers here, as they will be used when - // instantiating readers or writers. - protected final ZooKeeperClientBuilder writerZKCBuilder; - protected final ZooKeeperClient writerZKC; - protected final boolean ownWriterZKC; - protected final ZooKeeperClientBuilder readerZKCBuilder; - protected final ZooKeeperClient readerZKC; - protected final boolean ownReaderZKC; - - ZKMetadataAccessor(String name, - DistributedLogConfiguration conf, - URI uri, - ZooKeeperClientBuilder writerZKCBuilder, - ZooKeeperClientBuilder readerZKCBuilder, - StatsLogger statsLogger) { - this.name = name; - this.uri = uri; - - if (null == writerZKCBuilder) { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); - } - this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder() - .name(String.format("dlzk:%s:dlm_writer_shared", name)) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkAclId(conf.getZkAclId()) - .uri(uri) - .retryPolicy(retryPolicy) - .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared")); - this.ownWriterZKC = true; - } else { - this.writerZKCBuilder = writerZKCBuilder; - this.ownWriterZKC = false; - } - this.writerZKC = this.writerZKCBuilder.build(); - - if (null == readerZKCBuilder) { - String zkServersForWriter = getZKServersFromDLUri(uri); - String zkServersForReader; - try { - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri); - zkServersForReader = bkdlConfig.getDlZkServersForReader(); - } catch (IOException e) { - LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e); - zkServersForReader = zkServersForWriter; - } - if (zkServersForReader.equals(zkServersForWriter)) { - LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.", - zkServersForWriter, name); - this.readerZKCBuilder = this.writerZKCBuilder; - this.ownReaderZKC = false; - } else { - RetryPolicy retryPolicy = null; - if (conf.getZKNumRetries() > 0) { - retryPolicy = new BoundExponentialBackoffRetryPolicy( - conf.getZKRetryBackoffStartMillis(), - conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); - } - this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder() - .name(String.format("dlzk:%s:dlm_reader_shared", name)) - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkServers(zkServersForReader) - .retryPolicy(retryPolicy) - .zkAclId(conf.getZkAclId()) - .statsLogger(statsLogger.scope("dlzk_dlm_reader_shared")); - this.ownReaderZKC = true; - } - } else { - this.readerZKCBuilder = readerZKCBuilder; - this.ownReaderZKC = false; - } - this.readerZKC = this.readerZKCBuilder.build(); - } - - /** - * Get the name of the stream managed by this log manager - * - * @return streamName - */ - @Override - public String getStreamName() { - return name; - } - - /** - * Creates or update the metadata stored at the node associated with the - * name and URI - * @param metadata opaque metadata to be stored for the node - * @throws IOException - */ - @Override - public void createOrUpdateMetadata(byte[] metadata) throws IOException { - checkClosedOrInError("createOrUpdateMetadata"); - - String zkPath = getZKPath(); - LOG.debug("Setting application specific metadata on {}", zkPath); - try { - Stat currentStat = writerZKC.get().exists(zkPath, false); - if (currentStat == null) { - if (metadata.length > 0) { - Utils.zkCreateFullPathOptimistic(writerZKC, - zkPath, - metadata, - writerZKC.getDefaultACL(), - CreateMode.PERSISTENT); - } - } else { - writerZKC.get().setData(zkPath, metadata, currentStat.getVersion()); - } - } catch (InterruptedException ie) { - throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie); - } catch (Exception exc) { - throw new IOException("Exception creating or updating container metadata", exc); - } - } - - /** - * Delete the metadata stored at the associated node. This only deletes the metadata - * and not the node itself - * @throws IOException - */ - @Override - public void deleteMetadata() throws IOException { - checkClosedOrInError("createOrUpdateMetadata"); - createOrUpdateMetadata(null); - } - - /** - * Retrieve the metadata stored at the node - * @return byte array containing the metadata - * @throws IOException - */ - @Override - public byte[] getMetadata() throws IOException { - checkClosedOrInError("createOrUpdateMetadata"); - String zkPath = getZKPath(); - LOG.debug("Getting application specific metadata from {}", zkPath); - try { - Stat currentStat = readerZKC.get().exists(zkPath, false); - if (currentStat == null) { - return null; - } else { - return readerZKC.get().getData(zkPath, false, currentStat); - } - } catch (InterruptedException ie) { - throw new DLInterruptedException("Error reading the max tx id from zk", ie); - } catch (Exception e) { - throw new IOException("Error reading the max tx id from zk", e); - } - } - - /** - * Close the metadata accessor, freeing any resources it may hold. - * @return future represents the close result. - */ - @Override - public Future<Void> asyncClose() { - Promise<Void> closeFuture; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - } - // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests - // the managers created by the namespace - whose zkc will be closed by namespace - try { - if (ownWriterZKC) { - writerZKC.close(); - } - if (ownReaderZKC) { - readerZKC.close(); - } - } catch (Exception e) { - LOG.warn("Exception while closing distributed log manager", e); - } - FutureUtils.setValue(closeFuture, null); - return closeFuture; - } - - @Override - public void close() throws IOException { - FutureUtils.result(asyncClose()); - } - - public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { - if (null != closePromise) { - throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor"); - } - } - - protected String getZKPath() { - return String.format("%s/%s", uri.getPath(), name); - } - - @VisibleForTesting - protected ZooKeeperClient getReaderZKC() { - return readerZKC; - } - - @VisibleForTesting - protected ZooKeeperClient getWriterZKC() { - return writerZKC; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java deleted file mode 100644 index 06bc8fb..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.namespace.NamespaceWatcher; -import com.twitter.distributedlog.util.OrderedScheduler; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.twitter.distributedlog.util.DLUtils.*; - -/** - * Watcher on watching a given namespace - */ -public class ZKNamespaceWatcher extends NamespaceWatcher - implements Runnable, Watcher, AsyncCallback.Children2Callback { - - static final Logger logger = LoggerFactory.getLogger(ZKNamespaceWatcher.class); - - private final DistributedLogConfiguration conf; - private final URI uri; - private final ZooKeeperClient zkc; - private final OrderedScheduler scheduler; - private final AtomicBoolean namespaceWatcherSet = new AtomicBoolean(false); - - public ZKNamespaceWatcher(DistributedLogConfiguration conf, - URI uri, - ZooKeeperClient zkc, - OrderedScheduler scheduler) { - this.conf = conf; - this.uri = uri; - this.zkc = zkc; - this.scheduler = scheduler; - } - - private void scheduleTask(Runnable r, long ms) { - try { - scheduler.schedule(r, ms, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ree) { - logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{r, ms, ree}); - } - } - - @Override - public void run() { - try { - doWatchNamespaceChanges(); - } catch (Exception e) { - logger.error("Encountered unknown exception on watching namespace {} ", uri, e); - } - } - - public void watchNamespaceChanges() { - if (!namespaceWatcherSet.compareAndSet(false, true)) { - return; - } - doWatchNamespaceChanges(); - } - - private void doWatchNamespaceChanges() { - try { - zkc.get().getChildren(uri.getPath(), this, this, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - scheduleTask(this, conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted on watching namespace changes for {} : ", uri, e); - scheduleTask(this, conf.getZKSessionTimeoutMilliseconds()); - } - } - - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - logger.info("Received updated logs under {} : {}", uri, children); - List<String> result = new ArrayList<String>(children.size()); - for (String s : children) { - if (isReservedStreamName(s)) { - continue; - } - result.add(s); - } - for (NamespaceListener listener : listeners) { - listener.onStreamsChanged(result.iterator()); - } - } else { - scheduleTask(this, conf.getZKSessionTimeoutMilliseconds()); - } - } - - @Override - public void process(WatchedEvent event) { - if (event.getType() == Event.EventType.None) { - if (event.getState() == Event.KeeperState.Expired) { - scheduleTask(this, conf.getZKSessionTimeoutMilliseconds()); - } - return; - } - if (event.getType() == Event.EventType.NodeChildrenChanged) { - // watch namespace changes again. - doWatchNamespaceChanges(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java deleted file mode 100644 index 8126723..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.acl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.apache.thrift.transport.TMemoryBuffer; -import org.apache.thrift.transport.TMemoryInputTransport; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; - -import static com.google.common.base.Charsets.UTF_8; - -public class ZKAccessControl { - - private static final int BUFFER_SIZE = 4096; - - public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry(); - - public static class CorruptedAccessControlException extends IOException { - - private static final long serialVersionUID = 5391285182476211603L; - - public CorruptedAccessControlException(String zkPath, Throwable t) { - super("Access Control @ " + zkPath + " is corrupted.", t); - } - } - - protected final AccessControlEntry accessControlEntry; - protected final String zkPath; - private int zkVersion; - - public ZKAccessControl(AccessControlEntry ace, String zkPath) { - this(ace, zkPath, -1); - } - - private ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) { - this.accessControlEntry = ace; - this.zkPath = zkPath; - this.zkVersion = zkVersion; - } - - @Override - public int hashCode() { - return Objects.hashCode(zkPath, accessControlEntry); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof ZKAccessControl)) { - return false; - } - ZKAccessControl other = (ZKAccessControl) obj; - return Objects.equal(zkPath, other.zkPath) && - Objects.equal(accessControlEntry, other.accessControlEntry); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("entry(path=").append(zkPath).append(", acl=") - .append(accessControlEntry).append(")"); - return sb.toString(); - } - - @VisibleForTesting - public String getZKPath() { - return zkPath; - } - - @VisibleForTesting - public AccessControlEntry getAccessControlEntry() { - return accessControlEntry; - } - - public Future<ZKAccessControl> create(ZooKeeperClient zkc) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - try { - zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT, - new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (KeeperException.Code.OK.intValue() == rc) { - ZKAccessControl.this.zkVersion = 0; - promise.setValue(ZKAccessControl.this); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } catch (IOException e) { - promise.setException(e); - } - return promise; - } - - public Future<ZKAccessControl> update(ZooKeeperClient zkc) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - try { - zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - ZKAccessControl.this.zkVersion = stat.getVersion(); - promise.setValue(ZKAccessControl.this); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } catch (IOException e) { - promise.setException(e); - } - return promise; - } - - public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - - try { - zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - try { - AccessControlEntry ace = deserialize(zkPath, data); - promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion())); - } catch (IOException ioe) { - promise.setException(ioe); - } - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - return promise; - } - - public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) { - final Promise<Void> promise = new Promise<Void>(); - - try { - zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (KeeperException.Code.OK.intValue() == rc || - KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(null); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - return promise; - } - - static byte[] serialize(AccessControlEntry ace) throws IOException { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - ace.write(protocol); - transport.flush(); - return transport.toString(UTF_8.name()).getBytes(UTF_8); - } catch (TException e) { - throw new IOException("Failed to serialize access control entry : ", e); - } catch (UnsupportedEncodingException uee) { - throw new IOException("Failed to serialize acesss control entry : ", uee); - } - } - - static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException { - if (data.length == 0) { - return DEFAULT_ACCESS_CONTROL_ENTRY; - } - - AccessControlEntry ace = new AccessControlEntry(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - ace.read(protocol); - } catch (TException e) { - throw new CorruptedAccessControlException(zkPath, e); - } - return ace; - } - -}