http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java index ee9b00e..71a1f98 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java @@ -18,16 +18,15 @@ package org.apache.distributedlog.metadata; import com.google.common.base.Preconditions; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { @@ -57,8 +56,8 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { } @Override - public Future<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment, - LogRecordWithDLSN record) { + public CompletableFuture<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment, + LogRecordWithDLSN record) { DLSN dlsn = record.getDlsn(); Preconditions.checkState(!segment.isInProgress(), "Updating last dlsn for an inprogress log segment isn't supported."); @@ -73,7 +72,7 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { } @Override - public Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment, + public CompletableFuture<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment, long logSegmentSeqNo) { String newZkPath = segment.getZkPath() .replace(formatLogSegmentSequenceNumber(segment.getLogSegmentSequenceNumber()), @@ -92,7 +91,7 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { * @return new log segment */ @Override - public Future<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment) { + public CompletableFuture<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment) { final LogSegmentMetadata newSegment = segment.mutator() .setTruncationStatus(LogSegmentMetadata.TruncationStatus.ACTIVE) .build(); @@ -106,7 +105,7 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { * @return new log segment */ @Override - public Future<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata segment) { + public CompletableFuture<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata segment) { final LogSegmentMetadata newSegment = segment.mutator() .setTruncationStatus(LogSegmentMetadata.TruncationStatus.TRUNCATED) .build(); @@ -130,7 +129,7 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { * @return new log segment */ @Override - public Future<LogSegmentMetadata> setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, DLSN minActiveDLSN) { + public CompletableFuture<LogSegmentMetadata> setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, DLSN minActiveDLSN) { final LogSegmentMetadata newSegment = segment.mutator() .setTruncationStatus(LogSegmentMetadata.TruncationStatus.PARTIALLY_TRUNCATED) .setMinActiveDLSN(minActiveDLSN) @@ -150,28 +149,18 @@ public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { return newSegment; } - protected Future<LogSegmentMetadata> updateSegmentMetadata(final LogSegmentMetadata segment) { + protected CompletableFuture<LogSegmentMetadata> updateSegmentMetadata(final LogSegmentMetadata segment) { Transaction<Object> txn = transaction(); metadataStore.updateLogSegment(txn, segment); - return txn.execute().map(new AbstractFunction1<Void, LogSegmentMetadata>() { - @Override - public LogSegmentMetadata apply(Void value) { - return segment; - } - }); + return txn.execute().thenApply((value) -> segment); } - protected Future<LogSegmentMetadata> addNewSegmentAndDeleteOldSegment( + protected CompletableFuture<LogSegmentMetadata> addNewSegmentAndDeleteOldSegment( final LogSegmentMetadata newSegment, LogSegmentMetadata oldSegment) { LOG.info("old segment {} new segment {}", oldSegment, newSegment); Transaction<Object> txn = transaction(); addNewSegmentAndDeleteOldSegment(txn, newSegment, oldSegment); - return txn.execute().map(new AbstractFunction1<Void, LogSegmentMetadata>() { - @Override - public LogSegmentMetadata apply(Void value) { - return newSegment; - } - }); + return txn.execute().thenApply((value) -> newSegment); } protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java index f1e8f06..37ecab4 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java @@ -19,11 +19,11 @@ package org.apache.distributedlog.metadata; import com.google.common.annotations.Beta; import com.google.common.base.Optional; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.lock.DistributedLock; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; -import org.apache.distributedlog.util.PermitManager; +import org.apache.distributedlog.common.util.PermitManager; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; import java.io.Closeable; import java.net.URI; @@ -47,10 +47,10 @@ public interface LogStreamMetadataStore extends Closeable { * * @param uri the location of the log stream * @param logName the name of the log stream - * @return future represents the existence of a log stream. {@link org.apache.distributedlog.LogNotFoundException} - * is thrown if the log doesn't exist + * @return future represents the existence of a log stream. + * {@link org.apache.distributedlog.exceptions.LogNotFoundException} is thrown if the log doesn't exist */ - Future<Void> logExists(URI uri, String logName); + CompletableFuture<Void> logExists(URI uri, String logName); /** * Create the read lock for the log stream. @@ -59,7 +59,7 @@ public interface LogStreamMetadataStore extends Closeable { * @param readerId the reader id used for lock * @return the read lock */ - Future<DistributedLock> createReadLock(LogMetadataForReader metadata, + CompletableFuture<DistributedLock> createReadLock(LogMetadataForReader metadata, Optional<String> readerId); /** @@ -79,7 +79,7 @@ public interface LogStreamMetadataStore extends Closeable { * @param createIfNotExists flag to create the stream if it doesn't exist * @return the metadata of the log */ - Future<LogMetadataForWriter> getLog(URI uri, + CompletableFuture<LogMetadataForWriter> getLog(URI uri, String streamName, boolean ownAllocator, boolean createIfNotExists); @@ -91,7 +91,7 @@ public interface LogStreamMetadataStore extends Closeable { * @param streamName the name of the log stream * @return future represents the result of the deletion. */ - Future<Void> deleteLog(URI uri, String streamName); + CompletableFuture<Void> deleteLog(URI uri, String streamName); /** * Get the log segment metadata store. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java index 06a0600..793a2c9 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/MetadataUpdater.java @@ -17,11 +17,11 @@ */ package org.apache.distributedlog.metadata; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; /** * An updater to update metadata. It contains utility functions on mutating metadata. @@ -44,8 +44,8 @@ public interface MetadataUpdater { * correct last record. * @return new log segment */ - Future<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment, - LogRecordWithDLSN record); + CompletableFuture<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment, + LogRecordWithDLSN record); /** * Change ledger sequence number of <i>segment</i> to given <i>logSegmentSeqNo</i>. @@ -56,7 +56,7 @@ public interface MetadataUpdater { * ledger sequence number to change. * @return new log segment */ - Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment, + CompletableFuture<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment, long logSegmentSeqNo); /** @@ -66,7 +66,7 @@ public interface MetadataUpdater { * log segment to change truncation status to active. * @return new log segment */ - Future<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment); + CompletableFuture<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment); /** * Change the truncation status of a <i>log segment</i> to truncated @@ -75,7 +75,7 @@ public interface MetadataUpdater { * log segment to change truncation status to truncated. * @return new log segment */ - Future<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata segment); + CompletableFuture<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata segment); /** * Change the truncation status of a <i>log segment</i> to truncated. The operation won't be executed @@ -98,7 +98,7 @@ public interface MetadataUpdater { * DLSN within the log segment before which log has been truncated * @return new log segment */ - Future<LogSegmentMetadata> setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, + CompletableFuture<LogSegmentMetadata> setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, DLSN minActiveDLSN); /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java deleted file mode 100644 index 4cbee98..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.namespace; - -import com.google.common.annotations.Beta; -import com.google.common.base.Optional; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; -import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.callback.NamespaceListener; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.InvalidStreamNameException; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * A namespace is the basic unit for managing a set of distributedlogs. - * - * <h4>Namespace Interface</h4> - * - * <P> - * The <code>DistributedLogNamespace</code> interface is implemented by different backend providers. - * There are several components are required for an implementation: - * <OL> - * <LI>Log Management -- manage logs in a given namespace. e.g. create/open/delete log, list of logs, - * watch the changes of logs. - * <LI>Access Control -- manage the access controls for logs in the namespace. - * </OL> - * </P> - * - * <h4>Namespace Location</h4> - * - * At the highest level, a <code>DistributedLogNamespace</code> is located by a <code>URI</code>. The location - * URI is in string form has the syntax - * - * <blockquote> - * distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i> - * </blockquote> - * - * where square brackets [...] delineate optional components and the characters <tt><b>-</b></tt> and <tt><b>:</b></tt> - * stand for themselves. - * - * The <code>provider</code> part in the URI indicates what is the backend used for this namespace. For example: - * <i>distributedlog-bk</i> URI is storing logs in bookkeeper, while <i>distributedlog-mem</i> URI is storing logs in - * memory. The <code>provider</code> part is optional. It would use bookkeeper backend if the <i>provider</i> part - * is omitted. - * - * @see DistributedLogManager - * @since 0.3.32 - */ -@Beta -public interface DistributedLogNamespace { - - /** - * Get the namespace driver used by this namespace. - * - * @return namespace driver - */ - NamespaceDriver getNamespaceDriver(); - - // - // Method to operate logs - // - - /** - * Create a log named <i>logName</i>. - * - * @param logName - * name of the log - * @throws InvalidStreamNameException if log name is invalid. - * @throws IOException when encountered issues with backend. - */ - void createLog(String logName) - throws InvalidStreamNameException, IOException; - - /** - * Delete a log named <i>logName</i>. - * - * @param logName - * name of the log - * @throws InvalidStreamNameException if log name is invalid - * @throws LogNotFoundException if log doesn't exist - * @throws IOException when encountered issues with backend - */ - void deleteLog(String logName) - throws InvalidStreamNameException, LogNotFoundException, IOException; - - /** - * Open a log named <i>logName</i>. - * A distributedlog manager is returned to access log <i>logName</i>. - * - * @param logName - * name of the log - * @return distributedlog manager instance. - * @throws InvalidStreamNameException if log name is invalid. - * @throws IOException when encountered issues with backend. - */ - DistributedLogManager openLog(String logName) - throws InvalidStreamNameException, IOException; - - /** - * Open a log named <i>logName</i> with specific log configurations. - * - * <p>This method allows the caller to override global configuration settings by - * supplying log configuration overrides. Log config overrides come in two flavors, - * static and dynamic. Static config never changes in the lifecyle of <code>DistributedLogManager</code>, - * dynamic config changes by reloading periodically and safe to access from any context.</p> - * - * @param logName - * name of the log - * @param logConf - * static log configuration - * @param dynamicLogConf - * dynamic log configuration - * @return distributedlog manager instance. - * @throws InvalidStreamNameException if log name is invalid. - * @throws IOException when encountered issues with backend. - */ - DistributedLogManager openLog(String logName, - Optional<DistributedLogConfiguration> logConf, - Optional<DynamicDistributedLogConfiguration> dynamicLogConf, - Optional<StatsLogger> perStreamStatsLogger) - throws InvalidStreamNameException, IOException; - - /** - * Check whether the log <i>logName</i> exist. - * - * @param logName - * name of the log - * @return <code>true</code> if the log exists, otherwise <code>false</code>. - * @throws IOException when encountered exceptions on checking - */ - boolean logExists(String logName) - throws IOException; - - /** - * Retrieve the logs under the namespace. - * - * @return iterator of the logs under the namespace. - * @throws IOException when encountered issues with backend. - */ - Iterator<String> getLogs() - throws IOException; - - // - // Methods for namespace - // - - /** - * Register namespace listener on stream updates under the namespace. - * - * @param listener - * listener to receive stream updates under the namespace - */ - void registerNamespaceListener(NamespaceListener listener); - - /** - * Create an access control manager to manage/check acl for logs. - * - * @return access control manager for logs under the namespace. - * @throws IOException - */ - AccessControlManager createAccessControlManager() - throws IOException; - - /** - * Close the namespace. - */ - void close(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java deleted file mode 100644 index 2706201..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.namespace; - -import com.google.common.base.Preconditions; -import org.apache.distributedlog.BKDistributedLogNamespace; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.feature.CoreFeatureKeys; -import org.apache.distributedlog.injector.AsyncFailureInjector; -import org.apache.distributedlog.injector.AsyncRandomFailureInjector; -import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.PermitLimiter; -import org.apache.distributedlog.util.SimplePermitLimiter; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.SettableFeatureProvider; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; - -/** - * Builder to construct a <code>DistributedLogNamespace</code>. - * The builder takes the responsibility of loading backend according to the uri. - * - * @see DistributedLogNamespace - * @since 0.3.32 - */ -public class DistributedLogNamespaceBuilder { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogNamespaceBuilder.class); - - public static DistributedLogNamespaceBuilder newBuilder() { - return new DistributedLogNamespaceBuilder(); - } - - private DistributedLogConfiguration _conf = null; - private DynamicDistributedLogConfiguration _dynConf = null; - private URI _uri = null; - private StatsLogger _statsLogger = NullStatsLogger.INSTANCE; - private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE; - private FeatureProvider _featureProvider = null; - private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID; - private int _regionId = DistributedLogConstants.LOCAL_REGION_ID; - - // private constructor - private DistributedLogNamespaceBuilder() {} - - /** - * DistributedLog Configuration used for the namespace. - * - * @param conf - * distributedlog configuration - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder conf(DistributedLogConfiguration conf) { - this._conf = conf; - return this; - } - - /** - * Dynamic DistributedLog Configuration used for the namespace - * - * @param dynConf dynamic distributedlog configuration - * @return namespace builder - */ - public DistributedLogNamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) { - this._dynConf = dynConf; - return this; - } - - /** - * Namespace Location. - * - * @param uri - * namespace location uri. - * @see DistributedLogNamespace - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder uri(URI uri) { - this._uri = uri; - return this; - } - - /** - * Stats Logger used for stats collection - * - * @param statsLogger - * stats logger - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder statsLogger(StatsLogger statsLogger) { - this._statsLogger = statsLogger; - return this; - } - - /** - * Stats Logger used for collecting per log stats. - * - * @param statsLogger - * stats logger for collecting per log stats - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) { - this._perLogStatsLogger = statsLogger; - return this; - } - - /** - * Feature provider used to control the availabilities of features in the namespace. - * - * @param featureProvider - * feature provider to control availabilities of features. - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder featureProvider(FeatureProvider featureProvider) { - this._featureProvider = featureProvider; - return this; - } - - /** - * Client Id used for accessing the namespace - * - * @param clientId - * client id used for accessing the namespace - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder clientId(String clientId) { - this._clientId = clientId; - return this; - } - - /** - * Region Id used for encoding logs in the namespace. The region id - * is useful when the namespace is globally spanning over regions. - * - * @param regionId - * region id. - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder regionId(int regionId) { - this._regionId = regionId; - return this; - } - - @SuppressWarnings("deprecation") - private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - DistributedLogConfiguration conf) { - StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger; - if (perLogStatsLogger == NullStatsLogger.INSTANCE && - conf.getEnablePerStreamStat()) { - normalizedPerLogStatsLogger = statsLogger.scope("stream"); - } - return normalizedPerLogStatsLogger; - } - - /** - * Build the namespace. - * - * @return the namespace instance. - * @throws IllegalArgumentException when there is illegal argument provided in the builder - * @throws NullPointerException when there is null argument provided in the builder - * @throws IOException when fail to build the backend - */ - public DistributedLogNamespace build() - throws IllegalArgumentException, NullPointerException, IOException { - // Check arguments - Preconditions.checkNotNull(_conf, "No DistributedLog Configuration."); - Preconditions.checkNotNull(_uri, "No DistributedLog URI"); - - // validate the configuration - _conf.validate(); - if (null == _dynConf) { - _dynConf = ConfUtils.getConstDynConf(_conf); - } - - // retrieve the namespace driver - NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri); - URI normalizedUri = DLUtils.normalizeURI(_uri); - - // build the feature provider - FeatureProvider featureProvider; - if (null == _featureProvider) { - featureProvider = new SettableFeatureProvider("", 0); - logger.info("No feature provider is set. All features are disabled now."); - } else { - featureProvider = _featureProvider; - } - - // build the failure injector - AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder() - .injectDelays(_conf.getEIInjectReadAheadDelay(), - _conf.getEIInjectReadAheadDelayPercent(), - _conf.getEIInjectMaxReadAheadDelayMs()) - .injectErrors(false, 10) - .injectStops(_conf.getEIInjectReadAheadStall(), 10) - .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries()) - .build(); - - // normalize the per log stats logger - StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf); - - // build the scheduler - StatsLogger schedulerStatsLogger = _statsLogger.scope("factory").scope("thread_pool"); - OrderedScheduler scheduler = OrderedScheduler.newBuilder() - .name("DLM-" + normalizedUri.getPath()) - .corePoolSize(_conf.getNumWorkerThreads()) - .statsLogger(schedulerStatsLogger) - .perExecutorStatsLogger(schedulerStatsLogger) - .traceTaskExecution(_conf.getEnableTaskExecutionStats()) - .traceTaskExecutionWarnTimeUs(_conf.getTaskExecutionWarnTimeMicros()) - .build(); - - // initialize the namespace driver - driver.initialize( - _conf, - _dynConf, - normalizedUri, - scheduler, - featureProvider, - failureInjector, - _statsLogger, - perLogStatsLogger, - DLUtils.normalizeClientId(_clientId), - _regionId); - - // initialize the write limiter - PermitLimiter writeLimiter; - if (_conf.getGlobalOutstandingWriteLimit() < 0) { - writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER; - } else { - Feature disableWriteLimitFeature = featureProvider.getFeature( - CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase()); - writeLimiter = new SimplePermitLimiter( - _conf.getOutstandingWriteLimitDarkmode(), - _conf.getGlobalOutstandingWriteLimit(), - _statsLogger.scope("writeLimiter"), - true /* singleton */, - disableWriteLimitFeature); - } - - return new BKDistributedLogNamespace( - _conf, - normalizedUri, - driver, - scheduler, - featureProvider, - writeLimiter, - failureInjector, - _statsLogger, - perLogStatsLogger, - DLUtils.normalizeClientId(_clientId), - _regionId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java index 2f5adc6..cf970ef 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java @@ -18,16 +18,15 @@ package org.apache.distributedlog.namespace; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.MetadataAccessor; +import org.apache.distributedlog.api.MetadataAccessor; import org.apache.distributedlog.acl.AccessControlManager; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.InvalidStreamNameException; import org.apache.distributedlog.injector.AsyncFailureInjector; import org.apache.distributedlog.logsegment.LogSegmentEntryStore; -import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; import org.apache.distributedlog.metadata.LogMetadataStore; import org.apache.distributedlog.metadata.LogStreamMetadataStore; -import org.apache.distributedlog.subscription.SubscriptionsStore; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.StatsLogger; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java b/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java deleted file mode 100644 index 14db547..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRate.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.rate; - -public interface MovingAverageRate { - double get(); - void add(long amount); - void inc(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java deleted file mode 100644 index cd33e24..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/MovingAverageRateFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.rate; - -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.TimerTask; -import com.twitter.util.Timer; -import com.twitter.util.Time; -import java.util.concurrent.CopyOnWriteArrayList; -import scala.runtime.BoxedUnit; - -public class MovingAverageRateFactory { - - private static final int DEFAULT_INTERVAL_SECS = 1; - - private final Timer timer; - private final TimerTask timerTask; - private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs; - - public MovingAverageRateFactory(Timer timer) { - this.avgs = new CopyOnWriteArrayList<SampledMovingAverageRate>(); - this.timer = timer; - Function0<BoxedUnit> sampleTask = new Function0<BoxedUnit>() { - public BoxedUnit apply() { - sampleAll(); - return null; - } - }; - this.timerTask = timer.schedulePeriodically( - Time.now(), Duration.fromSeconds(DEFAULT_INTERVAL_SECS), sampleTask); - } - - public MovingAverageRate create(int intervalSecs) { - SampledMovingAverageRate avg = new SampledMovingAverageRate(intervalSecs); - avgs.add(avg); - return avg; - } - - public void close() { - timerTask.cancel(); - avgs.clear(); - } - - private void sampleAll() { - for (SampledMovingAverageRate avg : avgs) { - avg.sample(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java b/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java deleted file mode 100644 index 0b3ccac..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/rate/SampledMovingAverageRate.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.rate; - -import com.twitter.common.stats.Rate; -import com.twitter.util.TimerTask; -import com.twitter.util.Timer; -import com.twitter.util.Time; -import java.util.concurrent.atomic.AtomicLong; - -class SampledMovingAverageRate implements MovingAverageRate { - private final Rate rate; - private final AtomicLong total; - - private double value; - - public SampledMovingAverageRate(int intervalSecs) { - this.total = new AtomicLong(0); - this.rate = Rate.of("Ignore", total) - .withWindowSize(intervalSecs) - .build(); - this.value = 0; - } - - @Override - public double get() { - return value; - } - - @Override - public void add(long amount) { - total.getAndAdd(amount); - } - - @Override - public void inc() { - add(1); - } - - void sample() { - value = rate.doSample(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java deleted file mode 100644 index d81f8a4..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/readahead/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * ReadAhead Mechanism for distributedlog streaming reads - */ -package org.apache.distributedlog.readahead; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java b/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java deleted file mode 100644 index 199aa4c..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BKExceptionStatsLogger.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.stats; - -import org.apache.bookkeeper.client.BKException.Code; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; - -import java.util.HashMap; -import java.util.Map; - -/** - * A Util to logger stats on bk exceptions. - */ -public class BKExceptionStatsLogger { - - public static String getMessage(int code) { - switch (code) { - case Code.OK: - return "OK"; - case Code.ReadException: - return "ReadException"; - case Code.QuorumException: - return "QuorumException"; - case Code.NoBookieAvailableException: - return "NoBookieAvailableException"; - case Code.DigestNotInitializedException: - return "DigestNotInitializedException"; - case Code.DigestMatchException: - return "DigestMatchException"; - case Code.NotEnoughBookiesException: - return "NotEnoughBookiesException"; - case Code.NoSuchLedgerExistsException: - return "NoSuchLedgerExistsException"; - case Code.BookieHandleNotAvailableException: - return "BookieHandleNotAvailableException"; - case Code.ZKException: - return "ZKException"; - case Code.LedgerRecoveryException: - return "LedgerRecoveryException"; - case Code.LedgerClosedException: - return "LedgerClosedException"; - case Code.WriteException: - return "WriteException"; - case Code.NoSuchEntryException: - return "NoSuchEntryException"; - case Code.IncorrectParameterException: - return "IncorrectParameterException"; - case Code.InterruptedException: - return "InterruptedException"; - case Code.ProtocolVersionException: - return "ProtocolVersionException"; - case Code.MetadataVersionException: - return "MetadataVersionException"; - case Code.LedgerFencedException: - return "LedgerFencedException"; - case Code.UnauthorizedAccessException: - return "UnauthorizedAccessException"; - case Code.UnclosedFragmentException: - return "UnclosedFragmentException"; - case Code.WriteOnReadOnlyBookieException: - return "WriteOnReadOnlyBookieException"; - case Code.IllegalOpException: - return "IllegalOpException"; - default: - return "UnexpectedException"; - } - } - - private final StatsLogger parentLogger; - private final Map<Integer, Counter> exceptionCounters; - - public BKExceptionStatsLogger(StatsLogger parentLogger) { - this.parentLogger = parentLogger; - this.exceptionCounters = new HashMap<Integer, Counter>(); - } - - public Counter getExceptionCounter(int rc) { - Counter counter = exceptionCounters.get(rc); - if (null != counter) { - return counter; - } - // TODO: it would be better to have BKException.Code.get(rc) - synchronized (exceptionCounters) { - counter = exceptionCounters.get(rc); - if (null != counter) { - return counter; - } - counter = parentLogger.getCounter(getMessage(rc)); - exceptionCounters.put(rc, counter); - } - return counter; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java b/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java deleted file mode 100644 index b6ca733..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/BroadCastStatsLogger.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.stats; - -import com.google.common.base.Preconditions; - -import org.apache.bookkeeper.stats.CachingStatsLogger; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsData; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Stats Loggers that broadcast stats to multiple {@link StatsLogger}. - */ -public class BroadCastStatsLogger { - - /** - * Create a broadcast stats logger of two stats loggers `<code>first</code>` and - * `<code>second</code>`. The returned stats logger doesn't allow registering any - * {@link Gauge}. - * - * @param first - * first stats logger - * @param second - * second stats logger - * @return broadcast stats logger - */ - public static StatsLogger two(StatsLogger first, StatsLogger second) { - return new CachingStatsLogger(new Two(first, second)); - } - - static class Two implements StatsLogger { - protected final StatsLogger first; - protected final StatsLogger second; - - private Two(StatsLogger first, StatsLogger second) { - super(); - Preconditions.checkNotNull(first); - Preconditions.checkNotNull(second); - this.first = first; - this.second = second; - } - - @Override - public OpStatsLogger getOpStatsLogger(final String statName) { - final OpStatsLogger firstLogger = first.getOpStatsLogger(statName); - final OpStatsLogger secondLogger = second.getOpStatsLogger(statName); - return new OpStatsLogger() { - @Override - public void registerFailedEvent(long l) { - firstLogger.registerFailedEvent(l); - secondLogger.registerFailedEvent(l); - } - - @Override - public void registerSuccessfulEvent(long l) { - firstLogger.registerSuccessfulEvent(l); - secondLogger.registerSuccessfulEvent(l); - } - - @Override - public OpStatsData toOpStatsData() { - // Eventually consistent. - return firstLogger.toOpStatsData(); - } - - @Override - public void clear() { - firstLogger.clear(); - secondLogger.clear(); - } - }; - } - - @Override - public Counter getCounter(final String statName) { - final Counter firstCounter = first.getCounter(statName); - final Counter secondCounter = second.getCounter(statName); - return new Counter() { - @Override - public void clear() { - firstCounter.clear(); - secondCounter.clear(); - } - - @Override - public void inc() { - firstCounter.inc(); - secondCounter.inc(); - } - - @Override - public void dec() { - firstCounter.dec(); - secondCounter.dec(); - } - - @Override - public void add(long l) { - firstCounter.add(l); - secondCounter.add(l); - } - - @Override - public Long get() { - // Eventually consistent. - return firstCounter.get(); - } - }; - } - - @Override - public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) { - // Different underlying stats loggers have different semantics wrt. gauge registration. - throw new RuntimeException("Cannot register a gauge on BroadCastStatsLogger.Two"); - } - - @Override - public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { - // no-op - } - - @Override - public StatsLogger scope(final String scope) { - return new Two(first.scope(scope), second.scope(scope)); - } - - @Override - public void removeScope(String scope, StatsLogger statsLogger) { - if (!(statsLogger instanceof Two)) { - return; - } - - Two another = (Two) statsLogger; - - first.removeScope(scope, another.first); - second.removeScope(scope, another.second); - } - } - - /** - * Create a broadcast stats logger of two stats loggers <code>master</code> and <code>slave</code>. - * It is similar as {@link #two(StatsLogger, StatsLogger)}, but it allows registering {@link Gauge}s. - * The {@link Gauge} will be registered under master. - * - * @param master - * master stats logger to receive {@link Counter}, {@link OpStatsLogger} and {@link Gauge}. - * @param slave - * slave stats logger to receive only {@link Counter} and {@link OpStatsLogger}. - * @return broadcast stats logger - */ - public static StatsLogger masterslave(StatsLogger master, StatsLogger slave) { - return new CachingStatsLogger(new MasterSlave(master, slave)); - } - - static class MasterSlave extends Two { - - private MasterSlave(StatsLogger master, StatsLogger slave) { - super(master, slave); - } - - @Override - public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) { - first.registerGauge(statName, gauge); - } - - @Override - public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { - first.unregisterGauge(statName, gauge); - } - - @Override - public StatsLogger scope(String scope) { - return new MasterSlave(first.scope(scope), second.scope(scope)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java b/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java deleted file mode 100644 index 43641f0..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.stats; - -import com.google.common.base.Stopwatch; -import com.twitter.util.FutureEventListener; -import org.apache.bookkeeper.stats.OpStatsLogger; -import java.util.concurrent.TimeUnit; - -public class OpStatsListener<T> implements FutureEventListener<T> { - OpStatsLogger opStatsLogger; - Stopwatch stopwatch; - - public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) { - this.opStatsLogger = opStatsLogger; - if (null == stopwatch) { - this.stopwatch = Stopwatch.createStarted(); - } else { - this.stopwatch = stopwatch; - } - } - - public OpStatsListener(OpStatsLogger opStatsLogger) { - this(opStatsLogger, null); - } - - @Override - public void onSuccess(T value) { - opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - - @Override - public void onFailure(Throwable cause) { - opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java deleted file mode 100644 index ebfc32a..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionStateStore.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.subscription; - -import java.io.Closeable; - -import scala.runtime.BoxedUnit; - -import org.apache.distributedlog.DLSN; -import com.twitter.util.Future; - -public interface SubscriptionStateStore extends Closeable { - /** - * Get the last committed position stored for this subscription - * - * @return future represents the last commit position - */ - public Future<DLSN> getLastCommitPosition(); - - /** - * Advances the position associated with the subscriber - * - * @param newPosition - new commit position - * @return future represents the advance result - */ - public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java deleted file mode 100644 index 1974f1e..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.subscription; - -import org.apache.distributedlog.DLSN; -import com.twitter.util.Future; -import scala.runtime.BoxedUnit; - -import java.io.Closeable; -import java.util.Map; - -/** - * Store to manage subscriptions - */ -public interface SubscriptionsStore extends Closeable { - - /** - * Get the last committed position stored for <i>subscriberId</i>. - * - * @param subscriberId - * subscriber id - * @return future representing last committed position. - */ - public Future<DLSN> getLastCommitPosition(String subscriberId); - - /** - * Get the last committed positions for all subscribers. - * - * @return future representing last committed positions for all subscribers. - */ - public Future<Map<String, DLSN>> getLastCommitPositions(); - - /** - * Advance the last committed position for <i>subscriberId</i>. - * - * @param subscriberId - * subscriber id. - * @param newPosition - * new committed position. - * @return future representing advancing result. - */ - public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition); - - /** - * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the - * data stored under this subscriber will be lost. - * @param subscriberId subscriber id - * @return future represent success or failure. - * return true only if there's such subscriber and we removed it successfully. - * return false if there's no such subscriber, or we failed to remove. - */ - public Future<Boolean> deleteSubscriber(String subscriberId); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java index efee3ca..ffe2303 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/thrift/package-info.java @@ -18,4 +18,4 @@ /** * Generated thrift code. */ -package org.apache.distributedlog.thrift; \ No newline at end of file +package org.apache.distributedlog.thrift; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java index 2c27088..64229d1 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java @@ -55,13 +55,15 @@ import java.util.regex.Pattern; import com.google.common.base.Preconditions; import org.apache.distributedlog.BKDistributedLogNamespace; import org.apache.distributedlog.Entry; -import org.apache.distributedlog.MetadataAccessor; +import org.apache.distributedlog.api.MetadataAccessor; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Utils; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -86,16 +88,16 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; -import org.apache.distributedlog.AsyncLogReader; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.BookKeeperClientBuilder; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.LogReader; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.LogSegmentMetadata; @@ -107,9 +109,7 @@ import org.apache.distributedlog.bk.LedgerAllocatorUtils; import org.apache.distributedlog.impl.metadata.BKDLConfig; import org.apache.distributedlog.metadata.MetadataUpdater; import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import org.apache.distributedlog.util.SchedulerUtils; -import com.twitter.util.Await; -import com.twitter.util.FutureEventListener; +import org.apache.distributedlog.common.util.SchedulerUtils; import static com.google.common.base.Charsets.UTF_8; @@ -163,7 +163,7 @@ public class DistributedLogTool extends Tool { protected URI uri; protected String zkAclId = null; protected boolean force = false; - protected DistributedLogNamespace namespace = null; + protected Namespace namespace = null; protected PerDLCommand(String name, String description) { super(name, description); @@ -252,9 +252,9 @@ public class DistributedLogTool extends Tool { this.force = force; } - protected DistributedLogNamespace getNamespace() throws IOException { + protected Namespace getNamespace() throws IOException { if (null == this.namespace) { - this.namespace = DistributedLogNamespaceBuilder.newBuilder() + this.namespace = NamespaceBuilder.newBuilder() .uri(getUri()) .conf(getConf()) .build(); @@ -464,7 +464,7 @@ public class DistributedLogTool extends Tool { return 0; } - protected void printStreams(DistributedLogNamespace namespace) throws Exception { + protected void printStreams(Namespace namespace) throws Exception { Iterator<String> streams = namespace.getLogs(); System.out.println("Streams under " + getUri() + " : "); System.out.println("--------------------------------"); @@ -536,7 +536,7 @@ public class DistributedLogTool extends Tool { System.out.println(""); } - protected void watchAndReportChanges(DistributedLogNamespace namespace) throws Exception { + protected void watchAndReportChanges(Namespace namespace) throws Exception { namespace.registerNamespaceListener(this); } } @@ -783,7 +783,7 @@ public class DistributedLogTool extends Tool { return truncateStreams(getNamespace()); } - private int truncateStreams(final DistributedLogNamespace namespace) throws Exception { + private int truncateStreams(final Namespace namespace) throws Exception { Iterator<String> streamCollection = namespace.getLogs(); final List<String> streams = new ArrayList<String>(); while (streamCollection.hasNext()) { @@ -827,7 +827,7 @@ public class DistributedLogTool extends Tool { return 0; } - private void truncateStreams(DistributedLogNamespace namespace, List<String> streams, + private void truncateStreams(Namespace namespace, List<String> streams, int tid, int numStreamsPerThreads) throws IOException { int startIdx = tid * numStreamsPerThreads; int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); @@ -957,7 +957,7 @@ public class DistributedLogTool extends Tool { } private void printHeader(DistributedLogManager dlm) throws Exception { - DLSN firstDlsn = Await.result(dlm.getFirstDLSNAsync()); + DLSN firstDlsn = FutureUtils.result(dlm.getFirstDLSNAsync()); boolean endOfStreamMarked = dlm.isEndOfStreamMarked(); DLSN lastDlsn = dlm.getLastDLSN(); long firstTxnId = dlm.getFirstTxId(); @@ -1121,7 +1121,7 @@ public class DistributedLogTool extends Tool { } long countToLastRecord(DistributedLogManager dlm) throws Exception { - return Await.result(dlm.getLogRecordCountAsync(startDLSN)).longValue(); + return FutureUtils.result(dlm.getLogRecordCountAsync(startDLSN)).longValue(); } @Override @@ -1439,7 +1439,7 @@ public class DistributedLogTool extends Tool { AsyncLogReader reader; Object startOffset; try { - DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync()); + DLSN lastDLSN = FutureUtils.result(dlm.getLastDLSNAsync()); System.out.println("Last DLSN : " + lastDLSN); if (null == fromDLSN) { reader = dlm.getAsyncLogReader(fromTxnId); @@ -1468,7 +1468,7 @@ public class DistributedLogTool extends Tool { private void dumpRecords(AsyncLogReader reader) throws Exception { int numRead = 0; - LogRecord record = Await.result(reader.readNext()); + LogRecord record = FutureUtils.result(reader.readNext()); while (record != null) { // dump the record dumpRecord(record); @@ -1476,7 +1476,7 @@ public class DistributedLogTool extends Tool { if (numRead >= count) { break; } - record = Await.result(reader.readNext()); + record = FutureUtils.result(reader.readNext()); } if (numRead == 0) { System.out.println("No records."); @@ -2641,18 +2641,18 @@ public class DistributedLogTool extends Tool { return truncateStream(getNamespace(), getStreamName(), dlsn); } - private int truncateStream(final DistributedLogNamespace namespace, String streamName, DLSN dlsn) throws Exception { + private int truncateStream(final Namespace namespace, String streamName, DLSN dlsn) throws Exception { DistributedLogManager dlm = namespace.openLog(streamName); try { long totalRecords = dlm.getLogRecordCount(); - long recordsAfterTruncate = Await.result(dlm.getLogRecordCountAsync(dlsn)); + long recordsAfterTruncate = FutureUtils.result(dlm.getLogRecordCountAsync(dlsn)); long recordsToTruncate = totalRecords - recordsAfterTruncate; if (!getForce() && !IOUtils.confirmPrompt("Do you want to truncate " + streamName + " at dlsn " + dlsn + " (" + recordsToTruncate + " records)?")) { return 0; } else { AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); try { - if (!Await.result(writer.truncate(dlsn))) { + if (!FutureUtils.result(writer.truncate(dlsn))) { System.out.println("Failed to truncate."); } return 0; @@ -2764,7 +2764,7 @@ public class DistributedLogTool extends Tool { return deleteSubscriber(getNamespace()); } - private int deleteSubscriber(final DistributedLogNamespace namespace) throws Exception { + private int deleteSubscriber(final Namespace namespace) throws Exception { Iterator<String> streamCollection = namespace.getLogs(); final List<String> streams = new ArrayList<String>(); while (streamCollection.hasNext()) { @@ -2809,7 +2809,7 @@ public class DistributedLogTool extends Tool { return 0; } - private void deleteSubscriber(DistributedLogNamespace namespace, List<String> streams, + private void deleteSubscriber(Namespace namespace, List<String> streams, int tid, int numStreamsPerThreads) throws Exception { int startIdx = tid * numStreamsPerThreads; int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); @@ -2818,7 +2818,7 @@ public class DistributedLogTool extends Tool { DistributedLogManager dlm = namespace.openLog(s); final CountDownLatch countDownLatch = new CountDownLatch(1); dlm.getSubscriptionsStore().deleteSubscriber(subscriberId) - .addEventListener(new FutureEventListener<Boolean>() { + .whenComplete(new FutureEventListener<Boolean>() { @Override public void onFailure(Throwable cause) { System.out.println("Failed to delete subscriber for stream " + s); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java index c0da29a..a9b81e2 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Allocator.java @@ -17,12 +17,11 @@ */ package org.apache.distributedlog.util; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.io.AsyncCloseable; import org.apache.distributedlog.io.AsyncDeleteable; import org.apache.distributedlog.util.Transaction.OpListener; -import com.twitter.util.Future; - -import java.io.IOException; /** * A common interface to allocate <i>I</i> under transaction <i>T</i>. @@ -47,7 +46,7 @@ import java.io.IOException; * final Transaction<T> txn = ...; * * // Try obtain object I - * Future<I> tryObtainFuture = allocator.tryObtain(txn, new OpListener<I>() { + * CompletableFuture<I> tryObtainFuture = allocator.tryObtain(txn, new OpListener<I>() { * public void onCommit(I resource) { * // the obtain succeed, process with the resource * } @@ -97,6 +96,6 @@ public interface Allocator<I, T> extends AsyncCloseable, AsyncDeleteable { * transaction. * @return future result returning <i>I</i> that would be obtained under transaction <code>txn</code>. */ - Future<I> tryObtain(Transaction<T> txn, OpListener<I> listener); + CompletableFuture<I> tryObtain(Transaction<T> txn, OpListener<I> listener); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java index 41c2be7..aa7cefe 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/ConfUtils.java @@ -18,7 +18,7 @@ package org.apache.distributedlog.util; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.commons.configuration.Configuration;