http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java index 850f9c8..bcf8129 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java @@ -19,8 +19,14 @@ package com.twitter.distributedlog.logsegment; import com.google.common.annotations.Beta; import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; +import com.twitter.distributedlog.util.Allocator; +import com.twitter.distributedlog.util.Transaction; import com.twitter.util.Future; +import java.io.IOException; + /** * Log Segment Store to read log segments */ @@ -36,12 +42,14 @@ public interface LogSegmentEntryStore { Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment); /** - * Open the writer for writing data to the log <i>segment</i>. + * Create a new log segment allocator for allocating log segment entry writers. 
* - * @param segment the log <i>segment</i> to write data to - * @return future represent the opened writer + * @param metadata the metadata for the log stream + * @return the log segment allocator */ - Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment); + Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator( + LogMetadataForWriter metadata, + DynamicDistributedLogConfiguration dynConf) throws IOException; /** * Open the reader for reading data to the log <i>segment</i>.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java deleted file mode 100644 index ac36ef2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/BKDLConfig.java +++ /dev/null @@ -1,399 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.metadata; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.thrift.BKDLConfigFormat; -import com.twitter.distributedlog.util.DLUtils; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.apache.thrift.transport.TMemoryBuffer; -import org.apache.thrift.transport.TMemoryInputTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * Configurations for BookKeeper based DL. - */ -public class BKDLConfig implements DLConfig { - - private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class); - - private static final int BUFFER_SIZE = 4096; - private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs = - new ConcurrentHashMap<URI, DLConfig>(); - - public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) { - dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID()); - dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo()); - if (bkdlConfig.isFederatedNamespace()) { - dlConf.setCreateStreamIfNotExists(false); - LOG.info("Disabled createIfNotExists for federated namespace."); - } - LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," + - " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.", - new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(), - dlConf.getFirstLogSegmentSequenceNumber(), 
dlConf.getCreateStreamIfNotExists(), - bkdlConfig.isFederatedNamespace() }); - } - - public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException { - DLConfig dlConfig = cachedDLConfigs.get(uri); - if (dlConfig == null) { - dlConfig = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig(); - DLConfig oldDLConfig = cachedDLConfigs.putIfAbsent(uri, dlConfig); - if (null != oldDLConfig) { - dlConfig = oldDLConfig; - } - } - assert (dlConfig instanceof BKDLConfig); - return (BKDLConfig)dlConfig; - } - - @VisibleForTesting - public static void clearCachedDLConfigs() { - cachedDLConfigs.clear(); - } - - private String bkZkServersForWriter; - private String bkZkServersForReader; - private String bkLedgersPath; - private boolean sanityCheckTxnID = true; - private boolean encodeRegionID = false; - private String dlZkServersForWriter; - private String dlZkServersForReader; - private String aclRootPath; - private Long firstLogSegmentSeqNo; - private boolean isFederatedNamespace = false; - - /** - * Construct a empty config with given <i>uri</i>. - */ - BKDLConfig(URI uri) { - this(DLUtils.getZKServersFromDLUri(uri), - DLUtils.getZKServersFromDLUri(uri), - null, null, null); - } - - /** - * The caller should make sure both dl and bk use same zookeeper server. - * - * @param zkServers - * zk servers used for both dl and bk. - * @param ledgersPath - * ledgers path. 
- */ - @VisibleForTesting - public BKDLConfig(String zkServers, String ledgersPath) { - this(zkServers, zkServers, zkServers, zkServers, ledgersPath); - } - - public BKDLConfig(String dlZkServersForWriter, - String dlZkServersForReader, - String bkZkServersForWriter, - String bkZkServersForReader, - String bkLedgersPath) { - this.dlZkServersForWriter = dlZkServersForWriter; - this.dlZkServersForReader = dlZkServersForReader; - this.bkZkServersForWriter = bkZkServersForWriter; - this.bkZkServersForReader = bkZkServersForReader; - this.bkLedgersPath = bkLedgersPath; - } - - /** - * @return zk servers used for bk for writers - */ - public String getBkZkServersForWriter() { - return bkZkServersForWriter; - } - - /** - * @return zk servers used for bk for readers - */ - public String getBkZkServersForReader() { - return bkZkServersForReader; - } - - /** - * @return zk servers used for dl for writers - */ - public String getDlZkServersForWriter() { - return dlZkServersForWriter; - } - - /** - * @return zk servers used for dl for readers - */ - public String getDlZkServersForReader() { - return dlZkServersForReader; - } - - /** - * @return ledgers path for bk - */ - public String getBkLedgersPath() { - return bkLedgersPath; - } - - /** - * Enable/Disable sanity check txn id. - * - * @param enabled - * flag to enable/disable sanity check txn id. - * @return bk dl config. - */ - public BKDLConfig setSanityCheckTxnID(boolean enabled) { - this.sanityCheckTxnID = enabled; - return this; - } - - /** - * @return flag to sanity check highest txn id. - */ - public boolean getSanityCheckTxnID() { - return sanityCheckTxnID; - } - - /** - * Enable/Disable encode region id. - * - * @param enabled - * flag to enable/disable encoding region id. - * @return bk dl config - */ - public BKDLConfig setEncodeRegionID(boolean enabled) { - this.encodeRegionID = enabled; - return this; - } - - /** - * @return flag to encode region id. 
- */ - public boolean getEncodeRegionID() { - return encodeRegionID; - } - - /** - * Set the root path of zk based ACL manager. - * - * @param aclRootPath - * root path of zk based ACL manager. - * @return bk dl config - */ - public BKDLConfig setACLRootPath(String aclRootPath) { - this.aclRootPath = aclRootPath; - return this; - } - - /** - * Get the root path of zk based ACL manager. - * - * @return root path of zk based ACL manager. - */ - public String getACLRootPath() { - return aclRootPath; - } - - /** - * Set the value at which ledger sequence number should start for streams that are being - * upgraded and did not have ledger sequence number to start with or for newly created - * streams - * - * @param firstLogSegmentSeqNo first ledger sequence number - * @return bk dl config - */ - public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) { - this.firstLogSegmentSeqNo = firstLogSegmentSeqNo; - return this; - } - - /** - * Get the value at which ledger sequence number should start for streams that are being - * upgraded and did not have ledger sequence number to start with or for newly created - * streams - * - * @return first ledger sequence number - */ - public Long getFirstLogSegmentSeqNo() { - if (null == firstLogSegmentSeqNo) { - return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO; - } - return firstLogSegmentSeqNo; - } - - /** - * Set the namespace to federated <i>isFederatedNamespace</i>. - * - * @param isFederatedNamespace - * is the namespace federated? - * @return bk dl config - */ - public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) { - this.isFederatedNamespace = isFederatedNamespace; - return this; - } - - /** - * Whether the namespace is federated namespace - * - * @return true if the namespace is a federated namespace. otherwise false. 
- */ - public boolean isFederatedNamespace() { - return this.isFederatedNamespace; - } - - @Override - public int hashCode() { - return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader, - dlZkServersForWriter, dlZkServersForReader, - bkLedgersPath); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof BKDLConfig)) { - return false; - } - BKDLConfig another = (BKDLConfig) o; - return Objects.equal(bkZkServersForWriter, another.bkZkServersForWriter) && - Objects.equal(bkZkServersForReader, another.bkZkServersForReader) && - Objects.equal(dlZkServersForWriter, another.dlZkServersForWriter) && - Objects.equal(dlZkServersForReader, another.dlZkServersForReader) && - Objects.equal(bkLedgersPath, another.bkLedgersPath) && - sanityCheckTxnID == another.sanityCheckTxnID && - encodeRegionID == another.encodeRegionID && - Objects.equal(aclRootPath, another.aclRootPath) && - Objects.equal(firstLogSegmentSeqNo, another.firstLogSegmentSeqNo) && - Objects.equal(isFederatedNamespace, another.isFederatedNamespace); - - } - - @Override - public String toString() { - return serialize(); - } - - @Override - public String serialize() { - BKDLConfigFormat configFormat = new BKDLConfigFormat(); - if (null != bkZkServersForWriter) { - configFormat.setBkZkServers(bkZkServersForWriter); - } - if (null != bkZkServersForReader) { - configFormat.setBkZkServersForReader(bkZkServersForReader); - } - if (null != dlZkServersForWriter) { - configFormat.setDlZkServersForWriter(dlZkServersForWriter); - } - if (null != dlZkServersForReader) { - configFormat.setDlZkServersForReader(dlZkServersForReader); - } - if (null != bkLedgersPath) { - configFormat.setBkLedgersPath(bkLedgersPath); - } - configFormat.setSanityCheckTxnID(sanityCheckTxnID); - configFormat.setEncodeRegionID(encodeRegionID); - if (null != aclRootPath) { - configFormat.setAclRootPath(aclRootPath); - } - if (null != firstLogSegmentSeqNo) { - configFormat.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo); - } 
- if (isFederatedNamespace) { - configFormat.setFederatedNamespace(true); - } - return serialize(configFormat); - } - - String serialize(BKDLConfigFormat configFormat) { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - configFormat.write(protocol); - transport.flush(); - return transport.toString("UTF-8"); - } catch (TException e) { - throw new RuntimeException("Failed to serialize BKDLConfig : ", e); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("Failed to serialize BKDLConfig : ", e); - } - } - - @Override - public void deserialize(byte[] data) throws IOException { - BKDLConfigFormat configFormat = new BKDLConfigFormat(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - configFormat.read(protocol); - } catch (TException e) { - throw new IOException("Failed to deserialize data '" + - new String(data, UTF_8) + "' : ", e); - } - // bookkeeper cluster settings - if (configFormat.isSetBkZkServers()) { - bkZkServersForWriter = configFormat.getBkZkServers(); - } - if (configFormat.isSetBkZkServersForReader()) { - bkZkServersForReader = configFormat.getBkZkServersForReader(); - } else { - bkZkServersForReader = bkZkServersForWriter; - } - if (configFormat.isSetBkLedgersPath()) { - bkLedgersPath = configFormat.getBkLedgersPath(); - } - // dl zookeeper cluster settings - if (configFormat.isSetDlZkServersForWriter()) { - dlZkServersForWriter = configFormat.getDlZkServersForWriter(); - } - if (configFormat.isSetDlZkServersForReader()) { - dlZkServersForReader = configFormat.getDlZkServersForReader(); - } else { - dlZkServersForReader = dlZkServersForWriter; - } - // dl settings - sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || configFormat.isSanityCheckTxnID(); - encodeRegionID = configFormat.isSetEncodeRegionID() && configFormat.isEncodeRegionID(); - if 
(configFormat.isSetAclRootPath()) { - aclRootPath = configFormat.getAclRootPath(); - } - - if (configFormat.isSetFirstLogSegmentSeqNo()) { - firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo(); - } - isFederatedNamespace = configFormat.isSetFederatedNamespace() && configFormat.isFederatedNamespace(); - - // Validate the settings - if (null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath || - null == dlZkServersForWriter || null == dlZkServersForReader) { - throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java index e0331c6..c0b5fb7 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java @@ -18,6 +18,7 @@ package com.twitter.distributedlog.metadata; import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.util.Utils; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientBuilder; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java deleted file mode 100644 index 303fbe6..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/ZkMetadataResolver.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.twitter.distributedlog.ZooKeeperClient; -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.net.URI; - -public class ZkMetadataResolver implements MetadataResolver { - - private final ZooKeeperClient zkc; - - public ZkMetadataResolver(ZooKeeperClient zkc) { - this.zkc = zkc; - } - - @Override - public DLMetadata resolve(URI uri) throws IOException { - String dlPath = uri.getPath(); - PathUtils.validatePath(dlPath); - // Normal case the dl metadata is stored in the last segment - // so lookup last segment first. 
- String[] parts = StringUtils.split(dlPath, '/'); - if (null == parts || 0 == parts.length) { - throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath); - } - for (int i = parts.length; i >= 0; i--) { - String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i)); - byte[] data; - try { - data = zkc.get().getData(pathToResolve, false, new Stat()); - } catch (KeeperException.NoNodeException nne) { - continue; - } catch (KeeperException ke) { - throw new IOException("Fail to resolve dl path : " + pathToResolve); - } catch (InterruptedException ie) { - throw new IOException("Interrupted when resolving dl path : " + pathToResolve); - } - if (null == data || data.length == 0) { - continue; - } - try { - return DLMetadata.deserialize(uri, data); - } catch (IOException ie) { - throw new IOException("Failed to deserialize uri : " + uri); - } - } - throw new IOException("No bkdl config bound under dl path : " + dlPath); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java index b5abe9f..5d1d888 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java @@ -70,6 +70,13 @@ import org.apache.bookkeeper.stats.StatsLogger; @Beta public interface DistributedLogNamespace { + /** + * Get the namespace driver used by this namespace. 
+ * + * @return namespace driver + */ + NamespaceDriver getNamespaceDriver(); + // // Method to operate logs // http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java index a01bb70..07b3848 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java @@ -17,22 +17,29 @@ */ package com.twitter.distributedlog.namespace; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.twitter.distributedlog.BKDistributedLogNamespace; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.feature.CoreFeatureKeys; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; +import com.twitter.distributedlog.util.ConfUtils; +import com.twitter.distributedlog.util.DLUtils; +import com.twitter.distributedlog.util.OrderedScheduler; +import com.twitter.distributedlog.util.PermitLimiter; +import com.twitter.distributedlog.util.SimplePermitLimiter; +import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.stats.NullStatsLogger; import 
org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; /** * Builder to construct a <code>DistributedLogNamespace</code>. @@ -50,6 +57,7 @@ public class DistributedLogNamespaceBuilder { } private DistributedLogConfiguration _conf = null; + private DynamicDistributedLogConfiguration _dynConf = null; private URI _uri = null; private StatsLogger _statsLogger = NullStatsLogger.INSTANCE; private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE; @@ -73,6 +81,17 @@ public class DistributedLogNamespaceBuilder { } /** + * Dynamic DistributedLog Configuration used for the namespace + * + * @param dynConf dynamic distributedlog configuration + * @return namespace builder + */ + public DistributedLogNamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) { + this._dynConf = dynConf; + return this; + } + + /** * Namespace Location. * * @param uri @@ -146,6 +165,18 @@ public class DistributedLogNamespaceBuilder { return this; } + @SuppressWarnings("deprecation") + private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + DistributedLogConfiguration conf) { + StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger; + if (perLogStatsLogger == NullStatsLogger.INSTANCE && + conf.getEnablePerStreamStat()) { + normalizedPerLogStatsLogger = statsLogger.scope("stream"); + } + return normalizedPerLogStatsLogger; + } + /** * Build the namespace. 
* @@ -160,25 +191,17 @@ public class DistributedLogNamespaceBuilder { Preconditions.checkNotNull(_conf, "No DistributedLog Configuration."); Preconditions.checkNotNull(_uri, "No DistributedLog URI"); - // Validate the uri and load the backend according to scheme - String scheme = _uri.getScheme(); - Preconditions.checkNotNull(scheme, "Invalid DistributedLog URI : " + _uri); - String[] schemeParts = StringUtils.split(scheme, '-'); - Preconditions.checkArgument(schemeParts.length > 0, - "Invalid distributedlog scheme found : " + _uri); - Preconditions.checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()), - "Unknown distributedlog scheme found : " + _uri); - - // both distributedlog: & distributedlog-bk: use bookkeeper as the backend - // TODO: we could do reflection to load backend in future. - // if we are going to support other backends : e.g. 'distributedlog-mem:'. - if (schemeParts.length > 1) { - String backendProvider = schemeParts[1]; - Preconditions.checkArgument(Objects.equal(DistributedLogConstants.BACKEND_BK, backendProvider.toLowerCase()), - "Backend '" + backendProvider + "' is not supported yet."); + // validate the configuration + _conf.validate(); + if (null == _dynConf) { + _dynConf = ConfUtils.getConstDynConf(_conf); } - // Built the feature provider + // retrieve the namespace driver + NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri); + URI normalizedUri = DLUtils.normalizeURI(_uri); + + // build the feature provider FeatureProvider featureProvider; if (null == _featureProvider) { featureProvider = new SettableFeatureProvider("", 0); @@ -187,25 +210,69 @@ public class DistributedLogNamespaceBuilder { featureProvider = _featureProvider; } - URI bkUri; - try { - bkUri = new URI( - schemeParts[0], // remove backend info from bookkeeper backend - _uri.getAuthority(), - _uri.getPath(), - _uri.getQuery(), - _uri.getFragment()); - } catch (URISyntaxException e) { - throw new 
IllegalArgumentException("Invalid distributedlog uri found : " + _uri, e); - } + // build the failure injector + AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder() + .injectDelays(_conf.getEIInjectReadAheadDelay(), + _conf.getEIInjectReadAheadDelayPercent(), + _conf.getEIInjectMaxReadAheadDelayMs()) + .injectErrors(false, 10) + .injectStops(_conf.getEIInjectReadAheadStall(), 10) + .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries()) + .build(); - return BKDistributedLogNamespace.newBuilder() - .conf(_conf) - .uri(bkUri) - .statsLogger(_statsLogger) - .featureProvider(featureProvider) - .clientId(_clientId) - .regionId(_regionId) + // normalize the per log stats logger + StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf); + + // build the scheduler + StatsLogger schedulerStatsLogger = _statsLogger.scope("factory").scope("thread_pool"); + OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .name("DLM-" + normalizedUri.getPath()) + .corePoolSize(_conf.getNumWorkerThreads()) + .statsLogger(schedulerStatsLogger) + .perExecutorStatsLogger(schedulerStatsLogger) + .traceTaskExecution(_conf.getEnableTaskExecutionStats()) + .traceTaskExecutionWarnTimeUs(_conf.getTaskExecutionWarnTimeMicros()) .build(); + + // initialize the namespace driver + driver.initialize( + _conf, + _dynConf, + normalizedUri, + scheduler, + featureProvider, + failureInjector, + _statsLogger, + perLogStatsLogger, + DLUtils.normalizeClientId(_clientId), + _regionId); + + // initialize the write limiter + PermitLimiter writeLimiter; + if (_conf.getGlobalOutstandingWriteLimit() < 0) { + writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER; + } else { + Feature disableWriteLimitFeature = featureProvider.getFeature( + CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase()); + writeLimiter = new SimplePermitLimiter( + _conf.getOutstandingWriteLimitDarkmode(), + _conf.getGlobalOutstandingWriteLimit(), + 
_statsLogger.scope("writeLimiter"), + true /* singleton */, + disableWriteLimitFeature); + } + + return new BKDistributedLogNamespace( + _conf, + normalizedUri, + driver, + scheduler, + featureProvider, + writeLimiter, + failureInjector, + _statsLogger, + perLogStatsLogger, + DLUtils.normalizeClientId(_clientId), + _regionId); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java new file mode 100644 index 0000000..738f124 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.namespace; + +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.MetadataAccessor; +import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.exceptions.InvalidStreamNameException; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; +import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; +import com.twitter.distributedlog.metadata.LogMetadataStore; +import com.twitter.distributedlog.metadata.LogStreamMetadataStore; +import com.twitter.distributedlog.subscription.SubscriptionsStore; +import com.twitter.distributedlog.util.OrderedScheduler; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.StatsLogger; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; + +/** + * Manager to manage all the stores required by a namespace. + */ +public interface NamespaceDriver extends Closeable { + + enum Role { + WRITER, + READER + } + + /** + * Initialize the namespace manager. 
+ * + * @param conf distributedlog configuration + * @param dynConf dynamic distributedlog configuration + * @param namespace root uri of the namespace + * @param scheduler ordered scheduler + * @param featureProvider feature provider + * @param failureInjector async failure injector + * @param statsLogger stats logger + * @param perLogStatsLogger per log stream stats logger + * @param clientId client id + * @param regionId region id + * @return namespace manager + * @throws IOException when failed to initialize the namespace manager + */ + NamespaceDriver initialize(DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + URI namespace, + OrderedScheduler scheduler, + FeatureProvider featureProvider, + AsyncFailureInjector failureInjector, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + String clientId, + int regionId) throws IOException; + + /** + * Get the scheme of the namespace driver. + * + * @return the scheme of the namespace driver. + */ + String getScheme(); + + /** + * Get the root uri of the namespace driver. + * + * @return the root uri of the namespace driver. + */ + URI getUri(); + + /** + * Retrieve the log {@code metadata store} used by the namespace. + * + * @return the log metadata store + */ + LogMetadataStore getLogMetadataStore(); + + /** + * Retrieve the log stream {@code metadata store} used by the namespace. + * + * @param role the role to retrieve the log stream metadata store. + * @return the log stream metadata store + */ + LogStreamMetadataStore getLogStreamMetadataStore(Role role); + + /** + * Retrieve the log segment {@code entry store} used by the namespace. + * + * @param role the role to retrieve the log segment entry store. + * @return the log segment entry store. + */ + LogSegmentEntryStore getLogSegmentEntryStore(Role role); + + /** + * Create an access control manager to manage/check acl for logs. + * + * @return access control manager for logs under the namespace. 
+ * @throws IOException when failed to create the access control manager + */ + AccessControlManager getAccessControlManager() + throws IOException; + + /** + * Retrieve the metadata accessor for log stream {@code streamName}. + * (TODO: it is a legacy interface. should remove it if we have metadata of stream.) + * + * @param streamName name of log stream. + * @return metadata accessor for log stream {@code streamName}. + * @throws InvalidStreamNameException presumably when {@code streamName} is not a valid stream name (confirm against implementations) + * @throws IOException when failed to retrieve the metadata accessor + */ + MetadataAccessor getMetadataAccessor(String streamName) + throws InvalidStreamNameException, IOException; + + /** + * Retrieve the subscriptions store for log stream {@code streamName}. + * + * @param streamName name of log stream. + * @return the subscriptions store for log stream {@code streamName} + */ + SubscriptionsStore getSubscriptionsStore(String streamName); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java new file mode 100644 index 0000000..79945ad --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.namespace; + +import com.google.common.base.Objects; +import com.google.common.collect.Sets; +import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.impl.BKNamespaceDriver; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.google.common.base.Preconditions.*; + +/** + * The basic service for managing a set of namespace drivers. + */ +public class NamespaceDriverManager { + + private static final Logger logger = LoggerFactory.getLogger(NamespaceDriverManager.class); + + static class NamespaceDriverInfo { + + final Class<? extends NamespaceDriver> driverClass; + final String driverClassName; + + NamespaceDriverInfo(Class<? 
extends NamespaceDriver> driverClass) { + this.driverClass = driverClass; + this.driverClassName = this.driverClass.getName(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("driver[") + .append(driverClassName) + .append("]"); + return sb.toString(); + } + } + + private static final ConcurrentMap<String, NamespaceDriverInfo> drivers; + private static boolean initialized = false; + + static { + drivers = new ConcurrentHashMap<String, NamespaceDriverInfo>(); + initialize(); + } + + static void initialize() { + if (initialized) { + return; + } + loadInitialDrivers(); + initialized = true; + logger.info("DistributedLog NamespaceDriverManager initialized"); + } + + private static void loadInitialDrivers() { + Set<String> driverList = Sets.newHashSet(); + // add default bookkeeper based driver + driverList.add(BKNamespaceDriver.class.getName()); + // load drivers from system property + String driversStr = System.getProperty("distributedlog.namespace.drivers"); + if (null != driversStr) { + String[] driversArray = StringUtils.split(driversStr, ':'); + for (String driver : driversArray) { + driverList.add(driver); + } + } + // initialize the drivers + for (String driverClsName : driverList) { + try { + NamespaceDriver driver = + ReflectionUtils.newInstance(driverClsName, NamespaceDriver.class); + NamespaceDriverInfo driverInfo = new NamespaceDriverInfo(driver.getClass()); + drivers.put(driver.getScheme().toLowerCase(), driverInfo); + } catch (Exception ex) { + logger.warn("Failed to load namespace driver {} : ", driverClsName, ex); + } + } + } + + /** + * Prevent the NamespaceDriverManager class from being instantiated. + */ + private NamespaceDriverManager() {} + + /** + * Register the namespace {@code driver}. + * + * @param driver the namespace driver + * @return the namespace driver manager + */ + public static void registerDriver(String backend, Class<? 
extends NamespaceDriver> driver) { + if (!initialized) { + initialize(); + } + + String scheme = backend.toLowerCase(); + NamespaceDriverInfo oldDriverInfo = drivers.get(scheme); + if (null != oldDriverInfo) { + return; + } + NamespaceDriverInfo newDriverInfo = new NamespaceDriverInfo(driver); + oldDriverInfo = drivers.putIfAbsent(scheme, newDriverInfo); + if (null != oldDriverInfo) { + logger.debug("Driver for {} is already there.", scheme); + } + } + + /** + * Retrieve the namespace driver for {@code scheme}. + * + * @param scheme the scheme for the namespace driver + * @return the namespace driver + * @throws NullPointerException when scheme is null + */ + public static NamespaceDriver getDriver(String scheme) { + checkNotNull(scheme, "Driver Scheme is null"); + if (!initialized) { + initialize(); + } + NamespaceDriverInfo driverInfo = drivers.get(scheme.toLowerCase()); + if (null == driverInfo) { + throw new IllegalArgumentException("Unknown backend " + scheme); + } + return ReflectionUtils.newInstance(driverInfo.driverClass); + } + + /** + * Retrieve the namespace driver for {@code uri}. 
+ * + * @param uri the distributedlog uri + * @return the namespace driver for {@code uri} + * @throws NullPointerException if the distributedlog {@code uri} is null or doesn't have scheme + * or there is no namespace driver registered for the scheme + * @throws IllegalArgumentException if the distributedlog {@code uri} scheme is illegal + */ + public static NamespaceDriver getDriver(URI uri) { + // Validate the uri and load the backend according to scheme + checkNotNull(uri, "DistributedLog uri is null"); + String scheme = uri.getScheme(); + checkNotNull(scheme, "Invalid distributedlog uri : " + uri); + scheme = scheme.toLowerCase(); + String[] schemeParts = StringUtils.split(scheme, '-'); + checkArgument(schemeParts.length > 0, + "Invalid distributedlog scheme found : " + uri); + checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()), + "Unknown distributedlog scheme found : " + uri); + // bookkeeper is the default backend + String backend = DistributedLogConstants.BACKEND_BK; + if (schemeParts.length > 1) { + backend = schemeParts[1]; + } + return getDriver(backend); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java deleted file mode 100644 index 9cd2da5..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionStateStore.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.subscription; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -import com.google.common.base.Charsets; - -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.util.Future; -import com.twitter.util.Promise; - -public class ZKSubscriptionStateStore implements SubscriptionStateStore { - - static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class); - - private final ZooKeeperClient zooKeeperClient; - private final String zkPath; - private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null); - - public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) { - this.zooKeeperClient = zooKeeperClient; - this.zkPath = zkPath; - } - - @Override - public void close() throws IOException { - } - - /** - * Get 
the last committed position stored for this subscription - */ - @Override - public Future<DLSN> getLastCommitPosition() { - if (null != lastCommittedPosition.get()) { - return Future.value(lastCommittedPosition.get()); - } else { - return getLastCommitPositionFromZK(); - } - } - - Future<DLSN> getLastCommitPositionFromZK() { - final Promise<DLSN> result = new Promise<DLSN>(); - try { - logger.debug("Reading last commit position from path {}", zkPath); - zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc); - if (KeeperException.Code.NONODE.intValue() == rc) { - result.setValue(DLSN.NonInclusiveLowerBound); - } else if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); - } else { - try { - DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8)); - result.setValue(dlsn); - } catch (Exception t) { - logger.warn("Invalid last commit position found from path {}", zkPath, t); - // invalid dlsn recorded in subscription state store - result.setValue(DLSN.NonInclusiveLowerBound); - } - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - result.setException(zkce); - } catch (InterruptedException ie) { - result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie)); - } - return result; - } - - /** - * Advances the position associated with the subscriber - * - * @param newPosition - new commit position - */ - @Override - public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) { - if (null == lastCommittedPosition.get() || - (newPosition.compareTo(lastCommittedPosition.get()) > 0)) { - lastCommittedPosition.set(newPosition); - return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient, - zkPath, 
newPosition.serialize().getBytes(Charsets.UTF_8), - zooKeeperClient.getDefaultACL(), - CreateMode.PERSISTENT); - } else { - return Future.Done(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java deleted file mode 100644 index f1e6251..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.subscription; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.Promise; - -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * ZooKeeper Based Subscriptions Store. - */ -public class ZKSubscriptionsStore implements SubscriptionsStore { - - private final ZooKeeperClient zkc; - private final String zkPath; - private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers = - new ConcurrentHashMap<String, ZKSubscriptionStateStore>(); - - public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) { - this.zkc = zkc; - this.zkPath = zkPath; - } - - private ZKSubscriptionStateStore getSubscriber(String subscriberId) { - ZKSubscriptionStateStore ss = subscribers.get(subscriberId); - if (ss == null) { - ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc, - getSubscriberZKPath(subscriberId)); - ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS); - if (oldSS == null) { - ss = newSS; - } else { - try { - newSS.close(); - } catch (IOException e) { - // ignore the exception - } - ss = oldSS; - } - } - return ss; - } - - private String getSubscriberZKPath(String subscriberId) { - return 
String.format("%s/%s", zkPath, subscriberId); - } - - @Override - public Future<DLSN> getLastCommitPosition(String subscriberId) { - return getSubscriber(subscriberId).getLastCommitPosition(); - } - - @Override - public Future<Map<String, DLSN>> getLastCommitPositions() { - final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>(); - try { - this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (KeeperException.Code.NONODE.intValue() == rc) { - result.setValue(new HashMap<String, DLSN>()); - } else if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); - } else { - getLastCommitPositions(result, children); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - result.setException(zkce); - } catch (InterruptedException ie) { - result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie)); - } - return result; - } - - private void getLastCommitPositions(final Promise<Map<String, DLSN>> result, - List<String> subscribers) { - List<Future<Pair<String, DLSN>>> futures = - new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size()); - for (String s : subscribers) { - final String subscriber = s; - Future<Pair<String, DLSN>> future = - // Get the last commit position from zookeeper - getSubscriber(subscriber).getLastCommitPositionFromZK().map( - new AbstractFunction1<DLSN, Pair<String, DLSN>>() { - @Override - public Pair<String, DLSN> apply(DLSN dlsn) { - return Pair.of(subscriber, dlsn); - } - }); - futures.add(future); - } - Future.collect(futures).foreach( - new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() { - @Override - public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) { - Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>(); - for 
(Pair<String, DLSN> pair : subscriptions) { - subscriptionMap.put(pair.getLeft(), pair.getRight()); - } - result.setValue(subscriptionMap); - return BoxedUnit.UNIT; - } - }); - } - - @Override - public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) { - return getSubscriber(subscriberId).advanceCommitPosition(newPosition); - } - - @Override - public Future<Boolean> deleteSubscriber(String subscriberId) { - subscribers.remove(subscriberId); - String path = getSubscriberZKPath(subscriberId); - return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1)); - } - - @Override - public void close() throws IOException { - // no-op - for (SubscriptionStateStore store : subscribers.values()) { - store.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java index 30d6908..4565921 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java @@ -30,7 +30,6 @@ import java.io.PrintWriter; import java.net.MalformedURLException; import java.net.URI; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Enumeration; @@ -53,11 +52,16 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Preconditions; import com.twitter.distributedlog.BKDistributedLogNamespace; import com.twitter.distributedlog.Entry; +import 
com.twitter.distributedlog.MetadataAccessor; import com.twitter.distributedlog.callback.NamespaceListener; +import com.twitter.distributedlog.impl.BKNamespaceDriver; import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.namespace.NamespaceDriver; import com.twitter.distributedlog.util.Utils; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -100,17 +104,15 @@ import com.twitter.distributedlog.ZooKeeperClientBuilder; import com.twitter.distributedlog.auditor.DLAuditor; import com.twitter.distributedlog.bk.LedgerAllocator; import com.twitter.distributedlog.bk.LedgerAllocatorUtils; -import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.util.SchedulerUtils; import com.twitter.util.Await; -import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import static com.google.common.base.Charsets.UTF_8; -@SuppressWarnings("deprecation") public class DistributedLogTool extends Tool { static final Logger logger = LoggerFactory.getLogger(DistributedLogTool.class); @@ -161,7 +163,7 @@ public class DistributedLogTool extends Tool { protected URI uri; protected String zkAclId = null; protected boolean force = false; - protected com.twitter.distributedlog.DistributedLogManagerFactory factory = null; + protected DistributedLogNamespace namespace = null; protected PerDLCommand(String name, String description) { super(name, description); @@ -187,8 +189,8 @@ public class DistributedLogTool extends Tool { return runCmd(); } finally { synchronized (this) { - if (null != factory) { - factory.close(); 
+ if (null != namespace) { + namespace.close(); } } } @@ -252,35 +254,33 @@ public class DistributedLogTool extends Tool { this.force = force; } - protected synchronized com.twitter.distributedlog.DistributedLogManagerFactory getFactory() throws IOException { - if (null == this.factory) { - this.factory = new com.twitter.distributedlog.DistributedLogManagerFactory(getConf(), getUri()); - logger.info("Construct DLM : uri = {}", getUri()); - } - return this.factory; - } - protected DistributedLogNamespace getNamespace() throws IOException { - return getFactory().getNamespace(); + if (null == this.namespace) { + this.namespace = DistributedLogNamespaceBuilder.newBuilder() + .uri(getUri()) + .conf(getConf()) + .build(); + } + return this.namespace; } protected LogSegmentMetadataStore getLogSegmentMetadataStore() throws IOException { - DistributedLogNamespace namespace = getFactory().getNamespace(); - assert(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getWriterStreamMetadataStore() + return getNamespace() + .getNamespaceDriver() + .getLogStreamMetadataStore(NamespaceDriver.Role.READER) .getLogSegmentMetadataStore(); } protected ZooKeeperClient getZooKeeperClient() throws IOException { - DistributedLogNamespace namespace = getFactory().getNamespace(); - assert(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL(); + NamespaceDriver driver = getNamespace().getNamespaceDriver(); + assert(driver instanceof BKNamespaceDriver); + return ((BKNamespaceDriver) driver).getWriterZKC(); } protected BookKeeperClient getBookKeeperClient() throws IOException { - DistributedLogNamespace namespace = getFactory().getNamespace(); - assert(namespace instanceof BKDistributedLogNamespace); - return ((BKDistributedLogNamespace) namespace).getReaderBKC(); + NamespaceDriver driver = getNamespace().getNamespaceDriver(); + assert(driver instanceof BKNamespaceDriver); + 
return ((BKNamespaceDriver) driver).getReaderBKC(); } } @@ -347,6 +347,10 @@ public class DistributedLogTool extends Tool { } } + /** + * NOTE: we might consider adding a command to 'delete' namespace. The implementation of the namespace + * driver should implement the 'delete' operation. + */ protected static class DeleteAllocatorPoolCommand extends PerDLCommand { int concurrency = 1; @@ -380,8 +384,12 @@ public class DistributedLogTool extends Tool { String rootPath = getUri().getPath() + "/" + allocationPoolPath; final ScheduledExecutorService allocationExecutor = Executors.newSingleThreadScheduledExecutor(); ExecutorService executorService = Executors.newFixedThreadPool(concurrency); + Preconditions.checkArgument(getNamespace() instanceof BKDistributedLogNamespace); + BKDistributedLogNamespace bkns = (BKDistributedLogNamespace) getNamespace(); + final ZooKeeperClient zkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getWriterZKC(); + final BookKeeperClient bkc = ((BKNamespaceDriver) bkns.getNamespaceDriver()).getReaderBKC(); try { - List<String> pools = getZooKeeperClient().get().getChildren(rootPath, false); + List<String> pools = zkc.get().getChildren(rootPath, false); final LinkedBlockingQueue<String> poolsToDelete = new LinkedBlockingQueue<String>(); if (getForce() || IOUtils.confirmPrompt("Are you sure you want to delete allocator pools : " + pools)) { for (String pool : pools) { @@ -401,7 +409,7 @@ public class DistributedLogTool extends Tool { try { LedgerAllocator allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(), - getZooKeeperClient(), getBookKeeperClient(), + zkc, bkc, allocationExecutor); allocator.delete(); System.out.println("Deleted allocator pool : " + poolPath + " ."); @@ -454,43 +462,35 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { - if (printMetadata) { - printStreamsWithMetadata(getFactory()); - } else { - printStreams(getFactory()); - } + 
printStreams(getNamespace()); return 0; } - protected void printStreamsWithMetadata(com.twitter.distributedlog.DistributedLogManagerFactory factory) - throws Exception { - Map<String, byte[]> streams = factory.enumerateLogsWithMetadataInNamespace(); + protected void printStreams(DistributedLogNamespace namespace) throws Exception { + Iterator<String> streams = namespace.getLogs(); System.out.println("Streams under " + getUri() + " : "); System.out.println("--------------------------------"); - for (Map.Entry<String, byte[]> entry : streams.entrySet()) { - println(entry.getKey()); - if (null == entry.getValue() || entry.getValue().length == 0) { + while (streams.hasNext()) { + String streamName = streams.next(); + System.out.println(streamName); + if (!printMetadata) { + continue; + } + MetadataAccessor accessor = + namespace.getNamespaceDriver().getMetadataAccessor(streamName); + byte[] metadata = accessor.getMetadata(); + if (null == metadata || metadata.length == 0) { continue; } if (printHex) { - System.out.println(Hex.encodeHexString(entry.getValue())); + System.out.println(Hex.encodeHexString(metadata)); } else { - System.out.println(new String(entry.getValue(), UTF_8)); + System.out.println(new String(metadata, UTF_8)); } System.out.println(""); } System.out.println("--------------------------------"); } - - protected void printStreams(com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception { - Collection<String> streams = factory.enumerateAllLogsInNamespace(); - System.out.println("Streams under " + getUri() + " : "); - System.out.println("--------------------------------"); - for (String stream : streams) { - System.out.println(stream); - } - System.out.println("--------------------------------"); - } } public static class WatchNamespaceCommand extends PerDLCommand implements NamespaceListener { @@ -609,16 +609,17 @@ public class DistributedLogTool extends Tool { private void inspectStreams(final SortedMap<String, 
List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates) throws Exception { - Collection<String> streamCollection = getFactory().enumerateAllLogsInNamespace(); + Iterator<String> streamCollection = getNamespace().getLogs(); final List<String> streams = new ArrayList<String>(); - if (null != streamPrefix) { - for (String s : streamCollection) { + while (streamCollection.hasNext()) { + String s = streamCollection.next(); + if (null != streamPrefix) { if (s.startsWith(streamPrefix)) { streams.add(s); } + } else { + streams.add(s); } - } else { - streams.addAll(streamCollection); } if (0 == streams.size()) { return; @@ -660,8 +661,7 @@ public class DistributedLogTool extends Tool { for (int i = startIdx; i < endIdx; i++) { String s = streams.get(i); BookKeeperClient bkc = getBookKeeperClient(); - DistributedLogManager dlm = - getFactory().createDistributedLogManagerWithSharedClients(s); + DistributedLogManager dlm = getNamespace().openLog(s); try { List<LogSegmentMetadata> segments = dlm.getLogSegments(); if (segments.size() <= 1) { @@ -782,20 +782,21 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { getConf().setZkAclId(getZkAclId()); - return truncateStreams(getFactory()); + return truncateStreams(getNamespace()); } - private int truncateStreams(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception { - Collection<String> streamCollection = factory.enumerateAllLogsInNamespace(); + private int truncateStreams(final DistributedLogNamespace namespace) throws Exception { + Iterator<String> streamCollection = namespace.getLogs(); final List<String> streams = new ArrayList<String>(); - if (null != streamPrefix) { - for (String s : streamCollection) { + while (streamCollection.hasNext()) { + String s = streamCollection.next(); + if (null != streamPrefix) { if (s.startsWith(streamPrefix)) { streams.add(s); } + } else { + streams.add(s); } - } else { - 
streams.addAll(streamCollection); } if (0 == streams.size()) { return 0; @@ -813,7 +814,7 @@ public class DistributedLogTool extends Tool { @Override public void run() { try { - truncateStreams(factory, streams, tid, numStreamsPerThreads); + truncateStreams(namespace, streams, tid, numStreamsPerThreads); System.out.println("Thread " + tid + " finished."); } catch (IOException e) { System.err.println("Thread " + tid + " quits with exception : " + e.getMessage()); @@ -828,14 +829,13 @@ public class DistributedLogTool extends Tool { return 0; } - private void truncateStreams(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams, + private void truncateStreams(DistributedLogNamespace namespace, List<String> streams, int tid, int numStreamsPerThreads) throws IOException { int startIdx = tid * numStreamsPerThreads; int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); for (int i = startIdx; i < endIdx; i++) { String s = streams.get(i); - DistributedLogManager dlm = - factory.createDistributedLogManagerWithSharedClients(s); + DistributedLogManager dlm = namespace.openLog(s); try { if (deleteStream) { dlm.delete(); @@ -930,7 +930,7 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { - DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName()); + DistributedLogManager dlm = getNamespace().openLog(getStreamName()); try { if (listEppStats) { bkc = new SimpleBookKeeperClient(getConf(), getUri()); @@ -1078,7 +1078,7 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { - DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName()); + DistributedLogManager dlm = getNamespace().openLog(getStreamName()); try { long count = 0; if (null == endDLSN) { @@ -1141,7 +1141,7 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() 
throws Exception { getConf().setZkAclId(getZkAclId()); - DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName()); + DistributedLogManager dlm = getNamespace().openLog(getStreamName()); try { dlm.delete(); } finally { @@ -1347,7 +1347,7 @@ public class DistributedLogTool extends Tool { } getConf().setZkAclId(getZkAclId()); for (String stream : streams) { - getFactory().getNamespace().createLog(stream); + getNamespace().createLog(stream); } return 0; } @@ -1435,7 +1435,7 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { - DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName()); + DistributedLogManager dlm = getNamespace().openLog(getStreamName()); long totalCount = dlm.getLogRecordCount(); try { AsyncLogReader reader; @@ -1536,7 +1536,7 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { - DistributedLogManager dlm = getFactory().createDistributedLogManagerWithSharedClients(getStreamName()); + DistributedLogManager dlm = getNamespace().openLog(getStreamName()); try { return inspectAndRepair(dlm.getLogSegments()); } finally { @@ -2640,11 +2640,11 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { getConf().setZkAclId(getZkAclId()); - return truncateStream(getFactory(), getStreamName(), dlsn); + return truncateStream(getNamespace(), getStreamName(), dlsn); } - private int truncateStream(final com.twitter.distributedlog.DistributedLogManagerFactory factory, String streamName, DLSN dlsn) throws Exception { - DistributedLogManager dlm = factory.createDistributedLogManagerWithSharedClients(streamName); + private int truncateStream(final DistributedLogNamespace namespace, String streamName, DLSN dlsn) throws Exception { + DistributedLogManager dlm = namespace.openLog(streamName); try { long totalRecords = dlm.getLogRecordCount(); 
long recordsAfterTruncate = Await.result(dlm.getLogRecordCountAsync(dlsn)); @@ -2731,7 +2731,6 @@ public class DistributedLogTool extends Tool { int numThreads = 1; String streamPrefix = null; String subscriberId = null; - AtomicInteger streamIndex = new AtomicInteger(); DeleteSubscriberCommand() { super("delete_subscriber", "Delete the subscriber in subscription store. "); @@ -2764,20 +2763,21 @@ public class DistributedLogTool extends Tool { @Override protected int runCmd() throws Exception { getConf().setZkAclId(getZkAclId()); - return deleteSubscriber(getFactory()); + return deleteSubscriber(getNamespace()); } - private int deleteSubscriber(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception { - Collection<String> streamCollection = factory.enumerateAllLogsInNamespace(); + private int deleteSubscriber(final DistributedLogNamespace namespace) throws Exception { + Iterator<String> streamCollection = namespace.getLogs(); final List<String> streams = new ArrayList<String>(); - if (null != streamPrefix) { - for (String s : streamCollection) { + while (streamCollection.hasNext()) { + String s = streamCollection.next(); + if (null != streamPrefix) { if (s.startsWith(streamPrefix)) { streams.add(s); } + } else { + streams.add(s); } - } else { - streams.addAll(streamCollection); } if (0 == streams.size()) { return 0; @@ -2796,7 +2796,7 @@ public class DistributedLogTool extends Tool { @Override public void run() { try { - deleteSubscriber(factory, streams, tid, numStreamsPerThreads); + deleteSubscriber(namespace, streams, tid, numStreamsPerThreads); System.out.println("Thread " + tid + " finished."); } catch (Exception e) { System.err.println("Thread " + tid + " quits with exception : " + e.getMessage()); @@ -2811,14 +2811,13 @@ public class DistributedLogTool extends Tool { return 0; } - private void deleteSubscriber(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams, + private void 
deleteSubscriber(DistributedLogNamespace namespace, List<String> streams, int tid, int numStreamsPerThreads) throws Exception { int startIdx = tid * numStreamsPerThreads; int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); for (int i = startIdx; i < endIdx; i++) { final String s = streams.get(i); - DistributedLogManager dlm = - factory.createDistributedLogManagerWithSharedClients(s); + DistributedLogManager dlm = namespace.openLog(s); final CountDownLatch countDownLatch = new CountDownLatch(1); dlm.getSubscriptionsStore().deleteSubscriber(subscriberId) .addEventListener(new FutureEventListener<Boolean>() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java index 63db1fe..2f9e091 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java @@ -17,34 +17,27 @@ */ package com.twitter.distributedlog.util; +import com.google.common.base.Objects; +import com.twitter.distributedlog.DistributedLogConstants; import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.exceptions.InvalidStreamNameException; import com.twitter.distributedlog.exceptions.UnexpectedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.lang.StringUtils; +import java.net.InetAddress; import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import static com.google.common.base.Charsets.UTF_8; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; /** 
* Utilities about DL implementations like uri, log segments, metadata serialization and deserialization. */ public class DLUtils { - static final Logger logger = LoggerFactory.getLogger(DLUtils.class); - - /** - * Extract zk servers fro dl <i>uri</i>. - * - * @param uri - * dl uri - * @return zk servers - */ - public static String getZKServersFromDLUri(URI uri) { - return uri.getAuthority().replace(";", ","); - } - /** * Find the log segment whose transaction ids are not less than provided <code>transactionId</code>. * @@ -224,4 +217,105 @@ public class DLUtils { public static long bytes2LogSegmentId(byte[] data) { return Long.parseLong(new String(data, UTF_8)); } + + /** + * Normalize the uri. + * + * @param uri the distributedlog uri. + * @return the normalized uri + */ + public static URI normalizeURI(URI uri) { + checkNotNull(uri, "DistributedLog uri is null"); + String scheme = uri.getScheme(); + checkNotNull(scheme, "Invalid distributedlog uri : " + uri); + scheme = scheme.toLowerCase(); + String[] schemeParts = StringUtils.split(scheme, '-'); + checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()), + "Unknown distributedlog scheme found : " + uri); + URI normalizedUri; + try { + normalizedUri = new URI( + schemeParts[0], // remove backend info + uri.getAuthority(), + uri.getPath(), + uri.getQuery(), + uri.getFragment()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e); + } + return normalizedUri; + } + + private static String getHostIpLockClientId() { + try { + return InetAddress.getLocalHost().toString(); + } catch(Exception ex) { + return DistributedLogConstants.UNKNOWN_CLIENT_ID; + } + } + + /** + * Normalize the client id. + * + * @return the normalized client id. 
+ */ + public static String normalizeClientId(String clientId) { + String normalizedClientId; + if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) { + normalizedClientId = getHostIpLockClientId(); + } else { + normalizedClientId = clientId; + } + return normalizedClientId; + } + + /** + * Is it a reserved stream name in bkdl namespace? + * + * @param name + * stream name + * @return true if it is reserved name, otherwise false. + */ + public static boolean isReservedStreamName(String name) { + return name.startsWith("."); + } + + /** + * Validate the stream name. + * + * @param nameOfStream + * name of stream + * @throws InvalidStreamNameException + */ + public static void validateName(String nameOfStream) + throws InvalidStreamNameException { + String reason = null; + char chars[] = nameOfStream.toCharArray(); + char c; + // validate the stream to see if meet zookeeper path's requirement + for (int i = 0; i < chars.length; i++) { + c = chars[i]; + + if (c == 0) { + reason = "null character not allowed @" + i; + break; + } else if (c == '/') { + reason = "'/' not allowed @" + i; + break; + } else if (c > '\u0000' && c < '\u001f' + || c > '\u007f' && c < '\u009F' + || c > '\ud800' && c < '\uf8ff' + || c > '\ufff0' && c < '\uffff') { + reason = "invalid charater @" + i; + break; + } + } + if (null != reason) { + throw new InvalidStreamNameException(nameOfStream, reason); + } + if (isReservedStreamName(nameOfStream)) { + throw new InvalidStreamNameException(nameOfStream, + "Stream Name is reserved"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java index 266409e..f206a25 100644 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java @@ -271,7 +271,7 @@ public class FutureUtils { * * @param throwable the cause of the exception * @return the bk exception return code. if the exception isn't bk exceptions, - * it would return bk exception code. + * it would return {@link BKException.Code#UnexpectedConditionException}. */ public static int bkResultCode(Throwable throwable) { if (throwable instanceof BKException) { @@ -455,13 +455,13 @@ public class FutureUtils { * @param key submit key of the ordered scheduler */ public static <T> void setException(final Promise<T> promise, - final Throwable throwable, + final Throwable cause, OrderedScheduler scheduler, Object key) { scheduler.submit(key, new Runnable() { @Override public void run() { - setException(promise, throwable); + setException(promise, cause); } }); }