http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java
new file mode 100644
index 0000000..ce8b4f7
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReader.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
+ *
+ * <h3>Lifecycle of a Reader</h3>
+ *
+ * <p>A reader is a <i>sequential</i> reader that reads records from a DL log 
starting
+ * from a given position. The position could be a <i>DLSN</i> (via {@link 
DistributedLogManager#getInputStream(DLSN)})
+ * or a <i>Transaction ID</i> (via {@link 
DistributedLogManager#getInputStream(long)}).
+ *
+ * <p>After the reader is open, it could call {@link #readNext(boolean)} or 
{@link #readBulk(boolean, int)}
+ * to read records out of the log from the provided position.
+ *
+ * <p>Closing the reader (via {@link #close()}) will release all the resources 
occupied
+ * by this reader instance.
+ *
+ * <p>Exceptions could be thrown during reading records. Once the exception is 
thrown,
+ * the reader is set to an error state and it isn't usable anymore. It is the 
application's
+ * responsibility to handle the exceptions and re-create readers if necessary.
+ *
+ * <p>Example:
+ * <pre>
+ * DistributedLogManager dlm = ...;
+ * long nextTxId = ...;
+ * LogReader reader = dlm.getInputStream(nextTxId);
+ *
+ * while (true) { // keep reading & processing records
+ *     LogRecord record;
+ *     try {
+ *         record = reader.readNext(false);
+ *         nextTxId = record.getTransactionId();
+ *         // process the record
+ *         ...
+ *     } catch (IOException ioe) {
+ *         // handle the exception
+ *         ...
+ *         reader = dlm.getInputStream(nextTxId + 1);
+ *     }
+ * }
+ *
+ * </pre>
+ *
+ * <h3>Read Records</h3>
+ *
+ * <p>Reading records from an <i>endless</i> log in a `synchronous` way isn't as
+ * trivial as in an `asynchronous` way (via {@link AsyncLogReader}), because it
+ * lacks a callback mechanism. LogReader introduces a flag `nonBlocking` for
+ * controlling the <i>waiting</i> behavior of `synchronous` reads.
+ *
+ * <h4>Blocking vs NonBlocking</h4>
+ *
+ * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
+ * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
+ * means the reads will only check readahead cache and return whatever records
+ * available in the readahead cache.
+ *
+ * <p>The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader 
is
+ * catching up with writer (there are records in the log), the read call will
+ * wait until records are read and returned. If the reader is caught up with
+ * writer (there are no more records in the log at read time), the read call
+ * will wait for a small period of time (defined in
+ * {@link DistributedLogConfiguration#getReadAheadWaitTime()}) and return 
whatever
+ * records are available in the readahead cache. In other words, if a reader sees
+ * no record on blocking reads, it means the reader is `caught-up` with the
+ * writer.
+ *
+ * <p><i>Blocking</i> and <i>NonBlocking</i> modes are useful for building 
replicated
+ * state machines. Applications could use <i>blocking</i> reads till caught up
+ * with latest data. Once they are caught up with latest data, they could start
+ * serving their service and turn to <i>non-blocking</i> read mode and tail 
read
+ * data from the logs.
+ *
+ * <p>See examples below.
+ *
+ * <h4>Read Single Record</h4>
+ *
+ * {@link #readNext(boolean)} is reading individual records from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ *
+ * // keep reading records in blocking way until no records available in the 
log
+ * LogRecord record = reader.readNext(false);
+ * while (null != record) {
+ *     // process the record
+ *     ...
+ *     // read next record
+ *     record = reader.readNext(false);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     record = reader.readNext(true);
+ *     // process the new records
+ *     ...
+ * }
+ * </pre>
+ *
+ * <h4>Read Batch of Records</h4>
+ *
+ * {@link #readBulk(boolean, int)} is a convenient way to read a batch of 
records
+ * from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ * int N = 10;
+ *
+ * // keep reading N records in blocking way until no records available in the 
log
+ * List&lt;LogRecord&gt; records = reader.readBulk(false, N);
+ * while (!records.isEmpty()) {
+ *     // process the list of records
+ *     ...
+ *     if (records.size() &lt; N) { // no more records available in the log
+ *         break;
+ *     }
+ *     // read next N records
+ *     records = reader.readBulk(false, N);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ *     records = reader.readBulk(true, N);
+ *     // process the new records
+ *     ...
+ * }
+ *
+ * </pre>
+ *
+ * <p>NOTE: This interface declares {@link #asyncClose()} itself rather than 
extending {@link AsyncCloseable}.
+ * BKSyncLogReader is implemented based on BKAsyncLogReader, so the reader 
could be closed asynchronously.
+ *
+ * @see AsyncLogReader
+ */
+public interface LogReader extends Closeable {
+
+    /**
+     * Read the next log record from the stream.
+     *
+     * <p>If <i>nonBlocking</i> is set to true, the call returns immediately 
by just polling
+     * records from read ahead cache. It would return <i>null</i> if there 
aren't any records
+     * available in the read ahead cache.
+     *
+     * <p>If <i>nonBlocking</i> is set to false, it makes a blocking call. 
The call will
+     * block until a record is returned if there are records in the stream (aka 
catching up).
+     * Otherwise it would wait up to {@link 
DistributedLogConfiguration#getReadAheadWaitTime()}
+     * milliseconds and return null if there aren't any more records in the 
stream.
+     *
+     * @param nonBlocking if true, only poll the readAhead cache; if false, 
make a
+     * blocking read as described above
+     * @return the next log record from the stream, or null if no record is available
+     * @throws IOException if there is an error reading from the stream
+     */
+    LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
+
+    /**
+     * Read up to <i>numLogRecords</i> log records from the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend 
(false) or rely on the
+     * readAhead cache (true); same meaning as in {@link #readNext(boolean)}
+     * @param numLogRecords maximum number of log records returned by this 
call.
+     * @return a list of log records from the stream, or an empty list if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     * @see #readNext(boolean)
+     */
+    List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) 
throws IOException;
+
+    /**
+     * Asynchronous variant of {@link #close()}: closes this reader and releases
+     * any system resources associated with it. If the reader is already closed
+     * then invoking this method has no effect.
+     *
+     * @return future representing the close result.
+     */
+    Future<Void> asyncClose();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java
new file mode 100644
index 0000000..2a7031e
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogReaderImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The wrapper over {@link org.apache.distributedlog.api.LogReader}.
+ *
+ * <p>Implements the {@link LogReader} interface of this package by delegating
+ * every call to the wrapped reader. {@link #readNext(boolean)},
+ * {@link #readBulk(boolean, int)} and {@link #close()} pass straight through;
+ * {@link #asyncClose()} hands the wrapped reader's result to
+ * {@code FutureUtils.newTFuture} to obtain the {@link Future} it returns.
+ */
+class LogReaderImpl implements LogReader {
+
+    private final org.apache.distributedlog.api.LogReader reader;
+
+    LogReaderImpl(org.apache.distributedlog.api.LogReader reader) {
+        this.reader = reader;
+    }
+
+    @VisibleForTesting
+    org.apache.distributedlog.api.LogReader getImpl() {
+        return reader;
+    }
+
+    @Override
+    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException {
+        return reader.readNext(nonBlocking);
+    }
+
+    @Override
+    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int 
numLogRecords) throws IOException {
+        return reader.readBulk(nonBlocking, numLogRecords);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return newTFuture(reader.asyncClose());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java
new file mode 100644
index 0000000..d965158
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriter.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.Abortable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A generic interface class to support writing log records into
+ * a persistent distributed log.
+ */
+public interface LogWriter extends Closeable, Abortable {
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     * @throws IOException if the record cannot be written
+     */
+    void write(LogRecord record) throws IOException;
+
+    /**
+     * Write a list of log records to the stream.
+     *
+     * @param records list of log records
+     * @return presumably the number of records written -- TODO confirm against the implementation
+     * @throws IOException if the records cannot be written
+     * @deprecated no replacement is documented here -- TODO name the preferred alternative
+     */
+    @Deprecated
+    int writeBulk(List<LogRecord> records) throws IOException;
+
+    /**
+     * All data that has been written to the stream so far will be sent to
+     * persistent storage.
+     * The transmission is asynchronous and new data can be still written to 
the
+     * stream while flushing is performed.
+     *
+     * <p>TODO: rename this to flush()
+     *
+     * @return NOTE(review): the meaning of the returned long is not documented -- confirm
+     * @throws IOException if an I/O error occurs
+     */
+    long setReadyToFlush() throws IOException;
+
+    /**
+     * Flush and sync all data that is ready to be flushed
+     * (see {@link #setReadyToFlush()}) into underlying persistent store.
+     *
+     * <p>TODO: rename this to commit()
+     *
+     * @return NOTE(review): the meaning of the returned long is not documented -- confirm
+     * @throws IOException if an I/O error occurs
+     */
+    long flushAndSync() throws IOException;
+
+    /**
+     * Flushes all the data up to this point,
+     * adds the end of stream marker and marks the stream
+     * as read-only in the metadata. No appends to the
+     * stream will be allowed after this point.
+     *
+     * @throws IOException if an I/O error occurs
+     */
+    void markEndOfStream() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java
new file mode 100644
index 0000000..532b3e5
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/LogWriterImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The wrapper of {@link org.apache.distributedlog.api.LogWriter}.
+ *
+ * <p>Implements the {@link LogWriter} interface of this package by forwarding
+ * every call, unchanged, to the wrapped writer and returning whatever the
+ * wrapped writer returns. No state other than the wrapped writer is kept.
+ */
+class LogWriterImpl implements LogWriter {
+
+    private final org.apache.distributedlog.api.LogWriter impl;
+
+    LogWriterImpl(org.apache.distributedlog.api.LogWriter impl) {
+        this.impl = impl;
+    }
+
+    @VisibleForTesting
+    org.apache.distributedlog.api.LogWriter getImpl() {
+        return impl;
+    }
+
+    @Override
+    public void write(LogRecord record) throws IOException {
+        impl.write(record);
+    }
+
+    @Override
+    public int writeBulk(List<LogRecord> records) throws IOException {
+        return impl.writeBulk(records);
+    }
+
+    @Override
+    public long setReadyToFlush() throws IOException {
+        return impl.setReadyToFlush();
+    }
+
+    @Override
+    public long flushAndSync() throws IOException {
+        return impl.flushAndSync();
+    }
+
+    @Override
+    public void markEndOfStream() throws IOException {
+        impl.markEndOfStream();
+    }
+
+    @Override
+    public void close() throws IOException {
+        impl.close();
+    }
+
+    @Override
+    public void abort() throws IOException {
+        impl.abort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java
new file mode 100644
index 0000000..5439976
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/SubscriptionsStoreImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+
+/**
+ * A wrapper over {@link 
org.apache.distributedlog.api.subscription.SubscriptionsStore}.
+ *
+ * <p>Each query or update is forwarded to the wrapped store and its result is
+ * passed through {@code FutureUtils.newTFuture} to obtain the {@link Future}
+ * returned to the caller. {@link #close()} is forwarded directly.
+ */
+class SubscriptionsStoreImpl implements SubscriptionsStore {
+
+    private final 
org.apache.distributedlog.api.subscription.SubscriptionsStore impl;
+
+    
SubscriptionsStoreImpl(org.apache.distributedlog.api.subscription.SubscriptionsStore
 impl) {
+        this.impl = impl;
+    }
+
+    org.apache.distributedlog.api.subscription.SubscriptionsStore getImpl() {
+        return impl;
+    }
+
+    @Override
+    public Future<DLSN> getLastCommitPosition(String subscriberId) {
+        return newTFuture(impl.getLastCommitPosition(subscriberId));
+    }
+
+    @Override
+    public Future<Map<String, DLSN>> getLastCommitPositions() {
+        return newTFuture(impl.getLastCommitPositions());
+    }
+
+    @Override
+    public Future<Void> advanceCommitPosition(String subscriberId, DLSN 
newPosition) {
+        return newTFuture(impl.advanceCommitPosition(subscriberId, 
newPosition));
+    }
+
+    @Override
+    public Future<Boolean> deleteSubscriber(String subscriberId) {
+        return newTFuture(impl.deleteSubscriber(subscriberId));
+    }
+
+    @Override
+    public void close() throws IOException {
+        impl.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
new file mode 100644
index 0000000..3bbfd95
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespace.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.namespace;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A namespace is the basic unit for managing a set of distributedlogs.
+ *
+ * <h4>Namespace Interface</h4>
+ *
+ * <P>
+ * The <code>DistributedLogNamespace</code> interface is implemented by 
different backend providers.
+ * Several components are required for an implementation:
+ * <OL>
+ *     <LI>Log Management -- manage logs in a given namespace. e.g. 
create/open/delete log, list of logs,
+ *         watch the changes of logs.
+ *     <LI>Access Control -- manage the access controls for logs in the 
namespace.
+ * </OL>
+ * </P>
+ *
+ * <h4>Namespace Location</h4>
+ *
+ * <p>At the highest level, a <code>DistributedLogNamespace</code> is located 
by a <code>URI</code>. The location
+ * URI in string form has the syntax
+ *
+ * <blockquote>
+ * 
distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i>
+ * </blockquote>
+ *
+ * <p>where square brackets [...] delineate optional components and the 
characters <tt><b>-</b></tt> and
+ * <tt><b>:</b></tt> stand for themselves.
+ *
+ * <p>The <code>provider</code> part in the URI indicates what is the backend 
used for this namespace. For example:
+ * <i>distributedlog-bk</i> URI is storing logs in bookkeeper, while 
<i>distributedlog-mem</i> URI is storing logs in
+ * memory. The <code>provider</code> part is optional. It would use bookkeeper 
backend if the <i>provider</i> part
+ * is omitted.
+ *
+ * @see DistributedLogManager
+ * @since 0.3.32
+ */
+@Beta
+public interface DistributedLogNamespace {
+
+    /**
+     * Get the namespace driver used by this namespace.
+     *
+     * @return namespace driver
+     */
+    NamespaceDriver getNamespaceDriver();
+
+    //
+    // Methods to operate logs
+    //
+
+    /**
+     * Create a log named <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountered issues with backend.
+     */
+    void createLog(String logName)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Delete a log named <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @throws InvalidStreamNameException if log name is invalid
+     * @throws LogNotFoundException if log doesn't exist
+     * @throws IOException when encountered issues with backend
+     */
+    void deleteLog(String logName)
+            throws InvalidStreamNameException, LogNotFoundException, 
IOException;
+
+    /**
+     * Open a log named <i>logName</i>.
+     * A distributedlog manager is returned to access log <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountered issues with backend.
+     */
+    DistributedLogManager openLog(String logName)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Open a log named <i>logName</i> with specific log configurations.
+     *
+     * <p>This method allows the caller to override global configuration 
settings by
+     * supplying log configuration overrides. Log config overrides come in two 
flavors,
+     * static and dynamic. Static config never changes in the lifecycle of 
<code>DistributedLogManager</code>,
+     * dynamic config changes by reloading periodically and is safe to access 
from any context.</p>
+     *
+     * @param logName
+     *          name of the log
+     * @param logConf
+     *          static log configuration
+     * @param dynamicLogConf
+     *          dynamic log configuration
+     * @param perStreamStatsLogger
+     *          optional stats logger, presumably used for this log's per-stream stats -- TODO confirm
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountered issues with backend.
+     */
+    DistributedLogManager openLog(String logName,
+                                  Optional<DistributedLogConfiguration> 
logConf,
+                                  Optional<DynamicDistributedLogConfiguration> 
dynamicLogConf,
+                                  Optional<StatsLogger> perStreamStatsLogger)
+            throws InvalidStreamNameException, IOException;
+
+    /**
+     * Check whether the log <i>logName</i> exists.
+     *
+     * @param logName
+     *          name of the log
+     * @return <code>true</code> if the log exists, otherwise 
<code>false</code>.
+     * @throws IOException when encountered exceptions on checking
+     */
+    boolean logExists(String logName)
+            throws IOException;
+
+    /**
+     * Retrieve the logs under the namespace.
+     *
+     * @return iterator of the logs under the namespace.
+     * @throws IOException when encountered issues with backend.
+     */
+    Iterator<String> getLogs()
+            throws IOException;
+
+    //
+    // Methods for namespace
+    //
+
+    /**
+     * Register namespace listener on stream updates under the namespace.
+     *
+     * @param listener
+     *          listener to receive stream updates under the namespace
+     */
+    void registerNamespaceListener(NamespaceListener listener);
+
+    /**
+     * Create an access control manager to manage/check acl for logs.
+     *
+     * @return access control manager for logs under the namespace.
+     * @throws IOException if the access control manager cannot be created
+     */
+    AccessControlManager createAccessControlManager()
+            throws IOException;
+
+    /**
+     * Close the namespace.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
new file mode 100644
index 0000000..e646b19
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceBuilder.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.namespace;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Builder to construct a <code>DistributedLogNamespace</code>.
+ * The builder takes the responsibility of loading backend according to the 
uri.
+ *
+ * @see DistributedLogNamespace
+ * @since 0.3.32
+ */
+public class DistributedLogNamespaceBuilder {
+
+    public static DistributedLogNamespaceBuilder newBuilder() {
+        return new DistributedLogNamespaceBuilder();
+    }
+
+    private final NamespaceBuilder builder;
+
+    // private constructor: instances are obtained through newBuilder()
+    private DistributedLogNamespaceBuilder() {
+        this(NamespaceBuilder.newBuilder());
+    }
+
+    @VisibleForTesting
+    DistributedLogNamespaceBuilder(NamespaceBuilder builder) {
+        this.builder = builder;
+    }
+
+    /**
+     * DistributedLog Configuration used for the namespace.
+     *
+     * @param conf
+     *          distributedlog configuration
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder conf(DistributedLogConfiguration 
conf) {
+        this.builder.conf(conf);
+        return this;
+    }
+
+    /**
+     * Dynamic DistributedLog Configuration used for the namespace.
+     *
+     * @param dynConf dynamic distributedlog configuration
+     * @return namespace builder
+     */
+    public DistributedLogNamespaceBuilder 
dynConf(DynamicDistributedLogConfiguration dynConf) {
+        this.builder.dynConf(dynConf);
+        return this;
+    }
+
+    /**
+     * Namespace Location.
+     *
+     * @param uri
+     *          namespace location uri.
+     * @see DistributedLogNamespace
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder uri(URI uri) {
+        this.builder.uri(uri);
+        return this;
+    }
+
+    /**
+     * Stats Logger used for stats collection.
+     *
+     * @param statsLogger
+     *          stats logger
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder statsLogger(StatsLogger statsLogger) 
{
+        this.builder.statsLogger(statsLogger);
+        return this;
+    }
+
+    /**
+     * Stats Logger used for collecting per log stats.
+     *
+     * @param statsLogger
+     *          stats logger for collecting per log stats
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder perLogStatsLogger(StatsLogger 
statsLogger) {
+        this.builder.perLogStatsLogger(statsLogger);
+        return this;
+    }
+
+    /**
+     * Feature provider used to control the availability of features in the 
namespace.
+     *
+     * @param featureProvider
+     *          feature provider to control availability of features.
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder featureProvider(FeatureProvider 
featureProvider) {
+        this.builder.featureProvider(featureProvider);
+        return this;
+    }
+
+    /**
+     * Client Id used for accessing the namespace.
+     *
+     * @param clientId
+     *          client id used for accessing the namespace
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder clientId(String clientId) {
+        this.builder.clientId(clientId);
+        return this;
+    }
+
+    /**
+     * Region Id used for encoding logs in the namespace. The region id
+     * is useful when the namespace is globally spanning over regions.
+     *
+     * @param regionId
+     *          region id.
+     * @return namespace builder.
+     */
+    public DistributedLogNamespaceBuilder regionId(int regionId) {
+        this.builder.regionId(regionId);
+        return this;
+    }
+
+    @SuppressWarnings("deprecation") // NOTE(review): no caller of this helper is visible in this class -- confirm it is still needed
+    private static StatsLogger normalizePerLogStatsLogger(StatsLogger 
statsLogger,
+                                                          StatsLogger 
perLogStatsLogger,
+                                                          
DistributedLogConfiguration conf) {
+        StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger;
+        if (perLogStatsLogger == NullStatsLogger.INSTANCE
+            && conf.getEnablePerStreamStat()) {
+            normalizedPerLogStatsLogger = statsLogger.scope("stream");
+        }
+        return normalizedPerLogStatsLogger;
+    }
+
+    /**
+     * Build the namespace.
+     *
+     * <p>The namespace produced by the wrapped {@code NamespaceBuilder} is
+     * wrapped in a {@code DistributedLogNamespaceImpl} before being returned.
+     *
+     * @return the namespace instance.
+     * @throws IllegalArgumentException when there is illegal argument 
provided in the builder
+     * @throws NullPointerException when there is null argument provided in 
the builder
+     * @throws IOException when it fails to build the backend
+     */
+    public DistributedLogNamespace build()
+            throws IllegalArgumentException, NullPointerException, IOException 
{
+        return new DistributedLogNamespaceImpl(this.builder.build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java
new file mode 100644
index 0000000..a528d62
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/DistributedLogNamespaceImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.namespace;
+
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.DistributedLogManagerImpl;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+
+/**
+ * A wrapper over {@link org.apache.distributedlog.api.namespace.Namespace}.
+ *
+ * <p>Every call is forwarded to the wrapped namespace. The log managers returned by the
+ * <i>openLog</i> methods are additionally wrapped into {@link DistributedLogManagerImpl}.
+ */
+class DistributedLogNamespaceImpl implements DistributedLogNamespace {
+
+    // the namespace that all calls are delegated to
+    private final Namespace impl;
+
+    /**
+     * Create a wrapper over the namespace <i>impl</i>.
+     *
+     * @param impl namespace to delegate to
+     */
+    DistributedLogNamespaceImpl(Namespace impl) {
+        this.impl = impl;
+    }
+
+
+    /** Return the namespace driver of the wrapped namespace. */
+    @Override
+    public NamespaceDriver getNamespaceDriver() {
+        return impl.getNamespaceDriver();
+    }
+
+    /** Create the log <i>logName</i> through the wrapped namespace. */
+    @Override
+    public void createLog(String logName) throws InvalidStreamNameException, 
IOException {
+        impl.createLog(logName);
+    }
+
+    /** Delete the log <i>logName</i> through the wrapped namespace. */
+    @Override
+    public void deleteLog(String logName) throws InvalidStreamNameException, 
LogNotFoundException, IOException {
+        impl.deleteLog(logName);
+    }
+
+    /**
+     * Open the log <i>logName</i> through the wrapped namespace and wrap the
+     * resulting manager into a {@link DistributedLogManagerImpl}.
+     */
+    @Override
+    public DistributedLogManager openLog(String logName) throws 
InvalidStreamNameException, IOException {
+        return new DistributedLogManagerImpl(impl.openLog(logName));
+    }
+
+    /**
+     * Open the log <i>logName</i> with the optional per-log settings. All arguments are
+     * passed through unchanged to the wrapped namespace; the resulting manager is wrapped
+     * into a {@link DistributedLogManagerImpl}.
+     */
+    @Override
+    public DistributedLogManager openLog(String logName,
+                                         Optional<DistributedLogConfiguration> 
logConf,
+                                         
Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                         Optional<StatsLogger> 
perStreamStatsLogger)
+            throws InvalidStreamNameException, IOException {
+        return new DistributedLogManagerImpl(impl.openLog(
+            logName, logConf, dynamicLogConf, perStreamStatsLogger));
+    }
+
+    /** Ask the wrapped namespace whether the log <i>logName</i> exists. */
+    @Override
+    public boolean logExists(String logName) throws IOException {
+        return impl.logExists(logName);
+    }
+
+    /** Return the log names reported by the wrapped namespace. */
+    @Override
+    public Iterator<String> getLogs() throws IOException {
+        return impl.getLogs();
+    }
+
+    /** Register <i>listener</i> on the wrapped namespace. */
+    @Override
+    public void registerNamespaceListener(NamespaceListener listener) {
+        impl.registerNamespaceListener(listener);
+    }
+
+    /** Create an access control manager through the wrapped namespace. */
+    @Override
+    public AccessControlManager createAccessControlManager() throws 
IOException {
+        return impl.createAccessControlManager();
+    }
+
+    /** Close the wrapped namespace. */
+    @Override
+    public void close() {
+        impl.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java
new file mode 100644
index 0000000..2febeda
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/namespace/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link com.twitter.util.Future} based namespace API.
+ */
+package org.apache.distributedlog.namespace;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java
new file mode 100644
index 0000000..ef26d51
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link com.twitter.util.Future} based API.
+ */
+package org.apache.distributedlog;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
new file mode 100644
index 0000000..a9c7b21
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.stats;
+
+import com.google.common.base.Stopwatch;
+import com.twitter.util.FutureEventListener;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+/**
+ * A {@link FutureEventListener} that records the latency of an operation, in microseconds,
+ * as a successful or failed event on an {@link OpStatsLogger}.
+ */
+public class OpStatsListener<T> implements FutureEventListener<T> {
+    OpStatsLogger opStatsLogger;
+    Stopwatch stopwatch;
+
+    /**
+     * Create a listener that times the operation with a stopwatch started right now.
+     *
+     * @param opStatsLogger stats logger to record the outcome on
+     */
+    public OpStatsListener(OpStatsLogger opStatsLogger) {
+        this(opStatsLogger, null);
+    }
+
+    /**
+     * Create a listener that times the operation with the given stopwatch.
+     *
+     * @param opStatsLogger stats logger to record the outcome on
+     * @param stopwatch stopwatch timing the operation; when null, a new started one is created
+     */
+    public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) {
+        this.opStatsLogger = opStatsLogger;
+        this.stopwatch = (null != stopwatch) ? stopwatch : Stopwatch.createStarted();
+    }
+
+    @Override
+    public void onSuccess(T value) {
+        final long elapsedMicros = stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        opStatsLogger.registerSuccessfulEvent(elapsedMicros);
+    }
+
+    @Override
+    public void onFailure(Throwable cause) {
+        final long elapsedMicros = stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        opStatsLogger.registerFailedEvent(elapsedMicros);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java
new file mode 100644
index 0000000..e352591
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link com.twitter.util.Future} based stats utils.
+ */
+package org.apache.distributedlog.stats;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
new file mode 100644
index 0000000..cb4ec55
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/SubscriptionsStore.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.subscription;
+
+import com.twitter.util.Future;
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.distributedlog.DLSN;
+
+/**
+ * Store to manage subscriptions.
+ *
+ * <p>All operations return a {@link Future}; the store itself is {@link Closeable}.
+ */
+public interface SubscriptionsStore extends Closeable {
+
+    /**
+     * Get the last committed position stored for <i>subscriberId</i>.
+     *
+     * @param subscriberId
+     *          subscriber id
+     * @return future representing the last committed position.
+     */
+    Future<DLSN> getLastCommitPosition(String subscriberId);
+
+    /**
+     * Get the last committed positions for all subscribers.
+     *
+     * @return future representing the last committed positions, keyed by a string
+     *         (presumably the subscriber id - confirm against implementations).
+     */
+    Future<Map<String, DLSN>> getLastCommitPositions();
+
+    /**
+     * Advance the last committed position for <i>subscriberId</i>.
+     *
+     * @param subscriberId
+     *          subscriber id.
+     * @param newPosition
+     *          new committed position.
+     * @return future representing the result of advancing the position.
+     */
+    Future<Void> advanceCommitPosition(String subscriberId, DLSN newPosition);
+
+    /**
+     * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is
+     * deleted, all the data stored under this subscriber will be lost.
+     *
+     * @param subscriberId subscriber id
+     * @return future representing success or failure: <i>true</i> only if the subscriber
+     *         existed and was removed successfully; <i>false</i> if there is no such
+     *         subscriber, or if removing it failed.
+     */
+    Future<Boolean> deleteSubscriber(String subscriberId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java
new file mode 100644
index 0000000..032a2ba
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/subscription/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link com.twitter.util.Future} based subscription API.
+ */
+package org.apache.distributedlog.subscription;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java
new file mode 100644
index 0000000..d121529
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/FutureUtils.java
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.util;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureCancelledException;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Throw;
+import com.twitter.util.Try;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.stats.OpStatsListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.exceptions.BKTransmitException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Utilities to process future.
+ */
+public class FutureUtils {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(FutureUtils.class);
+
+    /**
+     * A {@link FutureEventListener} that forwards every callback to a wrapped listener by
+     * submitting it to an ordered scheduler under a fixed submit key.
+     */
+    public static class OrderedFutureEventListener<R>
+            implements FutureEventListener<R> {
+
+        private final OrderedScheduler scheduler;
+        private final Object key;
+        private final FutureEventListener<R> listener;
+
+        private OrderedFutureEventListener(OrderedScheduler scheduler,
+                                           Object key,
+                                           FutureEventListener<R> listener) {
+            this.scheduler = scheduler;
+            this.key = key;
+            this.listener = listener;
+        }
+
+        /**
+         * Wrap <i>listener</i> so that its callbacks are submitted to <i>scheduler</i>
+         * using <i>key</i>.
+         *
+         * @param listener listener to notify
+         * @param scheduler scheduler the callbacks are submitted to
+         * @param key submit key used on the scheduler
+         * @return the wrapping listener
+         */
+        public static <R> OrderedFutureEventListener<R> of(
+                FutureEventListener<R> listener,
+                OrderedScheduler scheduler,
+                Object key) {
+            return new OrderedFutureEventListener<>(scheduler, key, listener);
+        }
+
+        @Override
+        public void onSuccess(final R value) {
+            final Runnable notifySuccess = new Runnable() {
+                @Override
+                public void run() {
+                    listener.onSuccess(value);
+                }
+            };
+            scheduler.submit(key, notifySuccess);
+        }
+
+        @Override
+        public void onFailure(final Throwable cause) {
+            final Runnable notifyFailure = new Runnable() {
+                @Override
+                public void run() {
+                    listener.onFailure(cause);
+                }
+            };
+            scheduler.submit(key, notifyFailure);
+        }
+    }
+
+    /**
+     * A {@link FutureEventListener} that forwards every callback to a wrapped listener by
+     * submitting it to a given {@link ExecutorService}.
+     */
+    public static class FutureEventListenerRunnable<R>
+            implements FutureEventListener<R> {
+
+        private final ExecutorService executorService;
+        private final FutureEventListener<R> listener;
+
+        private FutureEventListenerRunnable(ExecutorService executorService,
+                                            FutureEventListener<R> listener) {
+            this.executorService = executorService;
+            this.listener = listener;
+        }
+
+        /**
+         * Wrap <i>listener</i> so that its callbacks are submitted to <i>executorService</i>.
+         *
+         * @param listener listener to notify
+         * @param executorService executor the callbacks are submitted to
+         * @return the wrapping listener
+         */
+        public static <R> FutureEventListenerRunnable<R> of(
+                FutureEventListener<R> listener,
+                ExecutorService executorService) {
+            return new FutureEventListenerRunnable<>(executorService, listener);
+        }
+
+        @Override
+        public void onSuccess(final R value) {
+            executorService.submit(() -> listener.onSuccess(value));
+        }
+
+        @Override
+        public void onFailure(final Throwable cause) {
+            executorService.submit(() -> listener.onFailure(cause));
+        }
+    }
+
+    /**
+     * Processes a list of items one at a time: the next item is only handed to
+     * <i>processFunc</i> after the future of the previous item succeeded.
+     *
+     * <p>The class plays three roles at once: it is the interrupt handler of its promise
+     * (the {@link Function} part), the listener on each item's future (the
+     * {@link FutureEventListener} part) and the task that processes the next item
+     * (the {@link Runnable} part).
+     */
+    private static class ListFutureProcessor<T, R>
+            extends Function<Throwable, BoxedUnit>
+            implements FutureEventListener<R>, Runnable {
+
+        // set by the interrupt handler; checked in run() before touching the next item
+        private volatile boolean interrupted = false;
+        private final Iterator<T> itemsIter;
+        private final Function<T, Future<R>> processFunc;
+        // satisfied with all results, or failed with the first failure
+        private final Promise<List<R>> promise;
+        // results collected so far, in processing order
+        private final List<R> results;
+        // optional executor for continuing the processing; null means "run inline"
+        private final ExecutorService callbackExecutor;
+
+        ListFutureProcessor(List<T> items,
+                            Function<T, Future<R>> processFunc,
+                            ExecutorService callbackExecutor) {
+            this.itemsIter = items.iterator();
+            this.processFunc = processFunc;
+            this.promise = new Promise<List<R>>();
+            // this object is the interrupt handler, see apply(Throwable)
+            this.promise.setInterruptHandler(this);
+            this.results = new ArrayList<R>();
+            this.callbackExecutor = callbackExecutor;
+        }
+
+        /**
+         * Interrupt handler of the promise: only records the interruption, so that no
+         * further item is processed.
+         */
+        @Override
+        public BoxedUnit apply(Throwable cause) {
+            interrupted = true;
+            return BoxedUnit.UNIT;
+        }
+
+        /**
+         * Called when the current item was processed: stores its result and continues with
+         * the next item, inline or on the callback executor.
+         */
+        @Override
+        public void onSuccess(R value) {
+            results.add(value);
+            if (null == callbackExecutor) {
+                run();
+            } else {
+                callbackExecutor.submit(this);
+            }
+        }
+
+        /**
+         * Called when processing the current item failed: fails the promise with
+         * <i>cause</i> (inline or on the callback executor); no further item is processed.
+         */
+        @Override
+        public void onFailure(final Throwable cause) {
+            if (null == callbackExecutor) {
+                promise.setException(cause);
+            } else {
+                callbackExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        promise.setException(cause);
+                    }
+                });
+            }
+        }
+
+        /**
+         * Process the next item, or satisfy the promise with the collected results when
+         * there is no item left.
+         */
+        @Override
+        public void run() {
+            if (interrupted) {
+                // NOTE(review): this path returns without satisfying the promise; nothing in
+                // this class completes it after an interrupt - confirm this is intended.
+                logger.debug("ListFutureProcessor is interrupted.");
+                return;
+            }
+            if (!itemsIter.hasNext()) {
+                promise.setValue(results);
+                return;
+            }
+            // NOTE(review): an exception thrown synchronously by processFunc is not caught
+            // here, so it would not be turned into a failure of the promise.
+            processFunc.apply(itemsIter.next()).addEventListener(this);
+        }
+    }
+
+    /**
+     * Process the items of <i>collection</i> one by one using <i>processFunc</i>.
+     * Processing stops as soon as any item fails.
+     *
+     * <p>When <i>callbackExecutor</i> is null, processing starts inline on the calling
+     * thread; otherwise it is submitted to the executor.
+     *
+     * @param collection list of items
+     * @param processFunc process function
+     * @param callbackExecutor executor to process the item
+     * @return future presents the list of processed results
+     */
+    public static <T, R> Future<List<R>> processList(List<T> collection,
+                                                     Function<T, Future<R>> processFunc,
+                                                     @Nullable ExecutorService callbackExecutor) {
+        final ListFutureProcessor<T, R> processor =
+                new ListFutureProcessor<>(collection, processFunc, callbackExecutor);
+        if (null == callbackExecutor) {
+            processor.run();
+        } else {
+            callbackExecutor.submit(processor);
+        }
+        return processor.promise;
+    }
+
+    /**
+     * Register an {@link OpStatsListener} on <i>result</i> to collect the operation stats.
+     *
+     * @param result result to listen on
+     * @param opStatsLogger stats logger to record operations stats
+     * @param stopwatch stop watch to time operation
+     * @param <T> type of the value carried by the future
+     * @return the future returned by registering the event listener on <i>result</i>
+     */
+    public static <T> Future<T> stats(Future<T> result,
+                                      OpStatsLogger opStatsLogger,
+                                      Stopwatch stopwatch) {
+        final FutureEventListener<T> statsListener =
+                new OpStatsListener<T>(opStatsLogger, stopwatch);
+        return result.addEventListener(statsListener);
+    }
+
+    /**
+     * Await the result of the future, throwing only bookkeeper exceptions.
+     *
+     * @param result future to wait for
+     * @return the result of future
+     * @throws BKException when exceptions are thrown by the future. A {@link BKException}
+     *         is rethrown as is; an interruption is reported with the code
+     *         {@link BKException.Code#InterruptedException}; any other (unknown) exception
+     *         is logged and reported with the code
+     *         {@link BKException.Code#UnexpectedConditionException}.
+     */
+    public static <T> T bkResult(Future<T> result) throws BKException {
+        try {
+            return Await.result(result);
+        } catch (BKException bke) {
+            throw bke;
+        } catch (InterruptedException ie) {
+            throw BKException.create(BKException.Code.InterruptedException);
+        } catch (Exception e) {
+            // the cause is only logged; the thrown exception is created from the code alone
+            logger.warn("Encountered unexpected exception on waiting 
bookkeeper results : ", e);
+            throw 
BKException.create(BKException.Code.UnexpectedConditionException);
+        }
+    }
+
+    /**
+     * Map a <i>throwable</i> to a bk exception return code.
+     *
+     * @param throwable the cause of the exception
+     * @return the code carried by the throwable when it is a {@link BKException};
+     *         {@link BKException.Code#UnexpectedConditionException} otherwise.
+     */
+    public static int bkResultCode(Throwable throwable) {
+        return (throwable instanceof BKException)
+                ? ((BKException) throwable).getCode()
+                : BKException.Code.UnexpectedConditionException;
+    }
+
+    /**
+     * Wait for the result until it completes, using {@code Duration.Top()} as the wait
+     * duration.
+     *
+     * @param result result to wait
+     * @return the result
+     * @throws IOException when encountered exceptions on the result
+     */
+    public static <T> T result(Future<T> result) throws IOException {
+        final Duration top = Duration.Top();
+        return result(result, top);
+    }
+
+    /**
+     * Wait for the result for a given <i>duration</i>.
+     *
+     * <p>If the result is not ready within `duration`, an IOException will be thrown,
+     * wrapping the corresponding {@link com.twitter.util.TimeoutException}.
+     *
+     * <p>Exceptions are translated in the order of the catch clauses below; the first
+     * matching clause wins.
+     *
+     * @param result result to wait
+     * @param duration duration to wait
+     * @return the result
+     * @throws IOException when encountered exceptions on the result or while waiting
+     *         for the result.
+     */
+    public static <T> T result(Future<T> result, Duration duration)
+            throws IOException {
+        try {
+            return Await.result(result, duration);
+        } catch (KeeperException ke) {
+            // zookeeper failures are reported as ZKException, keeping the cause
+            throw new ZKException("Encountered zookeeper exception on waiting 
result", ke);
+        } catch (BKException bke) {
+            // only the bookkeeper return code is carried over
+            throw new BKTransmitException("Encountered bookkeeper exception on 
waiting result", bke.getCode());
+        } catch (IOException ioe) {
+            // already an IOException: rethrow unchanged
+            throw ioe;
+        } catch (InterruptedException ie) {
+            throw new DLInterruptedException("Interrupted on waiting result", 
ie);
+        } catch (Exception e) {
+            // everything else is wrapped into a plain IOException
+            throw new IOException("Encountered exception on waiting result", 
e);
+        }
+    }
+
+    /**
+     * Wait for the result of a lock operation.
+     *
+     * @param result result to wait
+     * @param lockPath path of the lock
+     * @return the result
+     * @throws LockingException when encountered exceptions on the result of the lock
+     *         operation. A {@link LockingException} is rethrown as is; any other exception
+     *         is wrapped into a new one created for <i>lockPath</i>.
+     */
+    public static <T> T lockResult(Future<T> result, String lockPath) throws 
LockingException {
+        try {
+            return Await.result(result);
+        } catch (LockingException le) {
+            throw le;
+        } catch (Exception e) {
+            // includes interruptions of the wait: they are wrapped like any other exception
+            throw new LockingException(lockPath, "Encountered exception on 
locking ", e);
+        }
+    }
+
+    /**
+     * Translate <i>throwable</i> into the exception used to report a failure on the
+     * zookeeper <i>path</i>.
+     *
+     * @param throwable cause
+     * @param path zookeeper path
+     * @return a {@link ZKException} for zookeeper exceptions and connection failures,
+     *         a {@link DLInterruptedException} for interruptions, and an
+     *         {@link UnexpectedException} for anything else
+     */
+    public static Throwable zkException(Throwable throwable, String path) {
+        if (throwable instanceof KeeperException) {
+            final KeeperException keeperException = (KeeperException) throwable;
+            return new ZKException("Encountered zookeeper exception on " + path, keeperException);
+        }
+        if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
+            // connection failures are reported with the CONNECTIONLOSS code
+            return new ZKException("Encountered zookeeper connection loss on " + path,
+                    KeeperException.Code.CONNECTIONLOSS);
+        }
+        if (throwable instanceof InterruptedException) {
+            return new DLInterruptedException("Interrupted on operating " + path, throwable);
+        }
+        return new UnexpectedException(
+                "Encountered unexpected exception on operatiing " + path, throwable);
+    }
+
+    /**
+     * Cancel the future by raising a {@link FutureCancelledException} on it, which
+     * interrupts the future.
+     *
+     * @param future future to cancel
+     */
+    public static <T> void cancel(Future<T> future) {
+        final FutureCancelledException cancellation = new FutureCancelledException();
+        future.raise(cancellation);
+    }
+
+    /**
+     * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
+     * If the promise has been satisfied before raising, it won't change the state of
+     * the promise.
+     *
+     * <p>Nothing is scheduled when <i>timeout</i> is below
+     * {@link DistributedLogConstants#FUTURE_TIMEOUT_IMMEDIATE} or when the promise is
+     * already defined; the promise is returned untouched in that case.
+     *
+     * @param promise promise to raise exception
+     * @param timeout timeout period
+     * @param unit timeout period unit
+     * @param cause cause to raise
+     * @param scheduler scheduler to execute raising exception
+     * @param key the submit key used by the scheduler
+     * @return the promise applied with the raise logic (the same instance as <i>promise</i>)
+     */
+    public static <T> Promise<T> within(final Promise<T> promise,
+                                        final long timeout,
+                                        final TimeUnit unit,
+                                        final Throwable cause,
+                                        final OrderedScheduler scheduler,
+                                        final Object key) {
+        if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || 
promise.isDefined()) {
+            return promise;
+        }
+        // schedule a timeout to raise timeout exception
+        final java.util.concurrent.ScheduledFuture<?> task = 
scheduler.schedule(key, new Runnable() {
+            @Override
+            public void run() {
+                // only log when this task is the one that actually failed the promise
+                if (!promise.isDefined() && FutureUtils.setException(promise, 
cause)) {
+                    logger.info("Raise exception", cause);
+                }
+            }
+        }, timeout, unit);
+        // when the promise is satisfied, cancel the timeout task
+        promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Try<T> value) {
+                // NOTE(review): presumably this also runs when the timeout task itself
+                // failed the promise, in which case the cancel is expected to fail - confirm.
+                if (!task.cancel(true)) {
+                    logger.debug("Failed to cancel the timeout task");
+                }
+                return BoxedUnit.UNIT;
+            }
+        });
+        return promise;
+    }
+
+    /**
+     * Satisfies the <i>promise</i> with the provided <i>value</i> from a task submitted to an
+     * ordered scheduler.
+     *
+     * <p>The submitted task delegates to the two-argument <code>setValue</code>, so a promise
+     * that was already satisfied is left unchanged.
+     *
+     * @param promise promise to satisfy
+     * @param value value to satisfy
+     * @param scheduler scheduler to satisfy the promise with provided value
+     * @param key the submit key of the ordered scheduler
+     */
+    public static <T> void setValue(final Promise<T> promise,
+                                    final T value,
+                                    OrderedScheduler scheduler,
+                                    Object key) {
+        final Runnable satisfyTask = new Runnable() {
+            @Override
+            public void run() {
+                setValue(promise, value);
+            }
+        };
+        scheduler.submit(key, satisfyTask);
+    }
+
+    /**
+     * Satisfies the <i>promise</i> with the provided <i>value</i>.
+     *
+     * <p>If the promise was already satisfied, its state is left unchanged and the rejected
+     * update is logged at info level.
+     *
+     * @param promise promise to satisfy
+     * @param value value to satisfy
+     * @return true if this call satisfied the promise, false if the promise had already been
+     *         satisfied.
+     */
+    public static <T> boolean setValue(Promise<T> promise, T value) {
+        if (promise.updateIfEmpty(new Return<T>(value))) {
+            return true;
+        }
+        // the promise already held a result: report the current state and the rejected value
+        logger.info("Result set multiple times. Value = '{}', New = 'Return({})'",
+                promise.poll(), value);
+        return false;
+    }
+
+    /**
+     * Fails the <i>promise</i> with the provided <i>cause</i> from a task submitted to an
+     * ordered scheduler.
+     *
+     * <p>The submitted task delegates to the two-argument <code>setException</code>.
+     *
+     * @param promise promise to satisfy
+     * @param cause cause to satisfy
+     * @param scheduler the scheduler to satisfy the promise
+     * @param key submit key of the ordered scheduler
+     */
+    public static <T> void setException(final Promise<T> promise,
+                                        final Throwable cause,
+                                        OrderedScheduler scheduler,
+                                        Object key) {
+        final Runnable failTask = new Runnable() {
+            @Override
+            public void run() {
+                setException(promise, cause);
+            }
+        };
+        scheduler.submit(key, failTask);
+    }
+
+    /**
+     * Fails the <i>promise</i> with the provided <i>cause</i>.
+     *
+     * <p>If the promise was already satisfied, its state is left unchanged and the rejected
+     * update is logged at info level.
+     *
+     * @param promise promise to satisfy
+     * @param cause cause to satisfy
+     * @return true if this call satisfied the promise, false if the promise had already been
+     *         satisfied.
+     */
+    public static <T> boolean setException(Promise<T> promise, Throwable cause) {
+        if (promise.updateIfEmpty(new Throw<T>(cause))) {
+            return true;
+        }
+        // the promise already held a result: report the current state and the rejected cause
+        logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'",
+                promise.poll(), cause);
+        return false;
+    }
+
+    /**
+     * Ignores the outcome of the <i>future</i>, without logging anything on failure.
+     *
+     * @param future the original future
+     * @return a promise that is satisfied with <code>null</code> once the future completes,
+     *         whether it succeeded or failed
+     */
+    public static <T> Promise<Void> ignore(Future<T> future) {
+        // a null message tells the two-argument overload not to log failures
+        final String noErrorMsg = null;
+        return ignore(future, noErrorMsg);
+    }
+
+    /**
+     * Ignores the outcome of the <i>future</i>, logging <i>errorMsg</i> when it fails.
+     *
+     * <p>The returned promise is satisfied with <code>null</code> on both success and failure,
+     * so the exception of the original future never reaches the caller.
+     *
+     * @param future the original future
+     * @param errorMsg the error message to log on exceptions; nothing is logged when it is null
+     * @return a transformed future ignores exceptions
+     */
+    public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) {
+        final Promise<Void> ignored = new Promise<Void>();
+        final FutureEventListener<T> listener = new FutureEventListener<T>() {
+            @Override
+            public void onSuccess(T value) {
+                // the actual value is dropped on purpose
+                setValue(ignored, null);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                if (errorMsg != null) {
+                    logger.error(errorMsg, cause);
+                }
+                // failures are reported as a normal completion
+                setValue(ignored, null);
+            }
+        };
+        future.addEventListener(listener);
+        return ignored;
+    }
+
+    /**
+     * Builds the transmit exception that corresponds to a transmit result.
+     *
+     * <p>The exception message contains both the numeric result and the text returned by
+     * {@link BKException#getMessage(int)} for it.
+     *
+     * @param transmitResult
+     *          transmit result (basically bk exception code)
+     * @return transmit exception
+     */
+    public static BKTransmitException transmitException(int transmitResult) {
+        final String bkMessage = BKException.getMessage(transmitResult);
+        final String description =
+            "Failed to write to bookkeeper; Error is (" + transmitResult + ") " + bkMessage;
+        return new BKTransmitException(description, transmitResult);
+    }
+
+    /**
+     * Returns a {@link CompletableFuture} whose outcome is forwarded to the <i>promise</i>.
+     *
+     * <p>When the returned future completes normally, the promise is set to its value. When it
+     * completes exceptionally, the promise is failed with the cause; a
+     * {@link CompletionException} is replaced by its own cause first.
+     *
+     * @param promise promise to satisfy once the returned future completes
+     * @return the future obtained from <code>createFuture()</code>
+     */
+    public static <T> CompletableFuture<T> newJFuture(Promise<T> promise) {
+        CompletableFuture<T> jFuture =
+            org.apache.distributedlog.common.concurrent.FutureUtils.createFuture();
+        jFuture.whenComplete((value, cause) -> {
+            if (null == cause) {
+                promise.setValue(value);
+                return;
+            }
+            // hand over the wrapped cause rather than the CompletionException wrapper
+            final Throwable failure =
+                (cause instanceof CompletionException) ? cause.getCause() : cause;
+            promise.setException(failure);
+        });
+        return jFuture;
+    }
+
+    /**
+     * Adapts a {@link CompletableFuture} to a Twitter {@link Future}.
+     *
+     * <p>The returned future is set to the value of <i>jFuture</i> when that completes
+     * normally. When it completes exceptionally, the returned future is failed with the cause;
+     * a {@link CompletionException} is replaced by its own cause first.
+     *
+     * @param jFuture the java future to adapt
+     * @return a future that mirrors the outcome of <i>jFuture</i>
+     */
+    public static <T> Future<T> newTFuture(CompletableFuture<T> jFuture) {
+        Promise<T> promise = new Promise<>();
+        jFuture.whenComplete((value, cause) -> {
+            if (null == cause) {
+                promise.setValue(value);
+                return;
+            }
+            // hand over the wrapped cause rather than the CompletionException wrapper
+            final Throwable failure =
+                (cause instanceof CompletionException) ? cause.getCause() : cause;
+            promise.setException(failure);
+        });
+        return promise;
+    }
+
+    /**
+     * Adapts a {@link CompletableFuture} of a list of java futures to a Twitter {@link Future}
+     * of a list of Twitter futures.
+     *
+     * <p>When <i>jFutureList</i> completes normally, each of its elements is converted once
+     * with {@link #newTFuture(CompletableFuture)} and the converted list becomes the value of
+     * the returned future. When it completes exceptionally, the returned future is failed with
+     * the cause; a {@link CompletionException} is replaced by its own cause first.
+     *
+     * @param jFutureList the java future of the list to adapt
+     * @return a future that mirrors the outcome of <i>jFutureList</i>
+     */
+    public static <T> Future<List<Future<T>>> newTFutureList(
+            CompletableFuture<List<CompletableFuture<T>>> jFutureList) {
+        Promise<List<Future<T>>> promise = new Promise<>();
+        jFutureList.whenComplete((value, cause) -> {
+            if (null != cause) {
+                if (cause instanceof CompletionException) {
+                    promise.setException(cause.getCause());
+                } else {
+                    promise.setException(cause);
+                }
+            } else {
+                // Convert eagerly. A lazy Lists.transform view would call newTFuture again on
+                // every element access, allocating a new promise and registering one more
+                // completion callback on the underlying future each time.
+                List<Future<T>> tFutures = Lists.newArrayListWithCapacity(value.size());
+                for (CompletableFuture<T> jFuture : value) {
+                    tFutures.add(newTFuture(jFuture));
+                }
+                promise.setValue(tFutures);
+            }
+        });
+        return promise;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java
 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java
new file mode 100644
index 0000000..ee00974
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link com.twitter.util.Future} related utils.
+ */
+package org.apache.distributedlog.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml 
b/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000..1afb903
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,19 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+//-->
+<FindBugsFilter>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java
 
b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java
new file mode 100644
index 0000000..dfff56a
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogReaderImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link AsyncLogReaderImpl}: each call is expected to be forwarded to the wrapped
+ * {@link org.apache.distributedlog.api.AsyncLogReader} and its result handed back to the caller.
+ */
+public class TestAsyncLogReaderImpl {
+
+    // mocked reader the implementation under test delegates to
+    private final org.apache.distributedlog.api.AsyncLogReader underlying =
+        mock(org.apache.distributedlog.api.AsyncLogReader.class);
+    private final AsyncLogReaderImpl reader = new AsyncLogReaderImpl(underlying);
+
+    @Test
+    public void testRead() throws Exception {
+        final LogRecordWithDLSN expected = mock(LogRecordWithDLSN.class);
+        final CompletableFuture<LogRecordWithDLSN> completed =
+            CompletableFuture.completedFuture(expected);
+        when(underlying.readNext()).thenReturn(completed);
+
+        assertEquals(expected, FutureUtils.result(reader.readNext()));
+
+        verify(underlying, times(1)).readNext();
+    }
+
+    @Test
+    public void testReadBulk() throws Exception {
+        final int numRecords = 100;
+        List<LogRecordWithDLSN> expected = mock(List.class);
+        final CompletableFuture<List<LogRecordWithDLSN>> completed =
+            CompletableFuture.completedFuture(expected);
+        when(underlying.readBulk(anyInt())).thenReturn(completed);
+
+        assertEquals(expected, FutureUtils.result(reader.readBulk(numRecords)));
+
+        verify(underlying, times(1)).readBulk(eq(numRecords));
+    }
+
+    @Test
+    public void testReadBulkWithWaitTime() throws Exception {
+        final int numRecords = 100;
+        final TimeUnit waitUnit = TimeUnit.MICROSECONDS;
+        List<LogRecordWithDLSN> expected = mock(List.class);
+        final CompletableFuture<List<LogRecordWithDLSN>> completed =
+            CompletableFuture.completedFuture(expected);
+        when(underlying.readBulk(anyInt(), anyLong(), any(TimeUnit.class)))
+            .thenReturn(completed);
+
+        assertEquals(expected, FutureUtils.result(reader.readBulk(numRecords, 10, waitUnit)));
+
+        verify(underlying, times(1))
+            .readBulk(eq(numRecords), eq(10L), eq(waitUnit));
+    }
+
+    @Test
+    public void testGetStreamName() throws Exception {
+        final String expectedName = "test-stream-name";
+        when(underlying.getStreamName()).thenReturn(expectedName);
+
+        assertEquals(expectedName, reader.getStreamName());
+
+        verify(underlying, times(1)).getStreamName();
+    }
+
+    @Test
+    public void testAsyncClose() throws Exception {
+        when(underlying.asyncClose())
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        // only completion matters here, the close result carries no value
+        FutureUtils.result(reader.asyncClose());
+
+        verify(underlying, times(1)).asyncClose();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java
 
b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java
new file mode 100644
index 0000000..621e18d
--- /dev/null
+++ 
b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestAsyncLogWriterImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.twitter.util.Futures;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link AsyncLogWriterImpl}: each call is expected to be forwarded to the wrapped
+ * {@link org.apache.distributedlog.api.AsyncLogWriter} and its result handed back to the caller.
+ */
+public class TestAsyncLogWriterImpl {
+
+    // mocked writer the implementation under test delegates to
+    private final org.apache.distributedlog.api.AsyncLogWriter underlying =
+        mock(org.apache.distributedlog.api.AsyncLogWriter.class);
+    private final AsyncLogWriterImpl writer = new AsyncLogWriterImpl(underlying);
+
+    @Test
+    public void testWrite() throws Exception {
+        final DLSN expected = mock(DLSN.class);
+        final LogRecord toWrite = mock(LogRecord.class);
+        final CompletableFuture<DLSN> completed = CompletableFuture.completedFuture(expected);
+        when(underlying.write(any(LogRecord.class))).thenReturn(completed);
+
+        assertEquals(expected, FutureUtils.result(writer.write(toWrite)));
+
+        verify(underlying, times(1)).write(eq(toWrite));
+    }
+
+    @Test
+    public void testWriteBulk() throws Exception {
+        final int numRecords = 10;
+        List<LogRecord> toWrite = mock(List.class);
+        List<DLSN> expected = Lists.newArrayList();
+        List<CompletableFuture<DLSN>> completedWrites = Lists.newArrayList();
+        // one already-completed write result per record
+        for (int written = 0; written < numRecords; written++) {
+            DLSN dlsn = mock(DLSN.class);
+            expected.add(dlsn);
+            completedWrites.add(CompletableFuture.completedFuture(dlsn));
+        }
+        when(underlying.writeBulk(any(List.class)))
+            .thenReturn(CompletableFuture.completedFuture(completedWrites));
+
+        // unwrap the outer future first, then collect the per-record futures
+        assertEquals(
+            expected,
+            FutureUtils.result(Futures.collect(
+                FutureUtils.result(writer.writeBulk(toWrite)))));
+
+        verify(underlying, times(1)).writeBulk(eq(toWrite));
+    }
+
+    @Test
+    public void testGetLastTxId() throws Exception {
+        final long expectedTxId = 123456L;
+        when(underlying.getLastTxId()).thenReturn(expectedTxId);
+
+        assertEquals(expectedTxId, writer.getLastTxId());
+
+        verify(underlying, times(1)).getLastTxId();
+    }
+
+    @Test
+    public void testTruncate() throws Exception {
+        final DLSN truncateTo = mock(DLSN.class);
+        when(underlying.truncate(truncateTo))
+            .thenReturn(CompletableFuture.completedFuture(true));
+
+        assertTrue(FutureUtils.result(writer.truncate(truncateTo)));
+
+        verify(underlying, times(1)).truncate(eq(truncateTo));
+    }
+
+    @Test
+    public void testGetStreamName() throws Exception {
+        final String expectedName = "test-stream-name";
+        when(underlying.getStreamName()).thenReturn(expectedName);
+
+        assertEquals(expectedName, writer.getStreamName());
+
+        verify(underlying, times(1)).getStreamName();
+    }
+
+    @Test
+    public void testAsyncClose() throws Exception {
+        when(underlying.asyncClose())
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        // only completion matters here, the close result carries no value
+        FutureUtils.result(writer.asyncClose());
+
+        verify(underlying, times(1)).asyncClose();
+    }
+
+    @Test
+    public void testAsyncAbort() throws Exception {
+        when(underlying.asyncAbort())
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        // only completion matters here, the abort result carries no value
+        FutureUtils.result(writer.asyncAbort());
+
+        verify(underlying, times(1)).asyncAbort();
+    }
+
+}

Reply via email to