http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java new file mode 100644 index 0000000..ce8b4f7 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.twitter.util.Future; +import org.apache.distributedlog.io.AsyncCloseable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * <i>LogReader</i> is a `synchronous` reader reading records from a DL log. + * + * <h3>Lifecycle of a Reader</h3> + * + * <p>A reader is a <i>sequential</i> reader that read records from a DL log starting + * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)} + * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}. + * + * <p>After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)} + * to read records out the log from provided position. + * + * <p>Closing the reader (via {@link #close()} will release all the resources occupied + * by this reader instance. + * + * <p>Exceptions could be thrown during reading records. Once the exception is thrown, + * the reader is set to an error state and it isn't usable anymore. It is the application's + * responsibility to handle the exceptions and re-create readers if necessary. + * + * <p>Example: + * <pre> + * DistributedLogManager dlm = ...; + * long nextTxId = ...; + * LogReader reader = dlm.getInputStream(nextTxId); + * + * while (true) { // keep reading & processing records + * LogRecord record; + * try { + * record = reader.readNext(false); + * nextTxId = record.getTransactionId(); + * // process the record + * ... + * } catch (IOException ioe) { + * // handle the exception + * ... + * reader = dlm.getInputStream(nextTxId + 1); + * } + * } + * + * </pre> + * + * <h3>Read Records</h3> + * + * <p>Reading records from an <i>endless</i> log in `synchronous` way isn't as + * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it + * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on + * controlling the <i>waiting</i> behavior on `synchronous` reads. + * + * <h4>Blocking vs NonBlocking</h4> + * + * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records + * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true) + * means the reads will only check readahead cache and return whatever records + * available in the readahead cache. + * + * <p>The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is + * catching up with writer (there are records in the log), the read call will + * wait until records are read and returned. If the reader is caught up with + * writer (there are no more records in the log at read time), the read call + * will wait for a small period of time (defined in + * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever + * records available in the readahead cache. In other words, if a reader sees + * no record on blocking reads, it means the reader is `caught-up` with the + * writer. + * + * <p><i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated + * state machines. Applications could use <i>blocking</i> reads till caught up + * with latest data. Once they are caught up with latest data, they could start + * serving their service and turn to <i>non-blocking</i> read mode and tail read + * data from the logs. + * + * <p>See examples below. + * + * <h4>Read Single Record</h4> + * + * {@link #readNext(boolean)} is reading individual records from a DL log. + * + * <pre> + * LogReader reader = ... + * + * // keep reading records in blocking way until no records available in the log + * LogRecord record = reader.readNext(false); + * while (null != record) { + * // process the record + * ... + * // read next record + * records = reader.readNext(false); + * } + * + * ... + * + * // reader is caught up with writer, doing non-blocking reads to tail the log + * while (true) { + * record = reader.readNext(true) + * // process the new records + * ... + * } + * </pre> + * + * <h4>Read Batch of Records</h4> + * + * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records + * from a DL log. + * + * <pre> + * LogReader reader = ... + * int N = 10; + * + * // keep reading N records in blocking way until no records available in the log + * List<LogRecord> records = reader.readBulk(false, N); + * while (!records.isEmpty()) { + * // process the list of records + * ... + * if (records.size() < N) { // no more records available in the log + * break; + * } + * // read next N records + * records = reader.readBulk(false, N); + * } + * + * ... + * + * // reader is caught up with writer, doing non-blocking reads to tail the log + * while (true) { + * records = reader.readBulk(true, N) + * // process the new records + * ... + * } + * + * </pre> + * + * <p>NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing + * the {@link AsyncCloseable} interface so the reader could be closed asynchronously + * + * @see AsyncLogReader + */ +public interface LogReader extends Closeable { + + /** + * Read the next log record from the stream. + * + * <p>If <i>nonBlocking</i> is set to true, the call returns immediately by just polling + * records from read ahead cache. It would return <i>null</i> if there isn't any records + * available in the read ahead cache. + * + * <p>If <i>nonBlocking</i> is set to false, it would does blocking call. The call will + * block until return a record if there are records in the stream (aka catching up). + * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()} + * milliseconds and return null if there isn't any more records in the stream. + * + * @param nonBlocking should the read make blocking calls to the backend or rely on the + * readAhead cache + * @return an operation from the stream or null if at end of stream + * @throws IOException if there is an error reading from the stream + */ + LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException; + + /** + * Read the next <i>numLogRecords</i> log records from the stream. + * + * @param nonBlocking should the read make blocking calls to the backend or rely on the + * readAhead cache + * @param numLogRecords maximum number of log records returned by this call. + * @return an operation from the stream or empty list if at end of stream + * @throws IOException if there is an error reading from the stream + * @see #readNext(boolean) + */ + List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException; + + /** + * Closes this source and releases any system resources associated + * with it. If the source is already closed then invoking this + * method has no effect. + * + * @return future representing the close result. + */ + Future<Void> asyncClose(); +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java new file mode 100644 index 0000000..2a7031e --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.apache.distributedlog.util.FutureUtils.newTFuture; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.util.Future; +import java.io.IOException; +import java.util.List; + +/** + * The wrapper over {@link org.apache.distributedlog.api.LogReader}. + */ +class LogReaderImpl implements LogReader { + + private final org.apache.distributedlog.api.LogReader reader; + + LogReaderImpl(org.apache.distributedlog.api.LogReader reader) { + this.reader = reader; + } + + @VisibleForTesting + org.apache.distributedlog.api.LogReader getImpl() { + return reader; + } + + @Override + public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException { + return reader.readNext(nonBlocking); + } + + @Override + public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException { + return reader.readBulk(nonBlocking, numLogRecords); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public Future<Void> asyncClose() { + return newTFuture(reader.asyncClose()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java new file mode 100644 index 0000000..d965158 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.io.Abortable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A generic interface class to support writing log records into + * a persistent distributed log. + */ +public interface LogWriter extends Closeable, Abortable { + /** + * Write a log record to the stream. + * + * @param record single log record + * @throws IOException + */ + void write(LogRecord record) throws IOException; + + /** + * Write a list of log records to the stream. + * + * @param records list of log records + * @throws IOException + */ + @Deprecated + int writeBulk(List<LogRecord> records) throws IOException; + + /** + * All data that has been written to the stream so far will be sent to + * persistent storage. + * The transmission is asynchronous and new data can be still written to the + * stream while flushing is performed. + * + * <p>TODO: rename this to flush() + */ + long setReadyToFlush() throws IOException; + + /** + * Flush and sync all data that is ready to be flush + * {@link #setReadyToFlush()} into underlying persistent store. + * @throws IOException + * + * TODO: rename this to commit() + */ + long flushAndSync() throws IOException; + + /** + * Flushes all the data up to this point, + * adds the end of stream marker and marks the stream + * as read-only in the metadata. No appends to the + * stream will be allowed after this point + * + * @throws IOException + */ + void markEndOfStream() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java new file mode 100644 index 0000000..532b3e5 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.List; + +/** + * The wrapper of {@link org.apache.distributedlog.api.LogWriter}. + */ +class LogWriterImpl implements LogWriter { + + private final org.apache.distributedlog.api.LogWriter impl; + + LogWriterImpl(org.apache.distributedlog.api.LogWriter impl) { + this.impl = impl; + } + + @VisibleForTesting + org.apache.distributedlog.api.LogWriter getImpl() { + return impl; + } + + @Override + public void write(LogRecord record) throws IOException { + impl.write(record); + } + + @Override + public int writeBulk(List<LogRecord> records) throws IOException { + return impl.writeBulk(records); + } + + @Override + public long setReadyToFlush() throws IOException { + return impl.setReadyToFlush(); + } + + @Override + public long flushAndSync() throws IOException { + return impl.flushAndSync(); + } + + @Override + public void markEndOfStream() throws IOException { + impl.markEndOfStream(); + } + + @Override + public void close() throws IOException { + impl.close(); + } + + @Override + public void abort() throws IOException { + impl.abort(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java new file mode 100644 index 0000000..5439976 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.apache.distributedlog.util.FutureUtils.newTFuture; + +import com.twitter.util.Future; +import java.io.IOException; +import java.util.Map; +import org.apache.distributedlog.subscription.SubscriptionsStore; + +/** + * A wrapper over {@link org.apache.distributedlog.api.subscription.SubscriptionsStore}. + */ +class SubscriptionsStoreImpl implements SubscriptionsStore { + + private final org.apache.distributedlog.api.subscription.SubscriptionsStore impl; + + SubscriptionsStoreImpl(org.apache.distributedlog.api.subscription.SubscriptionsStore impl) { + this.impl = impl; + } + + org.apache.distributedlog.api.subscription.SubscriptionsStore getImpl() { + return impl; + } + + @Override + public Future<DLSN> getLastCommitPosition(String subscriberId) { + return newTFuture(impl.getLastCommitPosition(subscriberId)); + } + + @Override + public Future<Map<String, DLSN>> getLastCommitPositions() { + return newTFuture(impl.getLastCommitPositions()); + } + + @Override + public Future<Void> advanceCommitPosition(String subscriberId, DLSN newPosition) { + return newTFuture(impl.advanceCommitPosition(subscriberId, newPosition)); + } + + @Override + public Future<Boolean> deleteSubscriber(String subscriberId) { + return newTFuture(impl.deleteSubscriber(subscriberId)); + } + + @Override + public void close() throws IOException { + impl.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java new file mode 100644 index 0000000..3bbfd95 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.namespace; + +import com.google.common.annotations.Beta; +import com.google.common.base.Optional; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.callback.NamespaceListener; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.InvalidStreamNameException; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * A namespace is the basic unit for managing a set of distributedlogs. + * + * <h4>Namespace Interface</h4> + * + * <P> + * The <code>DistributedLogNamespace</code> interface is implemented by different backend providers. + * There are several components are required for an implementation: + * <OL> + * <LI>Log Management -- manage logs in a given namespace. e.g. create/open/delete log, list of logs, + * watch the changes of logs. + * <LI>Access Control -- manage the access controls for logs in the namespace. + * </OL> + * </P> + * + * <h4>Namespace Location</h4> + * + * <p>At the highest level, a <code>DistributedLogNamespace</code> is located by a <code>URI</code>. The location + * URI is in string form has the syntax + * + * <blockquote> + * distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i> + * </blockquote> + * + * <p>where square brackets [...] delineate optional components and the characters <tt><b>-</b></tt> and + * <tt><b>:</b></tt> stand for themselves. + * + * <p>The <code>provider</code> part in the URI indicates what is the backend used for this namespace. For example: + * <i>distributedlog-bk</i> URI is storing logs in bookkeeper, while <i>distributedlog-mem</i> URI is storing logs in + * memory. The <code>provider</code> part is optional. It would use bookkeeper backend if the <i>provider</i> part + * is omitted. + * + * @see DistributedLogManager + * @since 0.3.32 + */ +@Beta +public interface DistributedLogNamespace { + + /** + * Get the namespace driver used by this namespace. + * + * @return namespace driver + */ + NamespaceDriver getNamespaceDriver(); + + // + // Method to operate logs + // + + /** + * Create a log named <i>logName</i>. + * + * @param logName + * name of the log + * @throws InvalidStreamNameException if log name is invalid. + * @throws IOException when encountered issues with backend. + */ + void createLog(String logName) + throws InvalidStreamNameException, IOException; + + /** + * Delete a log named <i>logName</i>. + * + * @param logName + * name of the log + * @throws InvalidStreamNameException if log name is invalid + * @throws LogNotFoundException if log doesn't exist + * @throws IOException when encountered issues with backend + */ + void deleteLog(String logName) + throws InvalidStreamNameException, LogNotFoundException, IOException; + + /** + * Open a log named <i>logName</i>. + * A distributedlog manager is returned to access log <i>logName</i>. + * + * @param logName + * name of the log + * @return distributedlog manager instance. + * @throws InvalidStreamNameException if log name is invalid. + * @throws IOException when encountered issues with backend. + */ + DistributedLogManager openLog(String logName) + throws InvalidStreamNameException, IOException; + + /** + * Open a log named <i>logName</i> with specific log configurations. + * + * <p>This method allows the caller to override global configuration settings by + * supplying log configuration overrides. Log config overrides come in two flavors, + * static and dynamic. Static config never changes in the lifecyle of <code>DistributedLogManager</code>, + * dynamic config changes by reloading periodically and safe to access from any context.</p> + * + * @param logName + * name of the log + * @param logConf + * static log configuration + * @param dynamicLogConf + * dynamic log configuration + * @return distributedlog manager instance. + * @throws InvalidStreamNameException if log name is invalid. + * @throws IOException when encountered issues with backend. + */ + DistributedLogManager openLog(String logName, + Optional<DistributedLogConfiguration> logConf, + Optional<DynamicDistributedLogConfiguration> dynamicLogConf, + Optional<StatsLogger> perStreamStatsLogger) + throws InvalidStreamNameException, IOException; + + /** + * Check whether the log <i>logName</i> exist. + * + * @param logName + * name of the log + * @return <code>true</code> if the log exists, otherwise <code>false</code>. + * @throws IOException when encountered exceptions on checking + */ + boolean logExists(String logName) + throws IOException; + + /** + * Retrieve the logs under the namespace. + * + * @return iterator of the logs under the namespace. + * @throws IOException when encountered issues with backend. + */ + Iterator<String> getLogs() + throws IOException; + + // + // Methods for namespace + // + + /** + * Register namespace listener on stream updates under the namespace. + * + * @param listener + * listener to receive stream updates under the namespace + */ + void registerNamespaceListener(NamespaceListener listener); + + /** + * Create an access control manager to manage/check acl for logs. + * + * @return access control manager for logs under the namespace. + * @throws IOException + */ + AccessControlManager createAccessControlManager() + throws IOException; + + /** + * Close the namespace. + */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java new file mode 100644 index 0000000..e646b19 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.namespace; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +import java.io.IOException; +import java.net.URI; + +/** + * Builder to construct a <code>DistributedLogNamespace</code>. + * The builder takes the responsibility of loading backend according to the uri. + * + * @see DistributedLogNamespace + * @since 0.3.32 + */ +public class DistributedLogNamespaceBuilder { + + public static DistributedLogNamespaceBuilder newBuilder() { + return new DistributedLogNamespaceBuilder(); + } + + private final NamespaceBuilder builder; + + // private constructor + private DistributedLogNamespaceBuilder() { + this(NamespaceBuilder.newBuilder()); + } + + @VisibleForTesting + DistributedLogNamespaceBuilder(NamespaceBuilder builder) { + this.builder = builder; + } + + /** + * DistributedLog Configuration used for the namespace. + * + * @param conf + * distributedlog configuration + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder conf(DistributedLogConfiguration conf) { + this.builder.conf(conf); + return this; + } + + /** + * Dynamic DistributedLog Configuration used for the namespace. + * + * @param dynConf dynamic distributedlog configuration + * @return namespace builder + */ + public DistributedLogNamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) { + this.builder.dynConf(dynConf); + return this; + } + + /** + * Namespace Location. + * + * @param uri + * namespace location uri. + * @see DistributedLogNamespace + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder uri(URI uri) { + this.builder.uri(uri); + return this; + } + + /** + * Stats Logger used for stats collection. + * + * @param statsLogger + * stats logger + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder statsLogger(StatsLogger statsLogger) { + this.builder.statsLogger(statsLogger); + return this; + } + + /** + * Stats Logger used for collecting per log stats. + * + * @param statsLogger + * stats logger for collecting per log stats + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) { + this.builder.perLogStatsLogger(statsLogger); + return this; + } + + /** + * Feature provider used to control the availabilities of features in the namespace. + * + * @param featureProvider + * feature provider to control availabilities of features. + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder featureProvider(FeatureProvider featureProvider) { + this.builder.featureProvider(featureProvider); + return this; + } + + /** + * Client Id used for accessing the namespace. + * + * @param clientId + * client id used for accessing the namespace + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder clientId(String clientId) { + this.builder.clientId(clientId); + return this; + } + + /** + * Region Id used for encoding logs in the namespace. The region id + * is useful when the namespace is globally spanning over regions. + * + * @param regionId + * region id. + * @return namespace builder. + */ + public DistributedLogNamespaceBuilder regionId(int regionId) { + this.builder.regionId(regionId); + return this; + } + + @SuppressWarnings("deprecation") + private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + DistributedLogConfiguration conf) { + StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger; + if (perLogStatsLogger == NullStatsLogger.INSTANCE + && conf.getEnablePerStreamStat()) { + normalizedPerLogStatsLogger = statsLogger.scope("stream"); + } + return normalizedPerLogStatsLogger; + } + + /** + * Build the namespace. + * + * @return the namespace instance. + * @throws IllegalArgumentException when there is illegal argument provided in the builder + * @throws NullPointerException when there is null argument provided in the builder + * @throws IOException when fail to build the backend + */ + public DistributedLogNamespace build() + throws IllegalArgumentException, NullPointerException, IOException { + return new DistributedLogNamespaceImpl(this.builder.build()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java new file mode 100644 index 0000000..a528d62 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.namespace; + +import com.google.common.base.Optional; +import java.io.IOException; +import java.util.Iterator; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.DistributedLogManagerImpl; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.callback.NamespaceListener; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.InvalidStreamNameException; +import org.apache.distributedlog.exceptions.LogNotFoundException; + +/** + * A wapper over {@link org.apache.distributedlog.api.namespace.Namespace}. + */ +class DistributedLogNamespaceImpl implements DistributedLogNamespace { + + private final Namespace impl; + + DistributedLogNamespaceImpl(Namespace impl) { + this.impl = impl; + } + + + @Override + public NamespaceDriver getNamespaceDriver() { + return impl.getNamespaceDriver(); + } + + @Override + public void createLog(String logName) throws InvalidStreamNameException, IOException { + impl.createLog(logName); + } + + @Override + public void deleteLog(String logName) throws InvalidStreamNameException, LogNotFoundException, IOException { + impl.deleteLog(logName); + } + + @Override + public DistributedLogManager openLog(String logName) throws InvalidStreamNameException, IOException { + return new DistributedLogManagerImpl(impl.openLog(logName)); + } + + @Override + public DistributedLogManager openLog(String logName, + Optional<DistributedLogConfiguration> logConf, + Optional<DynamicDistributedLogConfiguration> dynamicLogConf, + Optional<StatsLogger> perStreamStatsLogger) + throws InvalidStreamNameException, IOException { + return new DistributedLogManagerImpl(impl.openLog( + logName, logConf, dynamicLogConf, perStreamStatsLogger)); + } + + @Override + public boolean logExists(String logName) throws IOException { + return impl.logExists(logName); + } + + @Override + public Iterator<String> getLogs() throws IOException { + return impl.getLogs(); + } + + @Override + public void registerNamespaceListener(NamespaceListener listener) { + impl.registerNamespaceListener(listener); + } + + @Override + public AccessControlManager createAccessControlManager() throws IOException { + return impl.createAccessControlManager(); + } + + @Override + public void close() { + impl.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java new file mode 100644 index 0000000..2febeda --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link com.twitter.util.Future} based namespace API. + */ +package org.apache.distributedlog.namespace; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java new file mode 100644 index 0000000..ef26d51 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link com.twitter.util.Future} based API. + */ +package org.apache.distributedlog; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java new file mode 100644 index 0000000..a9c7b21 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.stats; + +import com.google.common.base.Stopwatch; +import com.twitter.util.FutureEventListener; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.OpStatsLogger; + +/** + * A {@link FutureEventListener} monitors the stats for a given operation. + */ +public class OpStatsListener<T> implements FutureEventListener<T> { + OpStatsLogger opStatsLogger; + Stopwatch stopwatch; + + public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) { + this.opStatsLogger = opStatsLogger; + if (null == stopwatch) { + this.stopwatch = Stopwatch.createStarted(); + } else { + this.stopwatch = stopwatch; + } + } + + public OpStatsListener(OpStatsLogger opStatsLogger) { + this(opStatsLogger, null); + } + + @Override + public void onSuccess(T value) { + opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + + @Override + public void onFailure(Throwable cause) { + opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java new file mode 100644 index 0000000..e352591 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link com.twitter.util.Future} based stats utils. + */ +package org.apache.distributedlog.stats; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java new file mode 100644 index 0000000..cb4ec55 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.subscription; + +import com.twitter.util.Future; +import java.io.Closeable; +import java.util.Map; +import org.apache.distributedlog.DLSN; + +/** + * Store to manage subscriptions. + */ +public interface SubscriptionsStore extends Closeable { + + /** + * Get the last committed position stored for <i>subscriberId</i>. + * + * @param subscriberId + * subscriber id + * @return future representing last committed position. + */ + Future<DLSN> getLastCommitPosition(String subscriberId); + + /** + * Get the last committed positions for all subscribers. + * + * @return future representing last committed positions for all subscribers. + */ + Future<Map<String, DLSN>> getLastCommitPositions(); + + /** + * Advance the last committed position for <i>subscriberId</i>. + * + * @param subscriberId + * subscriber id. + * @param newPosition + * new committed position. + * @return future representing advancing result. + */ + Future<Void> advanceCommitPosition(String subscriberId, DLSN newPosition); + + /** + * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the + * data stored under this subscriber will be lost. + * @param subscriberId subscriber id + * @return future represent success or failure. + * return true only if there's such subscriber and we removed it successfully. + * return false if there's no such subscriber, or we failed to remove. + */ + Future<Boolean> deleteSubscriber(String subscriberId); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java new file mode 100644 index 0000000..032a2ba --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link com.twitter.util.Future} based subscription API. + */ +package org.apache.distributedlog.subscription; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java new file mode 100644 index 0000000..d121529 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java @@ -0,0 +1,596 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.util; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureCancelledException; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Throw; +import com.twitter.util.Try; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.distributedlog.stats.OpStatsListener; +import org.apache.zookeeper.KeeperException; +import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.exceptions.BKTransmitException; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.exceptions.LockingException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.exceptions.ZKException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +/** + * Utilities to process future. + */ +public class FutureUtils { + + private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class); + + /** + * A future listener that is supposed to run in ordered scheduler. + */ + public static class OrderedFutureEventListener<R> + implements FutureEventListener<R> { + + public static <R> OrderedFutureEventListener<R> of( + FutureEventListener<R> listener, + OrderedScheduler scheduler, + Object key) { + return new OrderedFutureEventListener<R>(scheduler, key, listener); + } + + private final OrderedScheduler scheduler; + private final Object key; + private final FutureEventListener<R> listener; + + private OrderedFutureEventListener(OrderedScheduler scheduler, + Object key, + FutureEventListener<R> listener) { + this.scheduler = scheduler; + this.key = key; + this.listener = listener; + } + + @Override + public void onSuccess(final R value) { + scheduler.submit(key, new Runnable() { + @Override + public void run() { + listener.onSuccess(value); + } + }); + } + + @Override + public void onFailure(final Throwable cause) { + scheduler.submit(key, new Runnable() { + @Override + public void run() { + listener.onFailure(cause); + } + }); + } + } + + /** + * A future listener is running a specific executor. + */ + public static class FutureEventListenerRunnable<R> + implements FutureEventListener<R> { + + public static <R> FutureEventListenerRunnable<R> of( + FutureEventListener<R> listener, + ExecutorService executorService) { + return new FutureEventListenerRunnable<R>(executorService, listener); + } + + private final ExecutorService executorService; + private final FutureEventListener<R> listener; + + private FutureEventListenerRunnable(ExecutorService executorService, + FutureEventListener<R> listener) { + this.executorService = executorService; + this.listener = listener; + } + + @Override + public void onSuccess(final R value) { + executorService.submit(new Runnable() { + @Override + public void run() { + listener.onSuccess(value); + } + }); + } + + @Override + public void onFailure(final Throwable cause) { + executorService.submit(new Runnable() { + @Override + public void run() { + listener.onFailure(cause); + } + }); + } + } + + private static class ListFutureProcessor<T, R> + extends Function<Throwable, BoxedUnit> + implements FutureEventListener<R>, Runnable { + + private volatile boolean interrupted = false; + private final Iterator<T> itemsIter; + private final Function<T, Future<R>> processFunc; + private final Promise<List<R>> promise; + private final List<R> results; + private final ExecutorService callbackExecutor; + + ListFutureProcessor(List<T> items, + Function<T, Future<R>> processFunc, + ExecutorService callbackExecutor) { + this.itemsIter = items.iterator(); + this.processFunc = processFunc; + this.promise = new Promise<List<R>>(); + this.promise.setInterruptHandler(this); + this.results = new ArrayList<R>(); + this.callbackExecutor = callbackExecutor; + } + + @Override + public BoxedUnit apply(Throwable cause) { + interrupted = true; + return BoxedUnit.UNIT; + } + + @Override + public void onSuccess(R value) { + results.add(value); + if (null == callbackExecutor) { + run(); + } else { + callbackExecutor.submit(this); + } + } + + @Override + public void onFailure(final Throwable cause) { + if (null == callbackExecutor) { + promise.setException(cause); + } else { + callbackExecutor.submit(new Runnable() { + @Override + public void run() { + promise.setException(cause); + } + }); + } + } + + @Override + public void run() { + if (interrupted) { + logger.debug("ListFutureProcessor is interrupted."); + return; + } + if (!itemsIter.hasNext()) { + promise.setValue(results); + return; + } + processFunc.apply(itemsIter.next()).addEventListener(this); + } + } + + /** + * Process the list of items one by one using the process function <i>processFunc</i>. + * The process will be stopped immediately if it fails on processing any one. + * + * @param collection list of items + * @param processFunc process function + * @param callbackExecutor executor to process the item + * @return future presents the list of processed results + */ + public static <T, R> Future<List<R>> processList(List<T> collection, + Function<T, Future<R>> processFunc, + @Nullable ExecutorService callbackExecutor) { + ListFutureProcessor<T, R> processor = + new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor); + if (null != callbackExecutor) { + callbackExecutor.submit(processor); + } else { + processor.run(); + } + return processor.promise; + } + + /** + * Add a event listener over <i>result</i> for collecting the operation stats. + * + * @param result result to listen on + * @param opStatsLogger stats logger to record operations stats + * @param stopwatch stop watch to time operation + * @param <T> + * @return result after registered the event listener + */ + public static <T> Future<T> stats(Future<T> result, + OpStatsLogger opStatsLogger, + Stopwatch stopwatch) { + return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch)); + } + + /** + * Await for the result of the future and thrown bk related exceptions. + * + * @param result future to wait for + * @return the result of future + * @throws BKException when exceptions are thrown by the future. If there is unkown exceptions + * thrown from the future, the exceptions will be wrapped into + * {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}. + */ + public static <T> T bkResult(Future<T> result) throws BKException { + try { + return Await.result(result); + } catch (BKException bke) { + throw bke; + } catch (InterruptedException ie) { + throw BKException.create(BKException.Code.InterruptedException); + } catch (Exception e) { + logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e); + throw BKException.create(BKException.Code.UnexpectedConditionException); + } + } + + /** + * Return the bk exception return code for a <i>throwable</i>. + * + * @param throwable the cause of the exception + * @return the bk exception return code. if the exception isn't bk exceptions, + * it would return {@link BKException.Code#UnexpectedConditionException}. + */ + public static int bkResultCode(Throwable throwable) { + if (throwable instanceof BKException) { + return ((BKException) throwable).getCode(); + } + return BKException.Code.UnexpectedConditionException; + } + + /** + * Wait for the result until it completes. + * + * @param result result to wait + * @return the result + * @throws IOException when encountered exceptions on the result + */ + public static <T> T result(Future<T> result) throws IOException { + return result(result, Duration.Top()); + } + + /** + * Wait for the result for a given <i>duration</i>. + * + * <p>If the result is not ready within `duration`, an IOException will thrown wrapping with + * corresponding {@link com.twitter.util.TimeoutException}. + * + * @param result result to wait + * @param duration duration to wait + * @return the result + * @throws IOException when encountered exceptions on the result or waiting for the result. + */ + public static <T> T result(Future<T> result, Duration duration) + throws IOException { + try { + return Await.result(result, duration); + } catch (KeeperException ke) { + throw new ZKException("Encountered zookeeper exception on waiting result", ke); + } catch (BKException bke) { + throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode()); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw new DLInterruptedException("Interrupted on waiting result", ie); + } catch (Exception e) { + throw new IOException("Encountered exception on waiting result", e); + } + } + + /** + * Wait for the result of a lock operation. + * + * @param result result to wait + * @param lockPath path of the lock + * @return the result + * @throws LockingException when encountered exceptions on the result of lock operation + */ + public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException { + try { + return Await.result(result); + } catch (LockingException le) { + throw le; + } catch (Exception e) { + throw new LockingException(lockPath, "Encountered exception on locking ", e); + } + } + + /** + * Convert the <i>throwable</i> to zookeeper related exceptions. + * + * @param throwable cause + * @param path zookeeper path + * @return zookeeper related exceptions + */ + public static Throwable zkException(Throwable throwable, String path) { + if (throwable instanceof KeeperException) { + return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable); + } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) { + return new ZKException("Encountered zookeeper connection loss on " + path, + KeeperException.Code.CONNECTIONLOSS); + } else if (throwable instanceof InterruptedException) { + return new DLInterruptedException("Interrupted on operating " + path, throwable); + } else { + return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable); + } + } + + /** + * Cancel the future. It would interrupt the future. + * + * @param future future to cancel + */ + public static <T> void cancel(Future<T> future) { + future.raise(new FutureCancelledException()); + } + + /** + * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period. + * If the promise has been satisfied before raising, it won't change the state of the promise. + * + * @param promise promise to raise exception + * @param timeout timeout period + * @param unit timeout period unit + * @param cause cause to raise + * @param scheduler scheduler to execute raising exception + * @param key the submit key used by the scheduler + * @return the promise applied with the raise logic + */ + public static <T> Promise<T> within(final Promise<T> promise, + final long timeout, + final TimeUnit unit, + final Throwable cause, + final OrderedScheduler scheduler, + final Object key) { + if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) { + return promise; + } + // schedule a timeout to raise timeout exception + final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() { + @Override + public void run() { + if (!promise.isDefined() && FutureUtils.setException(promise, cause)) { + logger.info("Raise exception", cause); + } + } + }, timeout, unit); + // when the promise is satisfied, cancel the timeout task + promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() { + @Override + public BoxedUnit apply(Try<T> value) { + if (!task.cancel(true)) { + logger.debug("Failed to cancel the timeout task"); + } + return BoxedUnit.UNIT; + } + }); + return promise; + } + + /** + * Satisfy the <i>promise</i> with provide value in an ordered scheduler. + * + * <p>If the promise was already satisfied, nothing will be changed. + * + * @param promise promise to satisfy + * @param value value to satisfy + * @param scheduler scheduler to satisfy the promise with provided value + * @param key the submit key of the ordered scheduler + */ + public static <T> void setValue(final Promise<T> promise, + final T value, + OrderedScheduler scheduler, + Object key) { + scheduler.submit(key, new Runnable() { + @Override + public void run() { + setValue(promise, value); + } + }); + } + + /** + * Satisfy the <i>promise</i> with provide value. + * + * <p>If the promise was already satisfied, nothing will be changed. + * + * @param promise promise to satisfy + * @param value value to satisfy + * @return true if successfully satisfy the future. false if the promise has been satisfied. + */ + public static <T> boolean setValue(Promise<T> promise, T value) { + boolean success = promise.updateIfEmpty(new Return<T>(value)); + if (!success) { + logger.info("Result set multiple times. Value = '{}', New = 'Return({})'", + promise.poll(), value); + } + return success; + } + + /** + * Satisfy the <i>promise</i> with provided <i>cause</i> in an ordered scheduler. + * + * @param promise promise to satisfy + * @param cause cause to satisfy + * @param scheduler the scheduler to satisfy the promise + * @param key submit key of the ordered scheduler + */ + public static <T> void setException(final Promise<T> promise, + final Throwable cause, + OrderedScheduler scheduler, + Object key) { + scheduler.submit(key, new Runnable() { + @Override + public void run() { + setException(promise, cause); + } + }); + } + + /** + * Satisfy the <i>promise</i> with provided <i>cause</i>. + * + * @param promise promise to satisfy + * @param cause cause to satisfy + * @return true if successfully satisfy the future. false if the promise has been satisfied. + */ + public static <T> boolean setException(Promise<T> promise, Throwable cause) { + boolean success = promise.updateIfEmpty(new Throw<T>(cause)); + if (!success) { + logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'", + promise.poll(), cause); + } + return success; + } + + /** + * Ignore exception from the <i>future</i>. + * + * @param future the original future + * @return a transformed future ignores exceptions + */ + public static <T> Promise<Void> ignore(Future<T> future) { + return ignore(future, null); + } + + /** + * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions. + * + * @param future the original future + * @param errorMsg the error message to log on exceptions + * @return a transformed future ignores exceptions + */ + public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) { + final Promise<Void> promise = new Promise<Void>(); + future.addEventListener(new FutureEventListener<T>() { + @Override + public void onSuccess(T value) { + setValue(promise, null); + } + + @Override + public void onFailure(Throwable cause) { + if (null != errorMsg) { + logger.error(errorMsg, cause); + } + setValue(promise, null); + } + }); + return promise; + } + + /** + * Create transmit exception from transmit result. + * + * @param transmitResult + * transmit result (basically bk exception code) + * @return transmit exception + */ + public static BKTransmitException transmitException(int transmitResult) { + return new BKTransmitException("Failed to write to bookkeeper; Error is (" + + transmitResult + ") " + + BKException.getMessage(transmitResult), transmitResult); + } + + public static <T> CompletableFuture<T> newJFuture(Promise<T> promise) { + CompletableFuture<T> jFuture = org.apache.distributedlog.common.concurrent.FutureUtils.createFuture(); + jFuture.whenComplete((value, cause) -> { + if (null != cause) { + if (cause instanceof CompletionException) { + promise.setException(cause.getCause()); + } else { + promise.setException(cause); + } + } else { + promise.setValue(value); + } + }); + return jFuture; + } + + public static <T> Future<T> newTFuture(CompletableFuture<T> jFuture) { + Promise<T> promise = new Promise<>(); + jFuture.whenComplete((value, cause) -> { + if (null != cause) { + if (cause instanceof CompletionException) { + promise.setException(cause.getCause()); + } else { + promise.setException(cause); + } + } else { + promise.setValue(value); + } + }); + return promise; + } + + public static <T> Future<List<Future<T>>> newTFutureList( + CompletableFuture<List<CompletableFuture<T>>> jFutureList) { + Promise<List<Future<T>>> promise = new Promise<>(); + jFutureList.whenComplete((value, cause) -> { + if (null != cause) { + if (cause instanceof CompletionException) { + promise.setException(cause.getCause()); + } else { + promise.setException(cause); + } + } else { + promise.setValue(Lists.transform( + value, + future -> newTFuture(future))); + } + }); + return promise; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java new file mode 100644 index 0000000..ee00974 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link com.twitter.util.Future} related utils. + */ +package org.apache.distributedlog.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml b/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml new file mode 100644 index 0000000..1afb903 --- /dev/null +++ b/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml @@ -0,0 +1,19 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//--> +<FindBugsFilter> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java new file mode 100644 index 0000000..dfff56a --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.util.FutureUtils; +import org.junit.Test; + +/** + * Unit test of {@link AsyncLogReaderImpl}. + */ +public class TestAsyncLogReaderImpl { + + private final org.apache.distributedlog.api.AsyncLogReader underlying = + mock(org.apache.distributedlog.api.AsyncLogReader.class); + private final AsyncLogReaderImpl reader = new AsyncLogReaderImpl(underlying); + + @Test + public void testRead() throws Exception { + LogRecordWithDLSN record = mock(LogRecordWithDLSN.class); + when(underlying.readNext()) + .thenReturn(CompletableFuture.completedFuture(record)); + assertEquals(record, FutureUtils.result(reader.readNext())); + verify(underlying, times(1)).readNext(); + } + + @Test + public void testReadBulk() throws Exception { + List<LogRecordWithDLSN> records = mock(List.class); + when(underlying.readBulk(anyInt())) + .thenReturn(CompletableFuture.completedFuture(records)); + assertEquals(records, FutureUtils.result(reader.readBulk(100))); + verify(underlying, times(1)).readBulk(eq(100)); + } + + @Test + public void testReadBulkWithWaitTime() throws Exception { + List<LogRecordWithDLSN> records = mock(List.class); + when(underlying.readBulk(anyInt(), anyLong(), any(TimeUnit.class))) + .thenReturn(CompletableFuture.completedFuture(records)); + assertEquals(records, FutureUtils.result(reader.readBulk(100, 10, TimeUnit.MICROSECONDS))); + verify(underlying, times(1)) + .readBulk(eq(100), eq(10L), eq(TimeUnit.MICROSECONDS)); + } + + @Test + public void testGetStreamName() throws Exception { + String streamName = "test-stream-name"; + when(underlying.getStreamName()) + .thenReturn(streamName); + assertEquals(streamName, reader.getStreamName()); + verify(underlying, times(1)).getStreamName(); + } + + @Test + public void testAsyncClose() throws Exception { + when(underlying.asyncClose()) + .thenReturn(CompletableFuture.completedFuture(null)); + FutureUtils.result(reader.asyncClose()); + verify(underlying, times(1)).asyncClose(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java new file mode 100644 index 0000000..621e18d --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; +import com.twitter.util.Futures; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.util.FutureUtils; +import org.junit.Test; + +/** + * Unit test of {@link AsyncLogWriterImpl}. + */ +public class TestAsyncLogWriterImpl { + + private final org.apache.distributedlog.api.AsyncLogWriter underlying = + mock(org.apache.distributedlog.api.AsyncLogWriter.class); + private final AsyncLogWriterImpl writer = new AsyncLogWriterImpl(underlying); + + @Test + public void testWrite() throws Exception { + DLSN dlsn = mock(DLSN.class); + LogRecord record = mock(LogRecord.class); + when(underlying.write(any(LogRecord.class))) + .thenReturn(CompletableFuture.completedFuture(dlsn)); + assertEquals(dlsn, FutureUtils.result(writer.write(record))); + verify(underlying, times(1)).write(eq(record)); + } + + @Test + public void testWriteBulk() throws Exception { + List<LogRecord> records = mock(List.class); + List<CompletableFuture<DLSN>> futures = Lists.newArrayList(); + List<DLSN> dlsns = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + DLSN dlsn = mock(DLSN.class); + dlsns.add(dlsn); + futures.add(CompletableFuture.completedFuture(dlsn)); + } + when(underlying.writeBulk(any(List.class))) + .thenReturn(CompletableFuture.completedFuture(futures)); + assertEquals( + dlsns, + FutureUtils.result(Futures.collect( + FutureUtils.result(writer.writeBulk(records))))); + verify(underlying, times(1)).writeBulk(eq(records)); + } + + @Test + public void testGetLastTxId() throws Exception { + long txId = 123456L; + when(underlying.getLastTxId()).thenReturn(txId); + assertEquals(txId, writer.getLastTxId()); + verify(underlying, times(1)).getLastTxId(); + } + + @Test + public void testTruncate() throws Exception { + DLSN dlsn = mock(DLSN.class); + when(underlying.truncate(dlsn)) + .thenReturn(CompletableFuture.completedFuture(true)); + assertTrue(FutureUtils.result(writer.truncate(dlsn))); + verify(underlying, times(1)).truncate(eq(dlsn)); + } + + @Test + public void testGetStreamName() throws Exception { + String streamName = "test-stream-name"; + when(underlying.getStreamName()) + .thenReturn(streamName); + assertEquals(streamName, writer.getStreamName()); + verify(underlying, times(1)).getStreamName(); + } + + @Test + public void testAsyncClose() throws Exception { + when(underlying.asyncClose()) + .thenReturn(CompletableFuture.completedFuture(null)); + FutureUtils.result(writer.asyncClose()); + verify(underlying, times(1)).asyncClose(); + } + + @Test + public void testAsyncAbort() throws Exception { + when(underlying.asyncAbort()) + .thenReturn(CompletableFuture.completedFuture(null)); + FutureUtils.result(writer.asyncAbort()); + verify(underlying, times(1)).asyncAbort(); + } + +}