http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java deleted file mode 100644 index 0e5e6d4..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogSegmentMetadataStoreUpdater.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.google.common.base.Preconditions; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; - -public class LogSegmentMetadataStoreUpdater implements MetadataUpdater { - - static final Logger LOG = LoggerFactory.getLogger(LogSegmentMetadataStoreUpdater.class); - - public static MetadataUpdater createMetadataUpdater(DistributedLogConfiguration conf, - LogSegmentMetadataStore metadataStore) { - return new LogSegmentMetadataStoreUpdater(conf, metadataStore); - } - - protected final LogSegmentMetadataStore metadataStore; - protected final LogSegmentMetadata.LogSegmentMetadataVersion metadataVersion; - - protected LogSegmentMetadataStoreUpdater(DistributedLogConfiguration conf, - LogSegmentMetadataStore metadataStore) { - this.metadataStore = metadataStore; - this.metadataVersion = LogSegmentMetadata.LogSegmentMetadataVersion.of(conf.getDLLedgerMetadataLayoutVersion()); - } - - private String formatLogSegmentSequenceNumber(long logSegmentSeqNo) { - return String.format("%018d", logSegmentSeqNo); - } - - @Override - public Transaction<Object> transaction() { - return metadataStore.transaction(); - } - - @Override - public Future<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment, - LogRecordWithDLSN record) { - DLSN dlsn = record.getDlsn(); - Preconditions.checkState(!segment.isInProgress(), - "Updating last dlsn for an inprogress log segment isn't supported."); - Preconditions.checkArgument(segment.isDLSNinThisSegment(dlsn), - "DLSN " + dlsn + " doesn't belong to segment " + segment); - final LogSegmentMetadata newSegment = segment.mutator() - .setLastDLSN(dlsn) - .setLastTxId(record.getTransactionId()) - .setRecordCount(record) - .build(); - return updateSegmentMetadata(newSegment); - } - - @Override - public Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment, - long logSegmentSeqNo) { - String newZkPath = segment.getZkPath() - .replace(formatLogSegmentSequenceNumber(segment.getLogSegmentSequenceNumber()), - formatLogSegmentSequenceNumber(logSegmentSeqNo)); - final LogSegmentMetadata newSegment = segment.mutator() - .setLogSegmentSequenceNumber(logSegmentSeqNo) - .setZkPath(newZkPath) - .build(); - return addNewSegmentAndDeleteOldSegment(newSegment, segment); - } - - /** - * Change the truncation status of a <i>log segment</i> to be active - * - * @param segment log segment to change truncation status to active. - * @return new log segment - */ - @Override - public Future<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment) { - final LogSegmentMetadata newSegment = segment.mutator() - .setTruncationStatus(LogSegmentMetadata.TruncationStatus.ACTIVE) - .build(); - return addNewSegmentAndDeleteOldSegment(newSegment, segment); - } - - /** - * Change the truncation status of a <i>log segment</i> to truncated - * - * @param segment log segment to change truncation status to truncated. - * @return new log segment - */ - @Override - public Future<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata segment) { - final LogSegmentMetadata newSegment = segment.mutator() - .setTruncationStatus(LogSegmentMetadata.TruncationStatus.TRUNCATED) - .build(); - return addNewSegmentAndDeleteOldSegment(newSegment, segment); - } - - @Override - public LogSegmentMetadata setLogSegmentTruncated(Transaction<Object> txn, LogSegmentMetadata segment) { - final LogSegmentMetadata newSegment = segment.mutator() - .setTruncationStatus(LogSegmentMetadata.TruncationStatus.TRUNCATED) - .build(); - addNewSegmentAndDeleteOldSegment(txn, newSegment, segment); - return newSegment; - } - - /** - * Change the truncation status of a <i>log segment</i> to partially truncated - * - * @param segment log segment to change sequence number. - * @param minActiveDLSN DLSN within the log segment before which log has been truncated - * @return new log segment - */ - @Override - public Future<LogSegmentMetadata> setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, DLSN minActiveDLSN) { - final LogSegmentMetadata newSegment = segment.mutator() - .setTruncationStatus(LogSegmentMetadata.TruncationStatus.PARTIALLY_TRUNCATED) - .setMinActiveDLSN(minActiveDLSN) - .build(); - return addNewSegmentAndDeleteOldSegment(newSegment, segment); - } - - @Override - public LogSegmentMetadata setLogSegmentPartiallyTruncated(Transaction<Object> txn, - LogSegmentMetadata segment, - DLSN minActiveDLSN) { - final LogSegmentMetadata newSegment = segment.mutator() - .setTruncationStatus(LogSegmentMetadata.TruncationStatus.PARTIALLY_TRUNCATED) - .setMinActiveDLSN(minActiveDLSN) - .build(); - addNewSegmentAndDeleteOldSegment(txn, newSegment, segment); - return newSegment; - } - - protected Future<LogSegmentMetadata> updateSegmentMetadata(final LogSegmentMetadata segment) { - Transaction<Object> txn = transaction(); - metadataStore.updateLogSegment(txn, segment); - return txn.execute().map(new AbstractFunction1<Void, LogSegmentMetadata>() { - @Override - public LogSegmentMetadata apply(Void value) { - return segment; - } - }); - } - - protected Future<LogSegmentMetadata> addNewSegmentAndDeleteOldSegment( - final LogSegmentMetadata newSegment, LogSegmentMetadata oldSegment) { - LOG.info("old segment {} new segment {}", oldSegment, newSegment); - Transaction<Object> txn = transaction(); - addNewSegmentAndDeleteOldSegment(txn, newSegment, oldSegment); - return txn.execute().map(new AbstractFunction1<Void, LogSegmentMetadata>() { - @Override - public LogSegmentMetadata apply(Void value) { - return newSegment; - } - }); - } - - protected void addNewSegmentAndDeleteOldSegment(Transaction<Object> txn, - LogSegmentMetadata newSegment, - LogSegmentMetadata oldSegment) { - metadataStore.deleteLogSegment(txn, oldSegment, null); - metadataStore.createLogSegment(txn, newSegment, null); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java deleted file mode 100644 index 7242a5e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogStreamMetadataStore.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.google.common.annotations.Beta; -import com.google.common.base.Optional; -import com.twitter.distributedlog.lock.DistributedLock; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.util.PermitManager; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; - -import java.io.Closeable; -import java.net.URI; - -/** - * The interface to manage the log stream metadata. The implementation is responsible - * for creating the metadata layout. - */ -@Beta -public interface LogStreamMetadataStore extends Closeable { - - /** - * Create a transaction for the metadata operations happening in the metadata store. - * - * @return transaction for the metadata operations - */ - Transaction<Object> newTransaction(); - - /** - * Ensure the existence of a log stream - * - * @param uri the location of the log stream - * @param logName the name of the log stream - * @return future represents the existence of a log stream. {@link com.twitter.distributedlog.LogNotFoundException} - * is thrown if the log doesn't exist - */ - Future<Void> logExists(URI uri, String logName); - - /** - * Create the read lock for the log stream. - * - * @param metadata the metadata for a log stream - * @param readerId the reader id used for lock - * @return the read lock - */ - Future<DistributedLock> createReadLock(LogMetadataForReader metadata, - Optional<String> readerId); - - /** - * Create the write lock for the log stream. - * - * @param metadata the metadata for a log stream - * @return the write lock - */ - DistributedLock createWriteLock(LogMetadataForWriter metadata); - - /** - * Create the metadata of a log. - * - * @param uri the location to store the metadata of the log - * @param streamName the name of the log stream - * @param ownAllocator whether to use its own allocator or external allocator - * @param createIfNotExists flag to create the stream if it doesn't exist - * @return the metadata of the log - */ - Future<LogMetadataForWriter> getLog(URI uri, - String streamName, - boolean ownAllocator, - boolean createIfNotExists); - - /** - * Delete the metadata of a log. - * - * @param uri the location to store the metadata of the log - * @param streamName the name of the log stream - * @return future represents the result of the deletion. - */ - Future<Void> deleteLog(URI uri, String streamName); - - /** - * Get the log segment metadata store. - * - * @return the log segment metadata store. - */ - LogSegmentMetadataStore getLogSegmentMetadataStore(); - - /** - * Get the permit manager for this metadata store. It can be used for limiting the concurrent - * metadata operations. The implementation can disable handing over the permits when the metadata - * store is unavailable (for example zookeeper session expired). - * - * @return the permit manager - */ - PermitManager getPermitManager(); - - - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataResolver.java deleted file mode 100644 index 417cab8..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataResolver.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import java.io.IOException; -import java.net.URI; - -/** - * Resolver to resolve the metadata used to instantiate a DL instance. - * - * <p> - * E.g. we stored a common dl config under /messaging/distributedlog to use - * bookkeeper cluster x. so all the distributedlog instances under this path - * inherit this dl config. if a dl D is allocated under /messaging/distributedlog, - * but use a different cluster y, so its metadata is stored /messaging/distributedlog/D. - * The resolver resolve the URI - * </p> - * - * <p> - * The resolver looks up the uri path and tries to interpret the path segments from - * bottom-to-top to see if there is a DL metadata bound. It stops when it found valid - * dl metadata. - * </p> - */ -public interface MetadataResolver { - - /** - * Resolve the path to get the DL metadata. - * - * @param uri - * dl uri - * @return dl metadata. - * @throws IOException - */ - public DLMetadata resolve(URI uri) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataUpdater.java deleted file mode 100644 index b98f168..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/MetadataUpdater.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; - -/** - * An updater to update metadata. It contains utility functions on mutating metadata. - */ -public interface MetadataUpdater { - - /** - * Start a transaction on metadata updates - * - * @return transaction - */ - Transaction<Object> transaction(); - - /** - * Update the log segment metadata with correct last <i>record</i>. - * - * @param segment - * log segment to update last dlsn. - * @param record - * correct last record. - * @return new log segment - */ - Future<LogSegmentMetadata> updateLastRecord(LogSegmentMetadata segment, - LogRecordWithDLSN record); - - /** - * Change ledger sequence number of <i>segment</i> to given <i>logSegmentSeqNo</i>. - * - * @param segment - * log segment to change sequence number. - * @param logSegmentSeqNo - * ledger sequence number to change. - * @return new log segment - */ - Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment, - long logSegmentSeqNo); - - /** - * Change the truncation status of a <i>log segment</i> to be active - * - * @param segment - * log segment to change truncation status to active. - * @return new log segment - */ - Future<LogSegmentMetadata> setLogSegmentActive(LogSegmentMetadata segment); - - /** - * Change the truncation status of a <i>log segment</i> to truncated - * - * @param segment - * log segment to change truncation status to truncated. - * @return new log segment - */ - Future<LogSegmentMetadata> setLogSegmentTruncated(LogSegmentMetadata segment); - - /** - * Change the truncation status of a <i>log segment</i> to truncated. The operation won't be executed - * immediately. The update only happens after {@link Transaction#execute()}. - * - * @param txn - * transaction used to set the log segment status - * @param segment - * segment to set truncation status to truncated - * @return log segment that truncation status is set to truncated. - */ - LogSegmentMetadata setLogSegmentTruncated(Transaction<Object> txn, LogSegmentMetadata segment); - - /** - * Change the truncation status of a <i>log segment</i> to partially truncated - * - * @param segment - * log segment to change sequence number. - * @param minActiveDLSN - * DLSN within the log segment before which log has been truncated - * @return new log segment - */ - Future<LogSegmentMetadata> setLogSegmentPartiallyTruncated(LogSegmentMetadata segment, - DLSN minActiveDLSN); - - /** - * Change the truncation status of a <i>log segment</i> to partially truncated. The operation won't be - * executed until {@link Transaction#execute()}. - * - * @param txn - * transaction used to set the log segment status - * @param segment - * segment to set truncation status to partially truncated - * @param minActiveDLSN - * DLSN within the log segment before which log has been truncated - * @return log segment that truncation status has been set to partially truncated - */ - LogSegmentMetadata setLogSegmentPartiallyTruncated(Transaction<Object> txn, - LogSegmentMetadata segment, - DLSN minActiveDLSN); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/package-info.java deleted file mode 100644 index f740c77..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Metadata management for distributedlog. - */ -package com.twitter.distributedlog.metadata; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java deleted file mode 100644 index 5d1d888..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.namespace; - -import com.google.common.annotations.Beta; -import com.google.common.base.Optional; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * A namespace is the basic unit for managing a set of distributedlogs. - * - * <h4>Namespace Interface</h4> - * - * <P> - * The <code>DistributedLogNamespace</code> interface is implemented by different backend providers. - * There are several components are required for an implementation: - * <OL> - * <LI>Log Management -- manage logs in a given namespace. e.g. create/open/delete log, list of logs, - * watch the changes of logs. - * <LI>Access Control -- manage the access controls for logs in the namespace. - * </OL> - * </P> - * - * <h4>Namespace Location</h4> - * - * At the highest level, a <code>DistributedLogNamespace</code> is located by a <code>URI</code>. The location - * URI is in string form has the syntax - * - * <blockquote> - * distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i> - * </blockquote> - * - * where square brackets [...] delineate optional components and the characters <tt><b>-</b></tt> and <tt><b>:</b></tt> - * stand for themselves. - * - * The <code>provider</code> part in the URI indicates what is the backend used for this namespace. For example: - * <i>distributedlog-bk</i> URI is storing logs in bookkeeper, while <i>distributedlog-mem</i> URI is storing logs in - * memory. The <code>provider</code> part is optional. It would use bookkeeper backend if the <i>provider</i> part - * is omitted. - * - * @see DistributedLogManager - * @since 0.3.32 - */ -@Beta -public interface DistributedLogNamespace { - - /** - * Get the namespace driver used by this namespace. - * - * @return namespace driver - */ - NamespaceDriver getNamespaceDriver(); - - // - // Method to operate logs - // - - /** - * Create a log named <i>logName</i>. - * - * @param logName - * name of the log - * @throws InvalidStreamNameException if log name is invalid. - * @throws IOException when encountered issues with backend. - */ - void createLog(String logName) - throws InvalidStreamNameException, IOException; - - /** - * Delete a log named <i>logName</i>. - * - * @param logName - * name of the log - * @throws InvalidStreamNameException if log name is invalid - * @throws LogNotFoundException if log doesn't exist - * @throws IOException when encountered issues with backend - */ - void deleteLog(String logName) - throws InvalidStreamNameException, LogNotFoundException, IOException; - - /** - * Open a log named <i>logName</i>. - * A distributedlog manager is returned to access log <i>logName</i>. - * - * @param logName - * name of the log - * @return distributedlog manager instance. - * @throws InvalidStreamNameException if log name is invalid. - * @throws IOException when encountered issues with backend. - */ - DistributedLogManager openLog(String logName) - throws InvalidStreamNameException, IOException; - - /** - * Open a log named <i>logName</i> with specific log configurations. - * - * <p>This method allows the caller to override global configuration settings by - * supplying log configuration overrides. Log config overrides come in two flavors, - * static and dynamic. Static config never changes in the lifecyle of <code>DistributedLogManager</code>, - * dynamic config changes by reloading periodically and safe to access from any context.</p> - * - * @param logName - * name of the log - * @param logConf - * static log configuration - * @param dynamicLogConf - * dynamic log configuration - * @return distributedlog manager instance. - * @throws InvalidStreamNameException if log name is invalid. - * @throws IOException when encountered issues with backend. - */ - DistributedLogManager openLog(String logName, - Optional<DistributedLogConfiguration> logConf, - Optional<DynamicDistributedLogConfiguration> dynamicLogConf, - Optional<StatsLogger> perStreamStatsLogger) - throws InvalidStreamNameException, IOException; - - /** - * Check whether the log <i>logName</i> exist. - * - * @param logName - * name of the log - * @return <code>true</code> if the log exists, otherwise <code>false</code>. - * @throws IOException when encountered exceptions on checking - */ - boolean logExists(String logName) - throws IOException; - - /** - * Retrieve the logs under the namespace. - * - * @return iterator of the logs under the namespace. - * @throws IOException when encountered issues with backend. - */ - Iterator<String> getLogs() - throws IOException; - - // - // Methods for namespace - // - - /** - * Register namespace listener on stream updates under the namespace. - * - * @param listener - * listener to receive stream updates under the namespace - */ - void registerNamespaceListener(NamespaceListener listener); - - /** - * Create an access control manager to manage/check acl for logs. - * - * @return access control manager for logs under the namespace. - * @throws IOException - */ - AccessControlManager createAccessControlManager() - throws IOException; - - /** - * Close the namespace. - */ - void close(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java deleted file mode 100644 index 07b3848..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespaceBuilder.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.namespace; - -import com.google.common.base.Preconditions; -import com.twitter.distributedlog.BKDistributedLogNamespace; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.feature.CoreFeatureKeys; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; -import com.twitter.distributedlog.util.ConfUtils; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.PermitLimiter; -import com.twitter.distributedlog.util.SimplePermitLimiter; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.SettableFeatureProvider; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; - -/** - * Builder to construct a <code>DistributedLogNamespace</code>. - * The builder takes the responsibility of loading backend according to the uri. - * - * @see DistributedLogNamespace - * @since 0.3.32 - */ -public class DistributedLogNamespaceBuilder { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogNamespaceBuilder.class); - - public static DistributedLogNamespaceBuilder newBuilder() { - return new DistributedLogNamespaceBuilder(); - } - - private DistributedLogConfiguration _conf = null; - private DynamicDistributedLogConfiguration _dynConf = null; - private URI _uri = null; - private StatsLogger _statsLogger = NullStatsLogger.INSTANCE; - private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE; - private FeatureProvider _featureProvider = null; - private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID; - private int _regionId = DistributedLogConstants.LOCAL_REGION_ID; - - // private constructor - private DistributedLogNamespaceBuilder() {} - - /** - * DistributedLog Configuration used for the namespace. - * - * @param conf - * distributedlog configuration - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder conf(DistributedLogConfiguration conf) { - this._conf = conf; - return this; - } - - /** - * Dynamic DistributedLog Configuration used for the namespace - * - * @param dynConf dynamic distributedlog configuration - * @return namespace builder - */ - public DistributedLogNamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) { - this._dynConf = dynConf; - return this; - } - - /** - * Namespace Location. - * - * @param uri - * namespace location uri. - * @see DistributedLogNamespace - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder uri(URI uri) { - this._uri = uri; - return this; - } - - /** - * Stats Logger used for stats collection - * - * @param statsLogger - * stats logger - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder statsLogger(StatsLogger statsLogger) { - this._statsLogger = statsLogger; - return this; - } - - /** - * Stats Logger used for collecting per log stats. - * - * @param statsLogger - * stats logger for collecting per log stats - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) { - this._perLogStatsLogger = statsLogger; - return this; - } - - /** - * Feature provider used to control the availabilities of features in the namespace. - * - * @param featureProvider - * feature provider to control availabilities of features. - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder featureProvider(FeatureProvider featureProvider) { - this._featureProvider = featureProvider; - return this; - } - - /** - * Client Id used for accessing the namespace - * - * @param clientId - * client id used for accessing the namespace - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder clientId(String clientId) { - this._clientId = clientId; - return this; - } - - /** - * Region Id used for encoding logs in the namespace. The region id - * is useful when the namespace is globally spanning over regions. - * - * @param regionId - * region id. - * @return namespace builder. - */ - public DistributedLogNamespaceBuilder regionId(int regionId) { - this._regionId = regionId; - return this; - } - - @SuppressWarnings("deprecation") - private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - DistributedLogConfiguration conf) { - StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger; - if (perLogStatsLogger == NullStatsLogger.INSTANCE && - conf.getEnablePerStreamStat()) { - normalizedPerLogStatsLogger = statsLogger.scope("stream"); - } - return normalizedPerLogStatsLogger; - } - - /** - * Build the namespace. - * - * @return the namespace instance. - * @throws IllegalArgumentException when there is illegal argument provided in the builder - * @throws NullPointerException when there is null argument provided in the builder - * @throws IOException when fail to build the backend - */ - public DistributedLogNamespace build() - throws IllegalArgumentException, NullPointerException, IOException { - // Check arguments - Preconditions.checkNotNull(_conf, "No DistributedLog Configuration."); - Preconditions.checkNotNull(_uri, "No DistributedLog URI"); - - // validate the configuration - _conf.validate(); - if (null == _dynConf) { - _dynConf = ConfUtils.getConstDynConf(_conf); - } - - // retrieve the namespace driver - NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri); - URI normalizedUri = DLUtils.normalizeURI(_uri); - - // build the feature provider - FeatureProvider featureProvider; - if (null == _featureProvider) { - featureProvider = new SettableFeatureProvider("", 0); - logger.info("No feature provider is set. All features are disabled now."); - } else { - featureProvider = _featureProvider; - } - - // build the failure injector - AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder() - .injectDelays(_conf.getEIInjectReadAheadDelay(), - _conf.getEIInjectReadAheadDelayPercent(), - _conf.getEIInjectMaxReadAheadDelayMs()) - .injectErrors(false, 10) - .injectStops(_conf.getEIInjectReadAheadStall(), 10) - .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries()) - .build(); - - // normalize the per log stats logger - StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf); - - // build the scheduler - StatsLogger schedulerStatsLogger = _statsLogger.scope("factory").scope("thread_pool"); - OrderedScheduler scheduler = OrderedScheduler.newBuilder() - .name("DLM-" + normalizedUri.getPath()) - .corePoolSize(_conf.getNumWorkerThreads()) - .statsLogger(schedulerStatsLogger) - .perExecutorStatsLogger(schedulerStatsLogger) - .traceTaskExecution(_conf.getEnableTaskExecutionStats()) - .traceTaskExecutionWarnTimeUs(_conf.getTaskExecutionWarnTimeMicros()) - .build(); - - // initialize the namespace driver - driver.initialize( - _conf, - _dynConf, - normalizedUri, - scheduler, - featureProvider, - failureInjector, - _statsLogger, - perLogStatsLogger, - DLUtils.normalizeClientId(_clientId), - _regionId); - - // initialize the write limiter - PermitLimiter writeLimiter; - if (_conf.getGlobalOutstandingWriteLimit() < 0) { - writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER; - } else { - Feature disableWriteLimitFeature = featureProvider.getFeature( - CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase()); - writeLimiter = new SimplePermitLimiter( - _conf.getOutstandingWriteLimitDarkmode(), - _conf.getGlobalOutstandingWriteLimit(), - _statsLogger.scope("writeLimiter"), - true /* singleton */, - disableWriteLimitFeature); - } - - return new BKDistributedLogNamespace( - _conf, - normalizedUri, - driver, - scheduler, - featureProvider, - writeLimiter, - failureInjector, - _statsLogger, - perLogStatsLogger, - DLUtils.normalizeClientId(_clientId), - _regionId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java deleted file mode 100644 index 738f124..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriver.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.namespace; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.MetadataAccessor; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.metadata.LogMetadataStore; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.subscription.SubscriptionsStore; -import com.twitter.distributedlog.util.OrderedScheduler; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.StatsLogger; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; - -/** - * Manager to manage all the stores required by a namespace. - */ -public interface NamespaceDriver extends Closeable { - - enum Role { - WRITER, - READER - } - - /** - * Initialize the namespace manager. - * - * @param conf distributedlog configuration - * @param dynConf dynamic distributedlog configuration - * @param namespace root uri of the namespace - * @param scheduler ordered scheduler - * @param featureProvider feature provider - * @param statsLogger stats logger - * @param perLogStatsLogger per log stream stats logger - * @param clientId client id - * @return namespace manager - * @throws IOException when failed to initialize the namespace manager - */ - NamespaceDriver initialize(DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - URI namespace, - OrderedScheduler scheduler, - FeatureProvider featureProvider, - AsyncFailureInjector failureInjector, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - String clientId, - int regionId) throws IOException; - - /** - * Get the scheme of the namespace driver. - * - * @return the scheme of the namespace driver. - */ - String getScheme(); - - /** - * Get the root uri of the namespace driver. - * - * @return the root uri of the namespace driver. - */ - URI getUri(); - - /** - * Retrieve the log {@code metadata store} used by the namespace. - * - * @return the log metadata store - */ - LogMetadataStore getLogMetadataStore(); - - /** - * Retrieve the log stream {@code metadata store} used by the namespace. - * - * @param role the role to retrieve the log stream metadata store. - * @return the log stream metadata store - */ - LogStreamMetadataStore getLogStreamMetadataStore(Role role); - - /** - * Retrieve the log segment {@code entry store} used by the namespace. - * - * @param role the role to retrieve the log segment entry store. - * @return the log segment entry store. - * @throws IOException when failed to open log segment entry store. - */ - LogSegmentEntryStore getLogSegmentEntryStore(Role role); - - /** - * Create an access control manager to manage/check acl for logs. - * - * @return access control manager for logs under the namespace. - * @throws IOException - */ - AccessControlManager getAccessControlManager() - throws IOException; - - /** - * Retrieve the metadata accessor for log stream {@code streamName}. - * (TODO: it is a legacy interface. should remove it if we have metadata of stream.) - * - * @param streamName name of log stream. - * @return metadata accessor for log stream {@code streamName}. - */ - MetadataAccessor getMetadataAccessor(String streamName) - throws InvalidStreamNameException, IOException; - - /** - * Retrieve the subscriptions store for log stream {@code streamName}. - * - * @return the subscriptions store for log stream {@code streamName} - */ - SubscriptionsStore getSubscriptionsStore(String streamName); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java deleted file mode 100644 index 79945ad..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceDriverManager.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.namespace; - -import com.google.common.base.Objects; -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static com.google.common.base.Preconditions.*; - -/** - * The basic service for managing a set of namespace drivers. - */ -public class NamespaceDriverManager { - - private static final Logger logger = LoggerFactory.getLogger(NamespaceDriverManager.class); - - static class NamespaceDriverInfo { - - final Class<? extends NamespaceDriver> driverClass; - final String driverClassName; - - NamespaceDriverInfo(Class<? extends NamespaceDriver> driverClass) { - this.driverClass = driverClass; - this.driverClassName = this.driverClass.getName(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("driver[") - .append(driverClassName) - .append("]"); - return sb.toString(); - } - } - - private static final ConcurrentMap<String, NamespaceDriverInfo> drivers; - private static boolean initialized = false; - - static { - drivers = new ConcurrentHashMap<String, NamespaceDriverInfo>(); - initialize(); - } - - static void initialize() { - if (initialized) { - return; - } - loadInitialDrivers(); - initialized = true; - logger.info("DistributedLog NamespaceDriverManager initialized"); - } - - private static void loadInitialDrivers() { - Set<String> driverList = Sets.newHashSet(); - // add default bookkeeper based driver - driverList.add(BKNamespaceDriver.class.getName()); - // load drivers from system property - String driversStr = System.getProperty("distributedlog.namespace.drivers"); - if (null != driversStr) { - String[] driversArray = StringUtils.split(driversStr, ':'); - for (String driver : driversArray) { - driverList.add(driver); - } - } - // initialize the drivers - for (String driverClsName : driverList) { - try { - NamespaceDriver driver = - ReflectionUtils.newInstance(driverClsName, NamespaceDriver.class); - NamespaceDriverInfo driverInfo = new NamespaceDriverInfo(driver.getClass()); - drivers.put(driver.getScheme().toLowerCase(), driverInfo); - } catch (Exception ex) { - logger.warn("Failed to load namespace driver {} : ", driverClsName, ex); - } - } - } - - /** - * Prevent the NamespaceDriverManager class from being instantiated. - */ - private NamespaceDriverManager() {} - - /** - * Register the namespace {@code driver}. - * - * @param driver the namespace driver - * @return the namespace driver manager - */ - public static void registerDriver(String backend, Class<? extends NamespaceDriver> driver) { - if (!initialized) { - initialize(); - } - - String scheme = backend.toLowerCase(); - NamespaceDriverInfo oldDriverInfo = drivers.get(scheme); - if (null != oldDriverInfo) { - return; - } - NamespaceDriverInfo newDriverInfo = new NamespaceDriverInfo(driver); - oldDriverInfo = drivers.putIfAbsent(scheme, newDriverInfo); - if (null != oldDriverInfo) { - logger.debug("Driver for {} is already there.", scheme); - } - } - - /** - * Retrieve the namespace driver for {@code scheme}. - * - * @param scheme the scheme for the namespace driver - * @return the namespace driver - * @throws NullPointerException when scheme is null - */ - public static NamespaceDriver getDriver(String scheme) { - checkNotNull(scheme, "Driver Scheme is null"); - if (!initialized) { - initialize(); - } - NamespaceDriverInfo driverInfo = drivers.get(scheme.toLowerCase()); - if (null == driverInfo) { - throw new IllegalArgumentException("Unknown backend " + scheme); - } - return ReflectionUtils.newInstance(driverInfo.driverClass); - } - - /** - * Retrieve the namespace driver for {@code uri}. - * - * @param uri the distributedlog uri - * @return the namespace driver for {@code uri} - * @throws NullPointerException if the distributedlog {@code uri} is null or doesn't have scheme - * or there is no namespace driver registered for the scheme - * @throws IllegalArgumentException if the distributedlog {@code uri} scheme is illegal - */ - public static NamespaceDriver getDriver(URI uri) { - // Validate the uri and load the backend according to scheme - checkNotNull(uri, "DistributedLog uri is null"); - String scheme = uri.getScheme(); - checkNotNull(scheme, "Invalid distributedlog uri : " + uri); - scheme = scheme.toLowerCase(); - String[] schemeParts = StringUtils.split(scheme, '-'); - checkArgument(schemeParts.length > 0, - "Invalid distributedlog scheme found : " + uri); - checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()), - "Unknown distributedlog scheme found : " + uri); - // bookkeeper is the default backend - String backend = DistributedLogConstants.BACKEND_BK; - if (schemeParts.length > 1) { - backend = schemeParts[1]; - } - return getDriver(backend); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceWatcher.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceWatcher.java deleted file mode 100644 index f836520..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/NamespaceWatcher.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.namespace; - -import com.twitter.distributedlog.callback.NamespaceListener; - -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * Namespace Watcher watching namespace changes. - */ -public abstract class NamespaceWatcher { - - protected final CopyOnWriteArraySet<NamespaceListener> listeners = - new CopyOnWriteArraySet<NamespaceListener>(); - - /** - * Register listener for namespace changes. - * - * @param listener - * listener to add - */ - public void registerListener(NamespaceListener listener) { - if (listeners.add(listener)) { - watchNamespaceChanges(); - } - } - - /** - * Unregister listener from the namespace watcher. - * - * @param listener - * listener to remove from namespace watcher - */ - public void unregisterListener(NamespaceListener listener) { - listeners.remove(listener); - } - - /** - * Watch the namespace changes. It would be triggered each time - * a namspace listener is added. The implementation should handle - * this. - */ - protected abstract void watchNamespaceChanges(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/package-info.java deleted file mode 100644 index d659f44..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Namespace - */ -package com.twitter.distributedlog.namespace; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolver.java deleted file mode 100644 index 2298faf..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolver.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.net; - -import org.apache.bookkeeper.net.DNSToSwitchMapping; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Abstract DNS resolver for bookkeeper ensemble placement. - */ -public abstract class DNSResolver implements DNSToSwitchMapping { - static final Logger LOG = LoggerFactory.getLogger(DNSResolver.class); - - protected final ConcurrentMap<String, String> domainNameToNetworkLocation = - new ConcurrentHashMap<String, String>(); - - protected final ConcurrentMap<String, String> hostNameToRegion = - new ConcurrentHashMap<String, String>(); - - /** - * Construct the default dns resolver without host-region overrides. - */ - public DNSResolver() { - this(""); - } - - /** - * Construct the dns resolver with host-region overrides. - * <p> - * <i>hostRegionOverrides</i> is a string of pairs of host-region mapping - * (host:region) separated by ';'. during dns resolution, the host will be resolved - * to override region. example: <i>host1:region1;host2:region2;...</i> - * - * @param hostRegionOverrides - * pairs of host-region mapping separated by ';' - */ - public DNSResolver(String hostRegionOverrides) { - if (StringUtils.isNotBlank(hostRegionOverrides)) { - // Host Region Overrides are of the form - // HN1:R1;HN2:R2;... - String[] overrides = hostRegionOverrides.split(";"); - - for (String override : overrides) { - String[] parts = override.split(":"); - if (parts.length != 2) { - LOG.warn("Incorrect override specified", override); - } else { - hostNameToRegion.putIfAbsent(parts[0], parts[1]); - } - } - } // otherwise, no overrides were specified - } - - /** - * {@inheritDoc} - */ - @Override - public List<String> resolve(List<String> names) { - List<String> networkLocations = new ArrayList<String>(names.size()); - for (String name : names) { - networkLocations.add(resolve(name)); - } - return networkLocations; - } - - private String resolve(String domainName) { - String networkLocation = domainNameToNetworkLocation.get(domainName); - if (null == networkLocation) { - networkLocation = resolveToNetworkLocation(domainName); - domainNameToNetworkLocation.put(domainName, networkLocation); - } - return networkLocation; - } - - /** - * Resolve the <code>domainName</code> to its network location. - * - * @param domainName - * domain name - * @return the network location of <i>domainName</i> - */ - protected abstract String resolveToNetworkLocation(String domainName); - - /** - * {@inheritDoc} - */ - @Override - public void reloadCachedMappings() { - domainNameToNetworkLocation.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRacks.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRacks.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRacks.java deleted file mode 100644 index a0298d0..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRacks.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.net; - -/** - * Resolve the dns by racks. - * <p> - * It resolves domain name like `(region)-(rack)-xxx-xxx.*` to network location - * `/(region)/(rack)`. If resolution failed, it returns `/default-region/default-rack`. - * <p> - * region could be override in <code>hostRegionOverrides</code>. for example, if the - * host name is <i>regionA-rack1-xx-yyy</i>, it would be resolved to `/regionA/rack1` - * without any overrides. If the specified overrides is <i>regionA-rack1-xx-yyy:regionB</i>, - * the resolved network location would be <i>/regionB/rack1</i>. - * <p> - * Region overrides provide optimization hits to bookkeeper if two `logical` regions are - * in same or close locations. - * - * @see DNSResolver#DNSResolver(String) - */ -public class DNSResolverForRacks extends DNSResolver { - static final String DEFAULT_RACK = "/default-region/default-rack"; - - public DNSResolverForRacks() { - } - - public DNSResolverForRacks(String hostRegionOverrides) { - super(hostRegionOverrides); - } - - @Override - protected String resolveToNetworkLocation(String domainName) { - String[] parts = domainName.split("\\."); - if (parts.length <= 0) { - return DEFAULT_RACK; - } - - String hostName = parts[0]; - String[] labels = hostName.split("-"); - if (labels.length != 4) { - return DEFAULT_RACK; - } - - String region = hostNameToRegion.get(hostName); - if (null == region) { - region = labels[0]; - } - - return String.format("/%s/%s", region, labels[1]); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRows.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRows.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRows.java deleted file mode 100644 index f585640..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/DNSResolverForRows.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.net; - -/** - * Resolve the dns by rows. - * <p> - * It resolves domain name like `(region)-(row)xx-xxx-xxx.*` to network location - * `/(region)/(row)`. If resolution failed, it returns `/default-region/default-row`. - * <p> - * region could be override in <code>hostRegionOverrides</code>. for example, if the - * host name is <i>regionA-row1-xx-yyy</i>, it would be resolved to `/regionA/row1` - * without any overrides. If the specified overrides is <i>regionA-row1-xx-yyy:regionB</i>, - * the resolved network location would be <i>/regionB/row1</i>. - * <p> - * Region overrides provide optimization hits to bookkeeper if two `logical` regions are - * in same or close locations. - * - * @see DNSResolver#DNSResolver(String) - */ -public class DNSResolverForRows extends DNSResolver { - static final String DEFAULT_ROW = "/default-region/default-row"; - - public DNSResolverForRows() { - } - - public DNSResolverForRows(String hostRegionOverrides) { - super(hostRegionOverrides); - } - - @Override - protected String resolveToNetworkLocation(String domainName) { - String[] parts = domainName.split("\\."); - if (parts.length <= 0) { - return DEFAULT_ROW; - } - String hostName = parts[0]; - String[] labels = hostName.split("-"); - if (labels.length != 4) { - return DEFAULT_ROW; - } - - String region = hostNameToRegion.get(hostName); - if (null == region) { - region = labels[0]; - } - - final String rack = labels[1]; - - if (rack.length() < 2) { - // Default to rack name if the rack name format cannot be recognized - return String.format("/%s/%s", region, rack); - } else { - return String.format("/%s/%s", region, rack.substring(0, 2)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/net/NetUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/NetUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/net/NetUtils.java deleted file mode 100644 index ce0d360..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/NetUtils.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.net; - -import org.apache.bookkeeper.net.DNSToSwitchMapping; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; - -/** - * Utils about network - */ -public class NetUtils { - - /** - * Get the dns resolver from class <code>resolverClassName</code> with optional - * <code>hostRegionOverrides</code>. - * <p> - * It would try to load the class with the constructor with <code>hostRegionOverrides</code>. - * If it fails, it would fall back to load the class with default empty constructor. - * The interpretion of <code>hostRegionOverrides</code> is up to the implementation. - * - * @param resolverCls - * resolver class - * @param hostRegionOverrides - * host region overrides - * @return dns resolver - */ - public static DNSToSwitchMapping getDNSResolver(Class<? extends DNSToSwitchMapping> resolverCls, - String hostRegionOverrides) { - // first try to construct the dns resolver with overrides - Constructor<? extends DNSToSwitchMapping> constructor; - Object[] parameters; - try { - constructor = resolverCls.getDeclaredConstructor(String.class); - parameters = new Object[] { hostRegionOverrides }; - } catch (NoSuchMethodException nsme) { - // no constructor with overrides - try { - constructor = resolverCls.getDeclaredConstructor(); - parameters = new Object[0]; - } catch (NoSuchMethodException nsme1) { - throw new RuntimeException("Unable to find constructor for dns resolver " - + resolverCls, nsme1); - } - } - constructor.setAccessible(true); - try { - return constructor.newInstance(parameters); - } catch (InstantiationException ie) { - throw new RuntimeException("Unable to instantiate dns resolver " + resolverCls, ie); - } catch (IllegalAccessException iae) { - throw new RuntimeException("Illegal access to dns resolver " + resolverCls, iae); - } catch (InvocationTargetException ite) { - throw new RuntimeException("Unable to construct dns resolver " + resolverCls, ite); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/net/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/net/package-info.java deleted file mode 100644 index 9093fef..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/net/package-info.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package contains all the utilities of network. - * - * <h2>DNSResolver</h2> - * - * DNS resolver is the utility to resolve host name to a string which represents this host's network location. - * BookKeeper will use such network locations to place ensemble to ensure rack or region diversity to ensure - * data availability in the case of switch/router/region is down. - * <p> - * Available dns resolvers: - * <ul> - * <li>{@link com.twitter.distributedlog.net.DNSResolverForRacks} - * <li>{@link com.twitter.distributedlog.net.DNSResolverForRows} - * </ul> - */ -package com.twitter.distributedlog.net; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/package-info.java deleted file mode 100644 index 4c1fe57..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Protocol & Core of DistributedLog - */ -package com.twitter.distributedlog; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRate.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRate.java deleted file mode 100644 index 98eae00..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRate.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.rate; - -public interface MovingAverageRate { - double get(); - void add(long amount); - void inc(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java deleted file mode 100644 index a77f753..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.rate; - -import com.twitter.util.Duration; -import com.twitter.util.Function0; -import com.twitter.util.TimerTask; -import com.twitter.util.Timer; -import com.twitter.util.Time; -import java.util.concurrent.CopyOnWriteArrayList; -import scala.runtime.BoxedUnit; - -public class MovingAverageRateFactory { - - private static final int DEFAULT_INTERVAL_SECS = 1; - - private final Timer timer; - private final TimerTask timerTask; - private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs; - - public MovingAverageRateFactory(Timer timer) { - this.avgs = new CopyOnWriteArrayList<SampledMovingAverageRate>(); - this.timer = timer; - Function0<BoxedUnit> sampleTask = new Function0<BoxedUnit>() { - public BoxedUnit apply() { - sampleAll(); - return null; - } - }; - this.timerTask = timer.schedulePeriodically( - Time.now(), Duration.fromSeconds(DEFAULT_INTERVAL_SECS), sampleTask); - } - - public MovingAverageRate create(int intervalSecs) { - SampledMovingAverageRate avg = new SampledMovingAverageRate(intervalSecs); - avgs.add(avg); - return avg; - } - - public void close() { - timerTask.cancel(); - avgs.clear(); - } - - private void sampleAll() { - for (SampledMovingAverageRate avg : avgs) { - avg.sample(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/SampledMovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/SampledMovingAverageRate.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/SampledMovingAverageRate.java deleted file mode 100644 index a616324..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/SampledMovingAverageRate.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.rate; - -import com.twitter.common.stats.Rate; -import com.twitter.util.TimerTask; -import com.twitter.util.Timer; -import com.twitter.util.Time; -import java.util.concurrent.atomic.AtomicLong; - -class SampledMovingAverageRate implements MovingAverageRate { - private final Rate rate; - private final AtomicLong total; - - private double value; - - public SampledMovingAverageRate(int intervalSecs) { - this.total = new AtomicLong(0); - this.rate = Rate.of("Ignore", total) - .withWindowSize(intervalSecs) - .build(); - this.value = 0; - } - - @Override - public double get() { - return value; - } - - @Override - public void add(long amount) { - total.getAndAdd(amount); - } - - @Override - public void inc() { - add(1); - } - - void sample() { - value = rate.doSample(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/package-info.java deleted file mode 100644 index 4945133..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * ReadAhead Mechanism for distributedlog streaming reads - */ -package com.twitter.distributedlog.readahead; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstDLSNNotLessThanSelector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstDLSNNotLessThanSelector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstDLSNNotLessThanSelector.java deleted file mode 100644 index 443c503..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstDLSNNotLessThanSelector.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.selector; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecordWithDLSN; - -/** - * Save the first record with a dlsn not less than the dlsn provided. - */ -public class FirstDLSNNotLessThanSelector implements LogRecordSelector { - - LogRecordWithDLSN result; - final DLSN dlsn; - - public FirstDLSNNotLessThanSelector(DLSN dlsn) { - this.dlsn = dlsn; - } - - @Override - public void process(LogRecordWithDLSN record) { - if ((record.getDlsn().compareTo(dlsn) >= 0) && (null == result)) { - this.result = record; - } - } - - @Override - public LogRecordWithDLSN result() { - return this.result; - } -}