http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java deleted file mode 100644 index bb98e07..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLockFactory.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Factory to create zookeeper based locks. - */ -public class ZKSessionLockFactory implements SessionLockFactory { - - private final ZooKeeperClient zkc; - private final String clientId; - private final OrderedScheduler lockStateExecutor; - private final long lockOpTimeout; - private final int lockCreationRetries; - private final long zkRetryBackoffMs; - - // Stats - private final StatsLogger lockStatsLogger; - - public ZKSessionLockFactory(ZooKeeperClient zkc, - String clientId, - OrderedScheduler lockStateExecutor, - int lockCreationRetries, - long lockOpTimeout, - long zkRetryBackoffMs, - StatsLogger statsLogger) { - this.zkc = zkc; - this.clientId = clientId; - this.lockStateExecutor = lockStateExecutor; - this.lockCreationRetries = lockCreationRetries; - this.lockOpTimeout = lockOpTimeout; - this.zkRetryBackoffMs = zkRetryBackoffMs; - - this.lockStatsLogger = statsLogger.scope("lock"); - } - - @Override - public Future<SessionLock> createLock(String lockPath, - DistributedLockContext context) { - AtomicInteger numRetries = new AtomicInteger(lockCreationRetries); - final AtomicReference<Throwable> interruptedException = new AtomicReference<Throwable>(null); - Promise<SessionLock> createPromise = - new Promise<SessionLock>(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - interruptedException.set(t); - return BoxedUnit.UNIT; - } - }); - createLock( - lockPath, - context, - interruptedException, - numRetries, - createPromise, - 0L); - return createPromise; - } - - void createLock(final String lockPath, - final DistributedLockContext context, - final AtomicReference<Throwable> interruptedException, - final AtomicInteger numRetries, - final Promise<SessionLock> createPromise, - final long delayMs) { - lockStateExecutor.schedule(lockPath, new Runnable() { - @Override - public void run() { - if (null != interruptedException.get()) { - createPromise.updateIfEmpty(new Throw<SessionLock>(interruptedException.get())); - return; - } - try { - SessionLock lock = new ZKSessionLock( - zkc, - lockPath, - clientId, - lockStateExecutor, - lockOpTimeout, - lockStatsLogger, - context); - createPromise.updateIfEmpty(new Return<SessionLock>(lock)); - } catch (DLInterruptedException dlie) { - // if the creation is interrupted, throw the exception without retrie. - createPromise.updateIfEmpty(new Throw<SessionLock>(dlie)); - return; - } catch (IOException e) { - if (numRetries.getAndDecrement() < 0) { - createPromise.updateIfEmpty(new Throw<SessionLock>(e)); - return; - } - createLock( - lockPath, - context, - interruptedException, - numRetries, - createPromise, - zkRetryBackoffMs); - } - } - }, delayMs, TimeUnit.MILLISECONDS); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java deleted file mode 100644 index 02d905d..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Distributed locking mechanism in distributedlog - */ -package com.twitter.distributedlog.lock; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java deleted file mode 100644 index 81eb5ed..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.annotations.Beta; -import com.twitter.distributedlog.Entry; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.util.List; - -/** - * An interface class to read the enveloped entry (serialized bytes of - * {@link com.twitter.distributedlog.Entry}) from a log segment - */ -@Beta -public interface LogSegmentEntryReader extends AsyncCloseable { - - interface StateChangeListener { - - /** - * Notify when caught up on inprogress. - */ - void onCaughtupOnInprogress(); - - } - - /** - * Start the reader. The method to signal the implementation - * to start preparing the data for consumption {@link #readNext(int)} - */ - void start(); - - /** - * Register the state change listener - * - * @param listener register the state change listener - * @return entry reader - */ - LogSegmentEntryReader registerListener(StateChangeListener listener); - - /** - * Unregister the state change listener - * - * @param listener register the state change listener - * @return entry reader - */ - LogSegmentEntryReader unregisterListener(StateChangeListener listener); - - /** - * Return the log segment metadata for this reader. - * - * @return the log segment metadata - */ - LogSegmentMetadata getSegment(); - - /** - * Update the log segment each time when the metadata has changed. - * - * @param segment new metadata of the log segment. - */ - void onLogSegmentMetadataUpdated(LogSegmentMetadata segment); - - /** - * Read next <i>numEntries</i> entries from current log segment. - * <p> - * <i>numEntries</i> will be best-effort. - * - * @param numEntries num entries to read from current log segment - * @return A promise that when satisified will contain a non-empty list of entries with their content. - * @throws {@link com.twitter.distributedlog.exceptions.EndOfLogSegmentException} when - * read entries beyond the end of a <i>closed</i> log segment. - */ - Future<List<Entry.Reader>> readNext(int numEntries); - - /** - * Return the last add confirmed entry id (LAC). - * - * @return the last add confirmed entry id. - */ - long getLastAddConfirmed(); - - /** - * Is the reader reading beyond last add confirmed. - * - * @return true if the reader is reading beyond last add confirmed - */ - boolean isBeyondLastAddConfirmed(); - - /** - * Has the log segment reader caught up with the inprogress log segment. - * - * @return true only if the log segment is inprogress and it is caught up, otherwise return false. - */ - boolean hasCaughtUpOnInprogress(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java deleted file mode 100644 index bcf8129..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryStore.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.annotations.Beta; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.util.Allocator; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; - -import java.io.IOException; - -/** - * Log Segment Store to read log segments - */ -@Beta -public interface LogSegmentEntryStore { - - /** - * Delete the actual log segment from the entry store. - * - * @param segment log segment metadata - * @return future represent the delete result - */ - Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment); - - /** - * Create a new log segment allocator for allocating log segment entry writers. - * - * @param metadata the metadata for the log stream - * @return future represent the log segment allocator - */ - Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator( - LogMetadataForWriter metadata, - DynamicDistributedLogConfiguration dynConf) throws IOException; - - /** - * Open the reader for reading data to the log <i>segment</i>. - * - * @param segment the log <i>segment</i> to read data from - * @param startEntryId the start entry id - * @return future represent the opened reader - */ - Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, - long startEntryId); - - /** - * Open the reader for reading entries from a random access log <i>segment</i>. - * - * @param segment the log <i>segment</i> to read entries from - * @param fence the flag to fence log segment - * @return future represent the opened random access reader - */ - Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(LogSegmentMetadata segment, - boolean fence); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java deleted file mode 100644 index 8b7d9b2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.annotations.Beta; -import com.twitter.distributedlog.Entry; -import com.twitter.distributedlog.util.Sizable; -import org.apache.bookkeeper.client.AsyncCallback; - -/** - * An interface class to write the enveloped entry (serialized bytes of - * {@link Entry} into the log segment. - * - * <p>It is typically used by {@link LogSegmentWriter}. - * - * @see LogSegmentWriter - * - * TODO: The interface is leveraging bookkeeper's callback and status code now - * Consider making it more generic. - */ -@Beta -public interface LogSegmentEntryWriter extends Sizable { - - /** - * Get the log segment id. - * - * @return log segment id. - */ - long getLogSegmentId(); - - /** - * Close the entry writer. - */ - void asyncClose(AsyncCallback.CloseCallback callback, Object ctx); - - /** - * Async add entry to the log segment. - * <p>The implementation semantic follows - * {@link org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry( - * byte[], int, int, AsyncCallback.AddCallback, Object)} - * - * @param data - * data to add - * @param offset - * offset in the data - * @param length - * length of the data - * @param callback - * callback - * @param ctx - * ctx - * @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry( - * byte[], int, int, AsyncCallback.AddCallback, Object) - */ - void asyncAddEntry(byte[] data, int offset, int length, - AsyncCallback.AddCallback callback, Object ctx); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java deleted file mode 100644 index f8bf183..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentFilter.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import java.util.Collection; - -/** - * Filter to filter log segments - */ -public interface LogSegmentFilter { - - public static final LogSegmentFilter DEFAULT_FILTER = new LogSegmentFilter() { - @Override - public Collection<String> filter(Collection<String> fullList) { - return fullList; - } - }; - - /** - * Filter the log segments from the full log segment list. - * - * @param fullList - * full list of log segment names. - * @return filtered log segment names - */ - Collection<String> filter(Collection<String> fullList); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java deleted file mode 100644 index d4ca3ea..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataCache.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.LogSegmentMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * Cache the log segment metadata - */ -public class LogSegmentMetadataCache implements RemovalListener<String, LogSegmentMetadata> { - - private static final Logger logger = LoggerFactory.getLogger(LogSegmentMetadataCache.class); - - private final Cache<String, LogSegmentMetadata> cache; - private final boolean isCacheEnabled; - - public LogSegmentMetadataCache(DistributedLogConfiguration conf, - Ticker ticker) { - cache = CacheBuilder.newBuilder() - .concurrencyLevel(conf.getNumWorkerThreads()) - .initialCapacity(1024) - .expireAfterAccess(conf.getLogSegmentCacheTTLMs(), TimeUnit.MILLISECONDS) - .maximumSize(conf.getLogSegmentCacheMaxSize()) - .removalListener(this) - .ticker(ticker) - .recordStats() - .build(); - this.isCacheEnabled = conf.isLogSegmentCacheEnabled(); - logger.info("Log segment cache is enabled = {}", this.isCacheEnabled); - } - - /** - * Add the log <i>segment</i> of <i>path</i> to the cache. - * - * @param path the path of the log segment - * @param segment log segment metadata - */ - public void put(String path, LogSegmentMetadata segment) { - if (isCacheEnabled) { - cache.put(path, segment); - } - } - - /** - * Invalid the cache entry associated with <i>path</i>. - * - * @param path the path of the log segment - */ - public void invalidate(String path) { - if (isCacheEnabled) { - cache.invalidate(path); - } - } - - /** - * Retrieve the log segment of <i>path</i> from the cache. - * - * @param path the path of the log segment. - * @return log segment metadata if exists, otherwise null. - */ - public LogSegmentMetadata get(String path) { - return cache.getIfPresent(path); - } - - @Override - public void onRemoval(RemovalNotification<String, LogSegmentMetadata> notification) { - if (notification.wasEvicted()) { - if (logger.isDebugEnabled()) { - logger.debug("Log segment of {} was evicted.", notification.getKey()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java deleted file mode 100644 index dda76e5..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentMetadataStore.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.annotations.Beta; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.callback.LogSegmentNamesListener; -import com.twitter.distributedlog.metadata.LogMetadata; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.distributedlog.util.Transaction.OpListener; -import com.twitter.util.Future; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; - -import java.io.Closeable; -import java.util.List; - -/** - * Interface for log segment metadata store. All operations that modify log segments should - * be executed under a {@link Transaction}. - */ -@Beta -public interface LogSegmentMetadataStore extends Closeable { - - /** - * Start the transaction on changing log segment metadata store. - * - * @return transaction of the log segment metadata store. - */ - Transaction<Object> transaction(); - - // The reason to keep storing log segment sequence number & log record transaction id - // in this log segment metadata store interface is to share the transaction that used - // to start/complete log segment. It is a bit hard to separate them out right now. - - /** - * Store the maximum log segment sequence number on <code>path</code>. - * - * @param txn - * transaction to execute for storing log segment sequence number. - * @param logMetadata - * metadata of the log stream - * @param sequenceNumber - * log segment sequence number to store - * @param listener - * listener on the result to this operation - */ - void storeMaxLogSegmentSequenceNumber(Transaction<Object> txn, - LogMetadata logMetadata, - Versioned<Long> sequenceNumber, - OpListener<Version> listener); - - /** - * Store the maximum transaction id for <code>path</code> - * - * @param txn - * transaction to execute for storing transaction id - * @param logMetadata - * metadata of the log stream - * @param transactionId - * transaction id to store - * @param listener - * listener on the result to this operation - */ - void storeMaxTxnId(Transaction<Object> txn, - LogMetadataForWriter logMetadata, - Versioned<Long> transactionId, - OpListener<Version> listener); - - /** - * Create a log segment <code>segment</code> under transaction <code>txn</code>. - * - * NOTE: this operation shouldn't be a blocking call. and it shouldn't execute the operation - * immediately. the operation should be executed via {@link Transaction#execute()} - * - * @param txn - * transaction to execute for this operation - * @param segment - * segment to create - * @param opListener - * the listener on the operation result - */ - void createLogSegment(Transaction<Object> txn, - LogSegmentMetadata segment, - OpListener<Void> opListener); - - /** - * Delete a log segment <code>segment</code> under transaction <code>txn</code>. - * - * NOTE: this operation shouldn't be a blocking call. and it shouldn't execute the operation - * immediately. the operation should be executed via {@link Transaction#execute()} - * - * @param txn - * transaction to execute for this operation - * @param segment - * segment to delete - */ - void deleteLogSegment(Transaction<Object> txn, - LogSegmentMetadata segment, - OpListener<Void> opListener); - - /** - * Update a log segment <code>segment</code> under transaction <code>txn</code>. - * - * NOTE: this operation shouldn't be a blocking call. and it shouldn't execute the operation - * immediately. the operation should be executed via {@link Transaction#execute()} - * - * @param txn - * transaction to execute for this operation - * @param segment - * segment to update - */ - void updateLogSegment(Transaction<Object> txn, LogSegmentMetadata segment); - - /** - * Retrieve the log segment associated <code>path</code>. - * - * @param logSegmentPath - * path to store log segment metadata - * @return future of the retrieved log segment metadata - */ - Future<LogSegmentMetadata> getLogSegment(String logSegmentPath); - - /** - * Retrieve the list of log segments under <code>logSegmentsPath</code> and register a <i>listener</i> - * for subsequent changes for the list of log segments. - * - * @param logSegmentsPath - * path to store list of log segments - * @param listener - * log segment listener on log segment changes - * @return future of the retrieved list of log segment names - */ - Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, - LogSegmentNamesListener listener); - - /** - * Unregister a log segment <code>listener</code> on log segment changes under <code>logSegmentsPath</code>. - * - * @param logSegmentsPath - * log segments path - * @param listener - * log segment listener on log segment changes - */ - void unregisterLogSegmentListener(String logSegmentsPath, - LogSegmentNamesListener listener); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java deleted file mode 100644 index 70472ca..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentRandomAccessEntryReader.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.twitter.distributedlog.Entry; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.util.List; - -/** - * An interface class to read entries {@link com.twitter.distributedlog.Entry} - * from a random access log segment. - */ -public interface LogSegmentRandomAccessEntryReader extends AsyncCloseable { - - /** - * Read entries [startEntryId, endEntryId] from a random access log segment. - * - * @param startEntryId start entry id - * @param endEntryId end entry id - * @return A promise that when satisfied will contain a list of entries of [startEntryId, endEntryId]. - */ - Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId); - - /** - * Return the last add confirmed entry id (LAC). - * - * @return the last add confirmed entry id. - */ - long getLastAddConfirmed(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java deleted file mode 100644 index a0b4610..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentWriter.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.annotations.Beta; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.exceptions.BKTransmitException; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.io.AsyncAbortable; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.io.IOException; - -/** - * An interface class to write log records into a log segment. - */ -@Beta -public interface LogSegmentWriter extends AsyncCloseable, AsyncAbortable { - - /** - * Get the unique log segment id. - * - * @return log segment id. - */ - public long getLogSegmentId(); - - /** - * Write a log record to a log segment. - * - * @param record single log record - * @return a future representing write result. A {@link DLSN} is returned if write succeeds, - * otherwise, exceptions are returned. - * @throws com.twitter.distributedlog.exceptions.LogRecordTooLongException if log record is too long - * @throws com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException on invalid enveloped entry - * @throws LockingException if failed to acquire lock for the writer - * @throws BKTransmitException if failed to transmit data to bk - * @throws com.twitter.distributedlog.exceptions.WriteException if failed to write to bk - */ - public Future<DLSN> asyncWrite(LogRecord record); - - /** - * This isn't a simple synchronous version of {@code asyncWrite}. It has different semantic. - * This method only writes data to the buffer and flushes buffer if needed. - * - * TODO: we should remove this method. when we rewrite synchronous writer based on asynchronous writer, - * since this is the semantic needed to be provided in higher level but just calling write & flush. - * - * @param record single log record - * @throws IOException when tried to flush the buffer. - * @see LogSegmentWriter#asyncWrite(LogRecord) - */ - public void write(LogRecord record) throws IOException; - - /** - * Transmit the buffered data and wait for it being persisted and return the last acknowledged - * transaction id. - * - * @return future representing the transmit result with last acknowledged transaction id. - */ - public Future<Long> flush(); - - /** - * Commit the current acknowledged data. It is the consequent operation of {@link #flush()}, - * which makes all the acknowledged data visible to - * - * @return future representing the commit result. - */ - public Future<Long> commit(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java deleted file mode 100644 index 5f88c5a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/PerStreamLogSegmentCache.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Managing log segments in local cache. - * - * <p> - * Caching of log segment metadata assumes that the data contained in the ZNodes for individual - * log segments is never updated after creation i.e we never call setData. A log segment - * is finalized by creating a new ZNode and deleting the in progress node. This code will have - * to change if we change the behavior - * </p> - */ -public class PerStreamLogSegmentCache { - - static final Logger LOG = LoggerFactory.getLogger(PerStreamLogSegmentCache.class); - - protected final String streamName; - protected final boolean validateLogSegmentSequenceNumber; - protected final Map<String, LogSegmentMetadata> logSegments = - new HashMap<String, LogSegmentMetadata>(); - protected final ConcurrentMap<Long, LogSegmentMetadata> lid2LogSegments = - new ConcurrentHashMap<Long, LogSegmentMetadata>(); - - @VisibleForTesting - PerStreamLogSegmentCache(String streamName) { - this(streamName, true); - } - - public PerStreamLogSegmentCache(String streamName, - boolean validateLogSegmentSequenceNumber) { - this.streamName = streamName; - this.validateLogSegmentSequenceNumber = validateLogSegmentSequenceNumber; - } - - /** - * Retrieve log segments from the cache. - * - * - first sort the log segments in ascending order - * - do validation and assign corresponding sequence id - * - apply comparator after validation - * - * @param comparator - * comparator to sort the returned log segments. - * @return list of sorted and filtered log segments. - * @throws UnexpectedException if unexpected condition detected (e.g. ledger sequence number gap) - */ - public List<LogSegmentMetadata> getLogSegments(Comparator<LogSegmentMetadata> comparator) - throws UnexpectedException { - List<LogSegmentMetadata> segmentsToReturn; - synchronized (logSegments) { - segmentsToReturn = new ArrayList<LogSegmentMetadata>(logSegments.size()); - segmentsToReturn.addAll(logSegments.values()); - } - Collections.sort(segmentsToReturn, LogSegmentMetadata.COMPARATOR); - - LogSegmentMetadata prevSegment = null; - if (validateLogSegmentSequenceNumber) { - // validation ledger sequence number to ensure the log segments are unique. - for (int i = 0; i < segmentsToReturn.size(); i++) { - LogSegmentMetadata segment = segmentsToReturn.get(i); - - if (null != prevSegment - && prevSegment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value - && segment.getVersion() >= LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V2_LEDGER_SEQNO.value - && prevSegment.getLogSegmentSequenceNumber() + 1 != segment.getLogSegmentSequenceNumber()) { - LOG.error("{} found ledger sequence number gap between log segment {} and {}", - new Object[] { streamName, prevSegment, segment }); - throw new UnexpectedException(streamName + " found ledger sequence number gap between log segment " - + prevSegment.getLogSegmentSequenceNumber() + " and " + segment.getLogSegmentSequenceNumber()); - } - prevSegment = segment; - } - } - - prevSegment = null; - long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; - for (int i = 0; i < segmentsToReturn.size(); i++) { - LogSegmentMetadata segment = segmentsToReturn.get(i); - // assign sequence id - if (!segment.isInProgress()) { - if (segment.supportsSequenceId()) { - startSequenceId = segment.getStartSequenceId() + segment.getRecordCount(); - if (null != prevSegment && prevSegment.supportsSequenceId() - && prevSegment.getStartSequenceId() > segment.getStartSequenceId()) { - LOG.warn("{} found decreasing start sequence id in log segment {}, previous is {}", - new Object[] { streamName, segment, prevSegment }); - } - } else { - startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; - } - } else { - if (segment.supportsSequenceId()) { - LogSegmentMetadata newSegment = segment.mutator() - .setStartSequenceId(startSequenceId == DistributedLogConstants.UNASSIGNED_SEQUENCE_ID ? 0L : startSequenceId) - .build(); - segmentsToReturn.set(i, newSegment); - } - - break; - } - prevSegment = segment; - } - if (comparator != LogSegmentMetadata.COMPARATOR) { - Collections.sort(segmentsToReturn, comparator); - } - return segmentsToReturn; - } - - /** - * Add the segment <i>metadata</i> for <i>name</i> in the cache. - * - * @param name - * segment name. - * @param metadata - * segment metadata. - */ - public void add(String name, LogSegmentMetadata metadata) { - synchronized (logSegments) { - if (!logSegments.containsKey(name)) { - logSegments.put(name, metadata); - LOG.info("{} added log segment ({} : {}) to cache.", - new Object[]{ streamName, name, metadata }); - } - LogSegmentMetadata oldMetadata = lid2LogSegments.remove(metadata.getLogSegmentId()); - if (null == oldMetadata) { - lid2LogSegments.put(metadata.getLogSegmentId(), metadata); - } else { - if (oldMetadata.isInProgress() && !metadata.isInProgress()) { - lid2LogSegments.put(metadata.getLogSegmentId(), metadata); - } else { - lid2LogSegments.put(oldMetadata.getLogSegmentId(), oldMetadata); - } - } - } - } - - /** - * Retrieve log segment <code>name</code> from the cache. - * - * @param name - * name of the log segment. - * @return log segment metadata - */ - public LogSegmentMetadata get(String name) { - synchronized (logSegments) { - return logSegments.get(name); - } - } - - /** - * Update the log segment cache with removed/added segments. - * - * @param segmentsRemoved - * segments that removed - * @param segmentsAdded - * segments that added - */ - public void update(Set<String> segmentsRemoved, - Map<String, LogSegmentMetadata> segmentsAdded) { - synchronized (logSegments) { - for (Map.Entry<String, LogSegmentMetadata> entry : segmentsAdded.entrySet()) { - add(entry.getKey(), entry.getValue()); - } - for (String segment : segmentsRemoved) { - remove(segment); - } - } - } - - /** - * Diff with new received segment list <code>segmentReceived</code>. - * - * @param segmentsReceived - * new received segment list - * @return segments added (left) and removed (right). - */ - public Pair<Set<String>, Set<String>> diff(Set<String> segmentsReceived) { - Set<String> segmentsAdded; - Set<String> segmentsRemoved; - synchronized (logSegments) { - Set<String> segmentsCached = logSegments.keySet(); - segmentsAdded = Sets.difference(segmentsReceived, segmentsCached).immutableCopy(); - segmentsRemoved = Sets.difference(segmentsCached, segmentsReceived).immutableCopy(); - } - return Pair.of(segmentsAdded, segmentsRemoved); - } - - /** - * Remove log segment <code>name</code> from the cache. - * - * @param name - * name of the log segment. - * @return log segment metadata. - */ - public LogSegmentMetadata remove(String name) { - synchronized (logSegments) { - LogSegmentMetadata metadata = logSegments.remove(name); - if (null != metadata) { - lid2LogSegments.remove(metadata.getLogSegmentId(), metadata); - LOG.debug("Removed log segment ({} : {}) from cache.", name, metadata); - } - return metadata; - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java deleted file mode 100644 index 0101bff..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/RollingPolicy.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.twitter.distributedlog.util.Sizable; - -public interface RollingPolicy { - /** - * Determines if a rollover may be appropriate at this time. - * - * @param sizable - * Any object that is sizable. - * @param lastRolloverTimeMs - * last rolling time in millis. - * @return true if a rollover is required. otherwise, false. - */ - boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java deleted file mode 100644 index 8b1fa0f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/SizeBasedRollingPolicy.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.twitter.distributedlog.util.Sizable; - -public class SizeBasedRollingPolicy implements RollingPolicy { - - final long maxSize; - - public SizeBasedRollingPolicy(long maxSize) { - this.maxSize = maxSize; - } - - @Override - public boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs) { - return sizable.size() > maxSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java deleted file mode 100644 index bc88720..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/TimeBasedRollingPolicy.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.logsegment; - -import com.twitter.distributedlog.util.Sizable; -import com.twitter.distributedlog.util.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TimeBasedRollingPolicy implements RollingPolicy { - - final static Logger LOG = LoggerFactory.getLogger(TimeBasedRollingPolicy.class); - - final long rollingIntervalMs; - - public TimeBasedRollingPolicy(long rollingIntervalMs) { - this.rollingIntervalMs = rollingIntervalMs; - } - - @Override - public boolean shouldRollover(Sizable sizable, long lastRolloverTimeMs) { - long elapsedMs = Utils.elapsedMSec(lastRolloverTimeMs); - boolean shouldSwitch = elapsedMs > rollingIntervalMs; - if (shouldSwitch) { - LOG.debug("Last Finalize Time: {} elapsed time (MSec): {}", lastRolloverTimeMs, - elapsedMs); - } - return shouldSwitch; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java deleted file mode 100644 index 0f5b877..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Log Segment Management - */ -package com.twitter.distributedlog.logsegment; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java deleted file mode 100644 index 178074a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLConfig.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import java.io.IOException; - -/** - * Specific config of a given implementation of DL - */ -public interface DLConfig { - /** - * Serialize the dl config into a string. - */ - public String serialize(); - - /** - * Deserialize the dl config from a readable stream. - * - * @param data - * bytes to desrialize dl config. - * @throws IOException if fail to deserialize the dl config string representation. - */ - public void deserialize(byte[] data) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java deleted file mode 100644 index c0b5fb7..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DLMetadata.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; -import java.net.URI; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * Metadata of a given DL instance. - */ -public class DLMetadata { - - static final Logger LOG = LoggerFactory.getLogger(DLMetadata.class); - - static final String LINE_SPLITTER = "\n"; - static final String BK_DL_TYPE = "BKDL"; - static final int METADATA_FORMAT_VERSION = 1; - - // metadata format version - private int metadataFormatVersion = 0; - // underlying dl type - private String dlType; - // underlying dl config - private DLConfig dlConfig; - - public DLMetadata(String dlType, DLConfig dlConfig) { - this(dlType, dlConfig, METADATA_FORMAT_VERSION); - } - - DLMetadata(String dlType, DLConfig dlConfig, int metadataFormatVersion) { - this.dlType = dlType; - this.dlConfig = dlConfig; - this.metadataFormatVersion = metadataFormatVersion; - } - - /** - * @return DL type - */ - public String getDLType() { - return dlType; - } - - /** - * @return DL Config - */ - public DLConfig getDLConfig() { - return dlConfig; - } - - /** - * Serialize the DL metadata into bytes array. - * - * @return bytes of DL metadata. - */ - public byte[] serialize() { - StringBuilder sb = new StringBuilder(); - sb.append(metadataFormatVersion).append(LINE_SPLITTER); - sb.append(dlType).append(LINE_SPLITTER); - sb.append(dlConfig.serialize()); - LOG.debug("Serialized dl metadata {}.", sb); - return sb.toString().getBytes(UTF_8); - } - - @Override - public int hashCode() { - return dlType.hashCode() * 13 + dlConfig.hashCode(); - } - - @Override - public String toString() { - return new String(serialize(), UTF_8); - } - - public void update(URI uri) throws IOException { - DistributedLogConfiguration conf = new DistributedLogConfiguration(); - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkAclId(conf.getZkAclId()) - .uri(uri) - .build(); - byte[] data = serialize(); - try { - zkc.get().setData(uri.getPath(), data, -1); - } catch (KeeperException e) { - throw new IOException("Fail to update dl metadata " + new String(data, UTF_8) - + " to uri " + uri, e); - } catch (InterruptedException e) { - throw new IOException("Interrupted when updating dl metadata " - + new String(data, UTF_8) + " to uri " + uri, e); - } finally { - zkc.close(); - } - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof DLMetadata)) { - return false; - } - DLMetadata other = (DLMetadata) o; - return dlType.equals(other.dlType) && dlConfig.equals(other.dlConfig); - } - - public void create(URI uri) throws IOException { - DistributedLogConfiguration conf = new DistributedLogConfiguration(); - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkAclId(conf.getZkAclId()) - .uri(uri) - .build(); - byte[] data = serialize(); - try { - Utils.zkCreateFullPathOptimistic(zkc, uri.getPath(), data, - zkc.getDefaultACL(), CreateMode.PERSISTENT); - } catch (KeeperException e) { - throw new IOException("Fail to write dl metadata " + new String(data, UTF_8) - + " to uri " + uri, e); - } catch (InterruptedException e) { - throw new IOException("Interrupted when writing dl metadata " + new String(data, UTF_8) - + " to uri " + uri, e); - } finally { - zkc.close(); - } - } - - public static void unbind(URI uri) throws IOException { - DistributedLogConfiguration conf = new DistributedLogConfiguration(); - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .retryThreadCount(conf.getZKClientNumberRetryThreads()) - .requestRateLimit(conf.getZKRequestRateLimit()) - .zkAclId(conf.getZkAclId()) - .uri(uri) - .build(); - byte[] data = new byte[0]; - try { - zkc.get().setData(uri.getPath(), data, -1); - } catch (KeeperException ke) { - throw new IOException("Fail to unbound dl metadata on uri " + uri, ke); - } catch (InterruptedException ie) { - throw new IOException("Interrupted when unbinding dl metadata on uri " + uri, ie); - } finally { - zkc.close(); - } - } - - /** - * Deserialize dl metadata of given <i>uri</i> from a given bytes array. - * - * @param uri - * uri that stored dl metadata bindings - * @param data - * bytes of dl metadata - * @return dl metadata - * @throws IOException if failed to parse the bytes array - */ - public static DLMetadata deserialize(URI uri, byte[] data) throws IOException { - String metadata = new String(data, UTF_8); - LOG.debug("Parsing dl metadata {}.", metadata); - BufferedReader br = new BufferedReader(new StringReader(metadata)); - String versionLine = br.readLine(); - if (null == versionLine) { - throw new IOException("Empty DL Metadata."); - } - int version; - try { - version = Integer.parseInt(versionLine); - } catch (NumberFormatException nfe) { - version = -1; - } - if (METADATA_FORMAT_VERSION != version) { - throw new IOException("Metadata version not compatible. Expected " - + METADATA_FORMAT_VERSION + " but got " + version); - } - String type = br.readLine(); - if (!BK_DL_TYPE.equals(type)) { - throw new IOException("Invalid DL type : " + type); - } - DLConfig dlConfig = new BKDLConfig(uri); - StringBuilder sb = new StringBuilder(); - String line; - while (null != (line = br.readLine())) { - sb.append(line); - } - dlConfig.deserialize(sb.toString().getBytes(UTF_8)); - return new DLMetadata(type, dlConfig, version); - } - - public static DLMetadata create(BKDLConfig bkdlConfig) { - return new DLMetadata(BK_DL_TYPE, bkdlConfig); - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java deleted file mode 100644 index b2a417e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/DryrunLogSegmentMetadataStoreUpdater.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; - -public class DryrunLogSegmentMetadataStoreUpdater extends LogSegmentMetadataStoreUpdater { - - public DryrunLogSegmentMetadataStoreUpdater(DistributedLogConfiguration conf, - LogSegmentMetadataStore metadataStore) { - super(conf, metadataStore); - } - - @Override - public Transaction<Object> transaction() { - return new Transaction<Object>() { - @Override - public void addOp(Op<Object> operation) { - // no-op - } - - @Override - public Future<Void> execute() { - return Future.Void(); - } - - @Override - public void abort(Throwable reason) { - // no-op - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java deleted file mode 100644 index c878d68..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadata.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import java.net.URI; - -/** - * Class to represent the layout and metadata of the zookeeper-based log metadata - */ -public class LogMetadata { - - protected static String getLogComponentPath(URI uri, String logName, String logIdentifier, String component) { - return String.format("%s/%s/%s%s", uri.getPath(), logName, logIdentifier, component); - } - - /** - * Get the top stream path for a given log. - * - * @param uri namespace to store the log - * @param logName name of the log - * @return top stream path - */ - public static String getLogStreamPath(URI uri, String logName) { - return String.format("%s/%s", uri.getPath(), logName); - } - - /** - * Get the log root path for a given log. - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return log root path - */ - public static String getLogRootPath(URI uri, String logName, String logIdentifier) { - return getLogComponentPath(uri, logName, logIdentifier, ""); - } - - /** - * Get the logsegments root path for a given log. - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return logsegments root path - */ - public static String getLogSegmentsPath(URI uri, String logName, String logIdentifier) { - return getLogComponentPath(uri, logName, logIdentifier, LOGSEGMENTS_PATH); - } - - public static final int LAYOUT_VERSION = -1; - public final static String LOGSEGMENTS_PATH = "/ledgers"; - public final static String VERSION_PATH = "/version"; - // writer znodes - public final static String MAX_TXID_PATH = "/maxtxid"; - public final static String LOCK_PATH = "/lock"; - public final static String ALLOCATION_PATH = "/allocation"; - // reader znodes - public final static String READ_LOCK_PATH = "/readLock"; - - protected final URI uri; - protected final String logName; - protected final String logIdentifier; - - // Root path of the log - protected final String logRootPath; - // Components - protected final String logSegmentsPath; - protected final String lockPath; - protected final String maxTxIdPath; - protected final String allocationPath; - - /** - * metadata representation of a log - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - */ - protected LogMetadata(URI uri, - String logName, - String logIdentifier) { - this.uri = uri; - this.logName = logName; - this.logIdentifier = logIdentifier; - this.logRootPath = getLogRootPath(uri, logName, logIdentifier); - this.logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; - this.lockPath = logRootPath + LOCK_PATH; - this.maxTxIdPath = logRootPath + MAX_TXID_PATH; - this.allocationPath = logRootPath + ALLOCATION_PATH; - } - - public URI getUri() { - return uri; - } - - public String getLogName() { - return logName; - } - - /** - * Get the root path of the log. - * - * @return root path of the log. - */ - public String getLogRootPath() { - return logRootPath; - } - - /** - * Get the root path for log segments. - * - * @return root path for log segments - */ - public String getLogSegmentsPath() { - return this.logSegmentsPath; - } - - /** - * Get the path for a log segment of the log. - * - * @param segmentName - * segment name - * @return path for the log segment - */ - public String getLogSegmentPath(String segmentName) { - return this.logSegmentsPath + "/" + segmentName; - } - - public String getLockPath() { - return lockPath; - } - - public String getMaxTxIdPath() { - return maxTxIdPath; - } - - public String getAllocationPath() { - return allocationPath; - } - - /** - * Get the fully qualified name of the log. - * - * @return fully qualified name - */ - public String getFullyQualifiedName() { - return String.format("%s:%s", logName, logIdentifier); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java deleted file mode 100644 index ff6bfca..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForReader.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.google.common.base.Optional; - -import java.net.URI; - -/** - * Log Metadata for Reader - */ -public class LogMetadataForReader extends LogMetadata { - - /** - * Get the root path to store subscription infos of a log. - * - * @param uri - * namespace of the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return subscribers root path - */ - public static String getSubscribersPath(URI uri, String logName, String logIdentifier) { - return getLogComponentPath(uri, logName, logIdentifier, SUBSCRIBERS_PATH); - } - - /** - * Get the path that stores subscription info for a <code>subscriberId</code> for a <code>log</code>. - * - * @param uri - * namespace of the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @param subscriberId - * subscriber id of the log - * @return subscriber's path - */ - public static String getSubscriberPath(URI uri, String logName, String logIdentifier, String subscriberId) { - return String.format("%s/%s", getSubscribersPath(uri, logName, logIdentifier), subscriberId); - } - - /** - * Create a metadata representation of a log for reader. - * - * @param uri - * namespace to store the log - * @param logName - * name of the log - * @param logIdentifier - * identifier of the log - * @return metadata representation of a log for reader - */ - public static LogMetadataForReader of(URI uri, String logName, String logIdentifier) { - return new LogMetadataForReader(uri, logName, logIdentifier); - } - - final static String SUBSCRIBERS_PATH = "/subscribers"; - - /** - * metadata representation of a log - * - * @param uri namespace to store the log - * @param logName name of the log - * @param logIdentifier identifier of the log - */ - private LogMetadataForReader(URI uri, String logName, String logIdentifier) { - super(uri, logName, logIdentifier); - } - - /** - * Get the readlock path for the log or a subscriber of the log. - * - * @param subscriberId - * subscriber id. it is optional. - * @return read lock path - */ - public String getReadLockPath(Optional<String> subscriberId) { - if (subscriberId.isPresent()) { - return logRootPath + SUBSCRIBERS_PATH + "/" + subscriberId.get() + READ_LOCK_PATH; - } else { - return logRootPath + READ_LOCK_PATH; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java deleted file mode 100644 index 2284cbb..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataForWriter.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import org.apache.bookkeeper.versioning.Versioned; - -import java.net.URI; - -/** - * Log Metadata for writer - */ -public class LogMetadataForWriter extends LogMetadata { - - private final Versioned<byte[]> maxLSSNData; - private final Versioned<byte[]> maxTxIdData; - private final Versioned<byte[]> allocationData; - - /** - * metadata representation of a log - * - * @param uri namespace to store the log - * @param logName name of the log - * @param logIdentifier identifier of the log - */ - public LogMetadataForWriter(URI uri, - String logName, - String logIdentifier, - Versioned<byte[]> maxLSSNData, - Versioned<byte[]> maxTxIdData, - Versioned<byte[]> allocationData) { - super(uri, logName, logIdentifier); - this.maxLSSNData = maxLSSNData; - this.maxTxIdData = maxTxIdData; - this.allocationData = allocationData; - } - - public Versioned<byte[]> getMaxLSSNData() { - return maxLSSNData; - } - - public Versioned<byte[]> getMaxTxIdData() { - return maxTxIdData; - } - - public Versioned<byte[]> getAllocationData() { - return allocationData; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java deleted file mode 100644 index 01dccb7..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/metadata/LogMetadataStore.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.metadata; - -import com.google.common.annotations.Beta; -import com.google.common.base.Optional; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.util.Future; - -import java.net.URI; -import java.util.Iterator; - -/** - * Interface for log metadata store. - */ -@Beta -public interface LogMetadataStore { - - /** - * Create a stream and return it is namespace location. - * - * @param logName - * name of the log - * @return namespace location that stores this stream. - */ - Future<URI> createLog(String logName); - - /** - * Get the location of the log. - * - * @param logName - * name of the log - * @return namespace location that stores this stream. - */ - Future<Optional<URI>> getLogLocation(String logName); - - /** - * Retrieves logs from the namespace. - * - * @return iterator of logs of the namespace. - */ - Future<Iterator<String>> getLogs(); - - /** - * Register a namespace listener on streams changes. - * - * @param listener - * namespace listener - */ - void registerNamespaceListener(NamespaceListener listener); -}