http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
deleted file mode 100644
index bf7a1ad..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
+++ /dev/null
@@ -1,925 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.AlreadyClosedException;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.exceptions.StreamNotReadyException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.io.Abortables;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.FatalErrorHandler;
-import org.apache.distributedlog.service.ServerFeatureKeys;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.TimeSequencer;
-import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.TimeoutException;
-import com.twitter.util.Timer;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * Implementation of {@link Stream}.
- */
-public class StreamImpl implements Stream {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(StreamImpl.class);
-
-    /**
-     * The status of the stream.
-     *
-     * <p>The status change of the stream should just go in one direction. If 
a stream hits
-     * any error, the stream should be put in error state. If a stream is in 
error state,
-     * it should be removed and not reused anymore.
-     */
-    public enum StreamStatus {
-        UNINITIALIZED(-1),
-        INITIALIZING(0),
-        INITIALIZED(1),
-        CLOSING(-4),
-        CLOSED(-5),
-        // if a stream is in error state, it should be abort during closing.
-        ERROR(-6);
-
-        final int code;
-
-        StreamStatus(int code) {
-            this.code = code;
-        }
-
-        int getCode() {
-            return code;
-        }
-
-        public static boolean isUnavailable(StreamStatus status) {
-            return StreamStatus.ERROR == status || StreamStatus.CLOSING == 
status || StreamStatus.CLOSED == status;
-        }
-    }
-
    private final String name;
    private final Partition partition;
    // Opened lazily in initialize(); null until then.
    private DistributedLogManager manager;

    // Mutable stream state. Volatile so the unsynchronized fast path in
    // submit() can observe it; all transitions happen under the `this` lock.
    private volatile AsyncLogWriter writer;
    private volatile StreamStatus status;
    // Address of the current lock owner, recorded from the last
    // OwnershipAcquireFailedException; null when not redirected.
    private volatile String owner;
    // Cause handed to ops that fail while no writer is available.
    private volatile Throwable lastException;
    // Ops queued while the stream is still initializing; swapped out wholesale
    // (under the `this` lock) once acquisition completes.
    private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();

    // Satisfied once the stream is fully closed; shared by all close requests.
    private final Promise<Void> closePromise = new Promise<Void>();
    // Guards transaction-id sequencing (see onAcquireStreamSuccess / executeOp).
    private final Object txnLock = new Object();
    private final TimeSequencer sequencer = new TimeSequencer();
    private final StreamRequestLimiter limiter;
    private final DynamicDistributedLogConfiguration dynConf;
    private final DistributedLogConfiguration dlConfig;
    private final DistributedLogNamespace dlNamespace;
    private final String clientId;
    private final OrderedScheduler scheduler;
    // Serializes close requests against status transitions in requestClose().
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final Feature featureRateLimitDisabled;
    private final StreamManager streamManager;
    private final StreamConfigProvider streamConfigProvider;
    private final FatalErrorHandler fatalErrorHandler;
    private final long streamProbationTimeoutMs;
    private final long serviceTimeoutMs;
    private final long writerCloseTimeoutMs;
    private final boolean failFastOnStreamNotReady;
    private final HashedWheelTimer requestTimer;
    private final Timer futureTimer;

    // Stats
    private final StatsLogger streamLogger;
    private final StatsLogger streamExceptionStatLogger;
    private final StatsLogger limiterStatLogger;
    private final Counter serviceTimeout;
    private final OpStatsLogger streamAcquireStat;
    private final OpStatsLogger writerCloseStatLogger;
    private final Counter pendingOpsCounter;
    private final Counter unexpectedExceptions;
    private final Counter writerCloseTimeoutCounter;
    private final StatsLogger exceptionStatLogger;
    // Lazily-created per-exception-class counters (see countException).
    private final ConcurrentHashMap<String, Counter> exceptionCounters =
        new ConcurrentHashMap<String, Counter>();
    private final Gauge<Number> streamStatusGauge;
-
    // Since we may create and discard streams at initialization if there's a race,
    // must not do any expensive initialization here (particularly any locking or
    // significant resource allocation etc.). Expensive work is deferred to
    // initialize().
    StreamImpl(final String name,
               final Partition partition,
               String clientId,
               StreamManager streamManager,
               StreamOpStats streamOpStats,
               ServerConfiguration serverConfig,
               DistributedLogConfiguration dlConfig,
               DynamicDistributedLogConfiguration streamConf,
               FeatureProvider featureProvider,
               StreamConfigProvider streamConfigProvider,
               DistributedLogNamespace dlNamespace,
               OrderedScheduler scheduler,
               FatalErrorHandler fatalErrorHandler,
               HashedWheelTimer requestTimer,
               Timer futureTimer) {
        this.clientId = clientId;
        this.dlConfig = dlConfig;
        this.streamManager = streamManager;
        this.name = name;
        this.partition = partition;
        this.status = StreamStatus.UNINITIALIZED;
        // Seed lastException so ops failed before the first acquire carry a
        // meaningful cause (see executeOp's failure path).
        this.lastException = new IOException("Fail to write record to stream " + name);
        this.streamConfigProvider = streamConfigProvider;
        this.dlNamespace = dlNamespace;
        this.featureRateLimitDisabled = featureProvider.getFeature(
            ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase());
        this.scheduler = scheduler;
        this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs();
        this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs();
        this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs();
        this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady();
        this.fatalErrorHandler = fatalErrorHandler;
        this.dynConf = streamConf;
        // Limiter stats are broadcast to both a base scope and a per-stream scope.
        StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
            streamOpStats.baseScope("stream_limiter"),
            streamOpStats.streamRequestScope(partition, "limiter"));
        this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
        this.requestTimer = requestTimer;
        this.futureTimer = futureTimer;

        // Stats
        this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
        this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
        this.streamExceptionStatLogger = streamLogger.scope("exceptions");
        this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
        StatsLogger streamsStatsLogger = streamOpStats.baseScope("streams");
        this.streamAcquireStat = streamsStatsLogger.getOpStatsLogger("acquire");
        this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
        this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions");
        this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
        this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close");
        this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts");
        // Gauges; registration is deferred to initialize() to avoid registering
        // the same gauge twice when racing stream creations are discarded.
        this.streamStatusGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return StreamStatus.UNINITIALIZED.getCode();
            }
            @Override
            public Number getSample() {
                return status.getCode();
            }
        };
    }
-
-    @Override
-    public String getOwner() {
-        return owner;
-    }
-
-    @Override
-    public String getStreamName() {
-        return name;
-    }
-
-    @Override
-    public DynamicDistributedLogConfiguration getStreamConfiguration() {
-        return dynConf;
-    }
-
-    @Override
-    public Partition getPartition() {
-        return partition;
-    }
-
-    private DistributedLogManager openLog(String name) throws IOException {
-        Optional<DistributedLogConfiguration> dlConf = 
Optional.<DistributedLogConfiguration>absent();
-        Optional<DynamicDistributedLogConfiguration> dynDlConf = 
Optional.of(dynConf);
-        Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger);
-        return dlNamespace.openLog(name, dlConf, dynDlConf, 
perStreamStatsLogger);
-    }
-
    // Expensive initialization, only called once per stream.
    @Override
    public void initialize() throws IOException {
        // Open the underlying log -- the expensive work deferred out of the
        // constructor.
        manager = openLog(name);

        // Better to avoid registering the gauge multiple times, so do this in init
        // which only gets called once.
        streamLogger.registerGauge("stream_status", this.streamStatusGauge);

        // Signal initialization is complete, should be last in this method.
        status = StreamStatus.INITIALIZING;
    }
-
-    @Override
-    public String toString() {
-        return String.format("Stream:%s, %s, %s Status:%s", name, manager, 
writer, status);
-    }
-
    /**
     * Kick off stream acquisition. If acquisition fails (either by returning
     * false or by throwing), the stream is moved to ERROR state and closed.
     */
    @Override
    public void start() {
        // acquire the stream
        acquireStream().addEventListener(new FutureEventListener<Boolean>() {
                @Override
                public void onSuccess(Boolean success) {
                    if (!success) {
                        // failed to acquire the stream. set the stream in error status and close it.
                        setStreamInErrorStatus();
                        requestClose("Failed to acquire the ownership");
                    }
                }

                @Override
                public void onFailure(Throwable cause) {
                    // unhandled exceptions
                    logger.error("Stream {} threw unhandled exception : ", name, cause);
                    // failed to acquire the stream. set the stream in error status and close it.
                    setStreamInErrorStatus();
                    requestClose("Unhandled exception");
                }
            });
    }
-
-    //
-    // Stats Operations
-    //
-
-    void countException(Throwable t, StatsLogger streamExceptionLogger) {
-        String exceptionName = null == t ? "null" : t.getClass().getName();
-        Counter counter = exceptionCounters.get(exceptionName);
-        if (null == counter) {
-            counter = exceptionStatLogger.getCounter(exceptionName);
-            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, 
counter);
-            if (null != oldCounter) {
-                counter = oldCounter;
-            }
-        }
-        counter.inc();
-        streamExceptionLogger.getCounter(exceptionName).inc();
-    }
-
-    boolean isCriticalException(Throwable cause) {
-        return !(cause instanceof OwnershipAcquireFailedException);
-    }
-
-    //
-    // Service Timeout:
-    // - schedule a timeout function to handle operation timeouts: {@link 
#handleServiceTimeout(String)}
-    // - if the operation is completed within timeout period, cancel the 
timeout.
-    //
-
-    void scheduleTimeout(final StreamOp op) {
-        final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
-            @Override
-            public void run(Timeout timeout) throws Exception {
-                if (!timeout.isCancelled()) {
-                    serviceTimeout.inc();
-                    handleServiceTimeout("Operation " + 
op.getClass().getName() + " timeout");
-                }
-            }
-        }, serviceTimeoutMs, TimeUnit.MILLISECONDS);
-        op.responseHeader().ensure(new Function0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                timeout.cancel();
-                return null;
-            }
-        });
-    }
-
    /**
     * Close the stream and schedule cache eviction at some point in the future.
     * We delay this as a way to place the stream in a probationary state--cached
     * in the proxy but unusable.
     * This mechanism helps the cluster adapt to situations where a proxy has
     * persistent connectivity/availability issues, because it keeps an affected
     * stream off the proxy for a period of time, hopefully long enough for the
     * issues to be resolved, or for whoop to kick in and kill the shard.
     */
    void handleServiceTimeout(String reason) {
        synchronized (this) {
            // Already closing/closed/errored: nothing to do.
            if (StreamStatus.isUnavailable(status)) {
                return;
            }
            // Mark stream in error state
            setStreamInErrorStatus();
        }

        // Async close request, and schedule eviction when its done.
        // Note: uncache=false here; eviction is scheduled explicitly below
        // after the probation timeout.
        Future<Void> closeFuture = requestClose(reason, false /* dont remove */);
        closeFuture.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Void result) {
                streamManager.scheduleRemoval(StreamImpl.this, streamProbationTimeoutMs);
                return BoxedUnit.UNIT;
            }
        });
    }
-
-    //
-    // Submit the operation to the stream.
-    //
-
    /**
     * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for
     * execution once complete.
     *
     * @param op
     *          stream operation to execute.
     */
    @Override
    public void submit(StreamOp op) {
        // Admission control: reject the op immediately when over capacity.
        try {
            limiter.apply(op);
        } catch (OverCapacityException ex) {
            op.fail(ex);
            return;
        }

        // Timeout stream op if requested.
        if (serviceTimeoutMs > 0) {
            scheduleTimeout(op);
        }

        boolean completeOpNow = false;
        boolean success = true;
        // Fast path: check the volatile status without locking; fall back to
        // the synchronized slow path only when the stream isn't INITIALIZED.
        if (StreamStatus.isUnavailable(status)) {
            // Stream is closed, fail the op immediately
            op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
            return;
        } else if (StreamStatus.INITIALIZED == status && writer != null) {
            completeOpNow = true;
            success = true;
        } else {
            synchronized (this) {
                // Re-check under the lock: status may have changed since the
                // unsynchronized check above.
                if (StreamStatus.isUnavailable(status)) {
                    // Stream is closed, fail the op immediately
                    op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
                    return;
                } else if (StreamStatus.INITIALIZED == status) {
                    completeOpNow = true;
                    success = true;
                } else if (failFastOnStreamNotReady) {
                    op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
                    return;
                } else { // the stream is still initializing
                    pendingOps.add(op);
                    pendingOpsCounter.inc();
                    if (1 == pendingOps.size()) {
                        // NOTE(review): the first queued heartbeat is asked to
                        // write a control record -- presumably to force a write
                        // on the freshly acquired log; confirm with HeartbeatOp.
                        if (op instanceof HeartbeatOp) {
                            ((HeartbeatOp) op).setWriteControlRecord(true);
                        }
                    }
                }
            }
        }
        if (completeOpNow) {
            executeOp(op, success);
        }
    }
-
-    //
-    // Execute operations and handle exceptions on operations
-    //
-
    /**
     * Execute the <i>op</i> immediately.
     *
     * @param op
     *          stream operation to execute.
     * @param success
     *          whether the operation is success or not.
     */
    void executeOp(final StreamOp op, boolean success) {
        // Snapshot writer and lastException together under the lock so the
        // pair is consistent.
        final AsyncLogWriter writer;
        final Throwable lastException;
        synchronized (this) {
            writer = this.writer;
            lastException = this.lastException;
        }
        if (null != writer && success) {
            op.execute(writer, sequencer, txnLock)
                    .addEventListener(new FutureEventListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    // nop
                }
                @Override
                public void onFailure(Throwable cause) {
                    boolean countAsException = true;
                    if (cause instanceof DLException) {
                        final DLException dle = (DLException) cause;
                        switch (dle.getCode()) {
                        // FOUND: ownership acquired by another proxy; expected
                        // during failover, so not counted as an exception.
                        case FOUND:
                            assert(cause instanceof OwnershipAcquireFailedException);
                            countAsException = false;
                            handleExceptionOnStreamOp(op, cause);
                            break;
                        // ALREADY_CLOSED: namespace-level failure; fail the op
                        // and escalate to the fatal error handler.
                        case ALREADY_CLOSED:
                            assert(cause instanceof AlreadyClosedException);
                            op.fail(cause);
                            handleAlreadyClosedException((AlreadyClosedException) cause);
                            break;
                        // exceptions that mostly from client (e.g. too large record)
                        // -- fail just this op, keep the stream healthy.
                        case NOT_IMPLEMENTED:
                        case METADATA_EXCEPTION:
                        case LOG_EMPTY:
                        case LOG_NOT_FOUND:
                        case TRUNCATED_TRANSACTION:
                        case END_OF_STREAM:
                        case TRANSACTION_OUT_OF_ORDER:
                        case INVALID_STREAM_NAME:
                        case TOO_LARGE_RECORD:
                        case STREAM_NOT_READY:
                        case OVER_CAPACITY:
                            op.fail(cause);
                            break;
                        // the DL writer hits exception, simple set the stream to error status
                        // and fail the request
                        default:
                            handleExceptionOnStreamOp(op, cause);
                            break;
                        }
                    } else {
                        handleExceptionOnStreamOp(op, cause);
                    }
                    if (countAsException) {
                        countException(cause, streamExceptionStatLogger);
                    }
                }
            });
        } else {
            // No usable writer: fail with the recorded cause if any, otherwise
            // report the stream as closed.
            if (null != lastException) {
                op.fail(lastException);
            } else {
                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
            }
        }
    }
-
    /**
     * Handle exception when executing <i>op</i>.
     *
     * <p>If the stream is currently INITIALIZED it is moved to ERROR state, the
     * old writer is aborted and a close is requested. In all cases the op
     * itself is failed with <i>cause</i>.
     *
     * @param op
     *          stream operation executing
     * @param cause
     *          exception received when executing <i>op</i>
     */
    private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
        AsyncLogWriter oldWriter = null;
        boolean statusChanged = false;
        synchronized (this) {
            if (StreamStatus.INITIALIZED == status) {
                oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
                statusChanged = true;
            }
        }
        if (statusChanged) {
            // abort (not close) the writer since the stream is now in error state
            Abortables.asyncAbort(oldWriter, false);
            if (isCriticalException(cause)) {
                logger.error("Failed to write data into stream {} : ", name, cause);
            } else {
                // expected ownership movement: log without a stack trace
                logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
            }
            requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
        }
        op.fail(cause);
    }
-
-    /**
-     * Handling already closed exception.
-     */
-    private void handleAlreadyClosedException(AlreadyClosedException ace) {
-        unexpectedExceptions.inc();
-        logger.error("Encountered unexpected exception when writing data into 
stream {} : ", name, ace);
-        fatalErrorHandler.notifyFatalError();
-    }
-
-    //
-    // Acquire streams
-    //
-
    /**
     * Open the async log writer for this stream. Success/failure handling runs
     * through the ordered scheduler keyed by stream name, so acquire callbacks
     * for the same stream are serialized.
     *
     * @return future satisfied with whether the acquisition succeeded.
     */
    Future<Boolean> acquireStream() {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final Promise<Boolean> acquirePromise = new Promise<Boolean>();
        manager.openAsyncLogWriter().addEventListener(
            FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {

            @Override
            public void onSuccess(AsyncLogWriter w) {
                onAcquireStreamSuccess(w, stopwatch, acquirePromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                onAcquireStreamFailure(cause, stopwatch, acquirePromise);
            }

        }, scheduler, getStreamName()));
        return acquirePromise;
    }
-
    /**
     * Writer opened successfully: seed the txn sequencer, transition to
     * INITIALIZED, take ownership of the queued ops, then verify the proxy is
     * still allowed to hold this partition (rolling back to ERROR if not).
     */
    private void onAcquireStreamSuccess(AsyncLogWriter w,
                                        Stopwatch stopwatch,
                                        Promise<Boolean> acquirePromise) {
        // Seed the transaction-id sequencer from the writer's last txn id.
        synchronized (txnLock) {
            sequencer.setLastId(w.getLastTxId());
        }
        AsyncLogWriter oldWriter;
        Queue<StreamOp> oldPendingOps;
        boolean success;
        synchronized (StreamImpl.this) {
            oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
                    StreamStatus.INITIALIZING, w, null);
            // take the queued ops; new submissions now go straight to the writer
            oldPendingOps = pendingOps;
            pendingOps = new ArrayDeque<StreamOp>();
            success = true;
        }
        // check if the stream is allowed to be acquired
        if (!streamManager.allowAcquire(StreamImpl.this)) {
            if (null != oldWriter) {
                Abortables.asyncAbort(oldWriter, true);
            }
            int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
            StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
                    + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
            countException(sue, exceptionStatLogger);
            logger.error("Failed to acquire stream {} because it is unavailable : {}",
                    name, sue.getMessage());
            synchronized (this) {
                // roll back to ERROR; the just-installed writer becomes the
                // oldWriter aborted by processPendingRequestsAfterAcquire
                oldWriter = setStreamStatus(StreamStatus.ERROR,
                        StreamStatus.INITIALIZED, null, sue);
                // we don't switch the pending ops since they are already switched
                // when setting the status to initialized
                success = false;
            }
        }
        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
    }
-
    /**
     * Writer open failed: transition from INITIALIZING to ERROR and fail any
     * queued ops with the failure cause. An AlreadyClosedException instead
     * escalates to the fatal error handler.
     */
    private void onAcquireStreamFailure(Throwable cause,
                                        Stopwatch stopwatch,
                                        Promise<Boolean> acquirePromise) {
        AsyncLogWriter oldWriter;
        Queue<StreamOp> oldPendingOps;
        boolean success;
        if (cause instanceof AlreadyClosedException) {
            countException(cause, streamExceptionStatLogger);
            handleAlreadyClosedException((AlreadyClosedException) cause);
            // NOTE(review): acquirePromise is never completed on this path (the
            // proxy is shutting down via the fatal error handler) -- confirm no
            // caller blocks indefinitely on the acquire future.
            return;
        } else {
            if (isCriticalException(cause)) {
                countException(cause, streamExceptionStatLogger);
                logger.error("Failed to acquire stream {} : ", name, cause);
            } else {
                // expected ownership movement: log without a stack trace
                logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
            }
            synchronized (StreamImpl.this) {
                oldWriter = setStreamStatus(StreamStatus.ERROR,
                        StreamStatus.INITIALIZING, null, cause);
                oldPendingOps = pendingOps;
                pendingOps = new ArrayDeque<StreamOp>();
                success = false;
            }
        }
        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
    }
-
-    /**
-     * Process the pending request after acquired stream.
-     *
-     * @param success whether the acquisition succeed or not
-     * @param oldWriter the old writer to abort
-     * @param oldPendingOps the old pending ops to execute
-     * @param stopwatch stopwatch to measure the time spent on acquisition
-     * @param acquirePromise the promise to complete the acquire operation
-     */
-    void processPendingRequestsAfterAcquire(boolean success,
-                                            AsyncLogWriter oldWriter,
-                                            Queue<StreamOp> oldPendingOps,
-                                            Stopwatch stopwatch,
-                                            Promise<Boolean> acquirePromise) {
-        if (success) {
-            
streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-        } else {
-            
streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-        }
-        for (StreamOp op : oldPendingOps) {
-            executeOp(op, success);
-            pendingOpsCounter.dec();
-        }
-        Abortables.asyncAbort(oldWriter, true);
-        FutureUtils.setValue(acquirePromise, success);
-    }
-
-    //
-    // Stream Status Changes
-    //
-
-    synchronized void setStreamInErrorStatus() {
-        if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
-            return;
-        }
-        this.status = StreamStatus.ERROR;
-    }
-
    /**
     * Update the stream status. The change is applied only if the current
     * status still equals <i>oldStatus</i> (compare-and-set semantics).
     *
     * @param newStatus
     *          new status
     * @param oldStatus
     *          expected current status; if it doesn't match, nothing changes
     * @param writer
     *          new log writer
     * @param t
     *          new exception
     * @return old writer if it exists
     */
    synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
                                                StreamStatus oldStatus,
                                                AsyncLogWriter writer,
                                                Throwable t) {
        if (oldStatus != this.status) {
            logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
                    new Object[] { name, oldStatus, this.status, newStatus });
            return null;
        }

        // Extract the current lock owner when the failure was an ownership one.
        String owner = null;
        if (t instanceof OwnershipAcquireFailedException) {
            owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
        }

        AsyncLogWriter oldWriter = this.writer;
        this.writer = writer;
        if (null != owner && owner.equals(clientId)) {
            unexpectedExceptions.inc();
            logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :",
                         new Object[] { owner, name, t });
            // I lost the ownership but left a lock over zookeeper
            // I should not ask client to redirect to myself again as I can't handle it :(
            // shutdown myself
            fatalErrorHandler.notifyFatalError();
            this.owner = null;
        } else {
            this.owner = owner;
        }
        this.lastException = t;
        this.status = newStatus;
        // Keep the stream manager's acquired-set in sync with the new status.
        if (StreamStatus.INITIALIZED == newStatus) {
            streamManager.notifyAcquired(this);
            logger.info("Inserted acquired stream {} -> writer {}", name, this);
        } else {
            streamManager.notifyReleased(this);
            logger.info("Removed acquired stream {} -> writer {}", name, this);
        }
        return oldWriter;
    }
-
-    //
-    // Stream Close Functions
-    //
-
-    void close(DistributedLogManager dlm) {
-        if (null != dlm) {
-            try {
-                dlm.close();
-            } catch (IOException ioe) {
-                logger.warn("Failed to close dlm for {} : ", name, ioe);
-            }
-        }
-    }
-
    /**
     * Request the stream to be closed and uncached from the stream manager.
     *
     * @param reason reason for closing, used for logging.
     * @return future satisfied once the stream is fully closed.
     */
    @Override
    public Future<Void> requestClose(String reason) {
        return requestClose(reason, true);
    }
-
    /**
     * Request the stream to be closed.
     *
     * @param reason reason for closing, used for logging.
     * @param uncache whether to remove the stream from the manager's cache
     *                once closed.
     * @return future satisfied once the stream is fully closed; idempotent --
     *         repeated requests return the same future.
     */
    Future<Void> requestClose(String reason, boolean uncache) {
        final boolean abort;
        closeLock.writeLock().lock();
        try {
            // idempotent: a second close request just returns the shared future
            if (StreamStatus.CLOSING == status
                || StreamStatus.CLOSED == status) {
                return closePromise;
            }
            logger.info("Request to close stream {} : {}", getStreamName(), reason);
            // if the stream isn't closed from INITIALIZED state, we abort the stream instead of closing it.
            abort = StreamStatus.INITIALIZED != status;
            status = StreamStatus.CLOSING;
            streamManager.notifyReleased(this);
        } finally {
            closeLock.writeLock().unlock();
        }
        // we will fail the requests that are coming in between closing and closed only
        // after the async writer is closed. so we could clear up the lock before redirect
        // them.
        close(abort, uncache);
        return closePromise;
    }
-
-    @Override
-    public void delete() throws IOException {
-        if (null != writer) {
-            Utils.close(writer);
-            synchronized (this) {
-                writer = null;
-                lastException = new StreamUnavailableException("Stream was 
deleted");
-            }
-        }
-        if (null == manager) {
-            throw new UnexpectedException("No stream " + name + " to delete");
-        }
-        manager.delete();
-    }
-
-    /**
-     * Post action executed after closing.
-     */
-    private void postClose(boolean uncache) {
-        closeManagerAndErrorOutPendingRequests();
-        unregisterGauge();
-        if (uncache) {
-            if (null != owner) {
-                long probationTimeoutMs = 2 * 
dlConfig.getZKSessionTimeoutMilliseconds() / 3;
-                streamManager.scheduleRemoval(this, probationTimeoutMs);
-            } else {
-                streamManager.notifyRemoved(this);
-                logger.info("Removed cached stream {}.", getStreamName());
-            }
-        }
-        FutureUtils.setValue(closePromise, null);
-    }
-
-    /**
-     * Shouldn't call close directly. The callers should call #requestClose 
instead
-     *
-     * @param shouldAbort shall we abort the stream instead of closing
-     */
-    private Future<Void> close(boolean shouldAbort, final boolean uncache) {
-        boolean abort;
-        closeLock.writeLock().lock();
-        try {
-            if (StreamStatus.CLOSED == status) {
-                return closePromise;
-            }
-            abort = shouldAbort || (StreamStatus.INITIALIZED != status && 
StreamStatus.CLOSING != status);
-            status = StreamStatus.CLOSED;
-            streamManager.notifyReleased(this);
-        } finally {
-            closeLock.writeLock().unlock();
-        }
-        logger.info("Closing stream {} ...", name);
-        // Close the writers to release the locks before failing the requests
-        Future<Void> closeWriterFuture;
-        if (abort) {
-            closeWriterFuture = Abortables.asyncAbort(writer, true);
-        } else {
-            closeWriterFuture = Utils.asyncClose(writer, true);
-        }
-        // close the manager and error out pending requests after close writer
-        Duration closeWaitDuration;
-        if (writerCloseTimeoutMs <= 0) {
-            closeWaitDuration = Duration.Top();
-        } else {
-            closeWaitDuration = 
Duration.fromMilliseconds(writerCloseTimeoutMs);
-        }
-
-        FutureUtils.stats(
-                closeWriterFuture,
-                writerCloseStatLogger,
-                Stopwatch.createStarted()
-        ).masked().within(futureTimer, closeWaitDuration)
-                .addEventListener(FutureUtils.OrderedFutureEventListener.of(
-                new FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        postClose(uncache);
-                    }
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        if (cause instanceof TimeoutException) {
-                            writerCloseTimeoutCounter.inc();
-                        }
-                        postClose(uncache);
-                    }
-                }, scheduler, name));
-        return closePromise;
-    }
-
-    private void closeManagerAndErrorOutPendingRequests() {
-        close(manager);
-        // Failed the pending requests.
-        Queue<StreamOp> oldPendingOps;
-        synchronized (this) {
-            oldPendingOps = pendingOps;
-            pendingOps = new ArrayDeque<StreamOp>();
-        }
-        StreamUnavailableException closingException =
-                new StreamUnavailableException("Stream " + name + " is 
closed.");
-        for (StreamOp op : oldPendingOps) {
-            op.fail(closingException);
-            pendingOpsCounter.dec();
-        }
-        limiter.close();
-        logger.info("Closed stream {}.", name);
-    }
-
-    /**
-     * clean up the gauge to help GC.
-     */
-    private void unregisterGauge(){
-        streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
-    }
-
-    // Test-only apis
-
-    @VisibleForTesting
-    public int numPendingOps() {
-        Queue<StreamOp> queue = pendingOps;
-        return null == queue ? 0 : queue.size();
-    }
-
-    @VisibleForTesting
-    public StreamStatus getStatus() {
-        return status;
-    }
-
-    @VisibleForTesting
-    public void setStatus(StreamStatus status) {
-        this.status = status;
-    }
-
-    @VisibleForTesting
-    public AsyncLogWriter getWriter() {
-        return writer;
-    }
-
-    @VisibleForTesting
-    public DistributedLogManager getManager() {
-        return manager;
-    }
-
-    @VisibleForTesting
-    public Throwable getLastException() {
-        return lastException;
-    }
-
-    @VisibleForTesting
-    public Future<Void> getCloseFuture() {
-        return closePromise;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
deleted file mode 100644
index d86c538..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManager.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.base.Optional;
-import com.twitter.util.Future;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Manage lifecycle of streams.
- *
- * <p>StreamManager is responsible for creating, destroying, and keeping track 
of Stream objects.
- *
- * <p>Stream objects, which are managed by StreamManager and created by 
StreamFactory, are essentially the
- * per stream request handlers, responsible fo dispatching ex. write requests 
to an underlying AsyncLogWriter,
- * managing stream lock, interpreting exceptions, error conditions, and etc.
- */
-public interface StreamManager {
-
-    /**
-     * Get a cached stream, returning null if it doesnt exist.
-     * @param stream name
-     * @return the cached stream
-     */
-    Stream getStream(String stream);
-
-    /**
-     * Get a cached stream and create a new one if it doesnt exist.
-     * @param streamName stream name
-     * @param start whether to start the stream after it is created.
-     * @return future satisfied once close complete
-     */
-    Stream getOrCreateStream(String streamName, boolean start) throws 
IOException;
-
-    /**
-     * Asynchronously create a new stream.
-     * @param stream
-     * @return Future satisfied once the stream is created
-     */
-    Future<Void> createStreamAsync(String stream);
-
-    /**
-     * Is acquiring stream allowed?
-     *
-     * @param stream
-     *          stream instance
-     * @return true if it is allowed to acquire this stream, otherwise false.
-     */
-    boolean allowAcquire(Stream stream);
-
-    /**
-     * Notify the manager that a stream was acquired.
-     * @param stream being acquired
-     */
-    void notifyAcquired(Stream stream);
-
-    /**
-     * Notify the manager that a stream was released.
-     * @param stream being released
-     */
-    void notifyReleased(Stream stream);
-
-    /**
-     * Notify the manager that a stream was completely removed.
-     * @param stream being uncached
-     * @return whether the stream existed or not
-     */
-    boolean notifyRemoved(Stream stream);
-
-    /**
-     * Asynchronous delete method.
-     * @param streamName stream name
-     * @return future satisfied once delete complete
-     */
-    Future<Void> deleteAndRemoveAsync(String streamName);
-
-    /**
-     * Asynchronous close and uncache method.
-     * @param streamName stream name
-     * @return future satisfied once close and uncache complete
-     */
-    Future<Void> closeAndRemoveAsync(String streamName);
-
-    /**
-     * Close and uncache after delayMs.
-     * @param stream to remove
-     */
-    void scheduleRemoval(Stream stream, long delayMs);
-
-    /**
-     * Close all stream.
-     * @return future satisfied all streams closed
-     */
-    Future<List<Void>> closeStreams();
-
-    /**
-     * Return map with stream ownership info.
-     * @param regex for filtering streams
-     * @return map containing ownership info
-     */
-    Map<String, String> getStreamOwnershipMap(Optional<String> regex);
-
-    /**
-     * Number of acquired streams.
-     * @return number of acquired streams
-     */
-    int numAcquired();
-
-    /**
-     * Number of cached streams.
-     * @return number of cached streams
-     */
-    int numCached();
-
-    /**
-     * Is the stream denoted by streamName in the acquired state.
-     * @return true if the stream is in the acquired state
-     */
-    boolean isAcquired(String streamName);
-
-    /**
-     * Close manager and disallow further activity.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
deleted file mode 100644
index 5d54738..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.exceptions.ServiceUnavailableException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.service.streamset.PartitionMap;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.util.ConfUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StreamManagerImpl is the default implementation responsible for creating, 
destroying, and keeping track
- * of Streams.
- *
- * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, 
is reposible simply for creating
- * a stream object in isolation from the rest of the system. We pass a 
StreamFactory in instead of simply
- * creating StreamImpl's ourselves in order to inject dependencies without 
bloating the StreamManagerImpl
- * constructor.
- */
-public class StreamManagerImpl implements StreamManager {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(StreamManagerImpl.class);
-
-    private final ConcurrentHashMap<String, Stream> streams =
-        new ConcurrentHashMap<String, Stream>();
-    private final AtomicInteger numCached = new AtomicInteger(0);
-
-    private final ConcurrentHashMap<String, Stream> acquiredStreams =
-        new ConcurrentHashMap<String, Stream>();
-    private final AtomicInteger numAcquired = new AtomicInteger(0);
-
-    //
-    // Partitions
-    //
-    private final StreamPartitionConverter partitionConverter;
-    private final PartitionMap cachedPartitions = new PartitionMap();
-    private final PartitionMap acquiredPartitions = new PartitionMap();
-
-    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
-    private final ScheduledExecutorService executorService;
-    private final DistributedLogConfiguration dlConfig;
-    private final StreamConfigProvider streamConfigProvider;
-    private final String clientId;
-    private boolean closed = false;
-    private final StreamFactory streamFactory;
-    private final DistributedLogNamespace dlNamespace;
-
-    public StreamManagerImpl(String clientId,
-                             DistributedLogConfiguration dlConfig,
-                             ScheduledExecutorService executorService,
-                             StreamFactory streamFactory,
-                             StreamPartitionConverter partitionConverter,
-                             StreamConfigProvider streamConfigProvider,
-                             DistributedLogNamespace dlNamespace) {
-        this.clientId = clientId;
-        this.executorService = executorService;
-        this.streamFactory = streamFactory;
-        this.partitionConverter = partitionConverter;
-        this.dlConfig = dlConfig;
-        this.streamConfigProvider = streamConfigProvider;
-        this.dlNamespace = dlNamespace;
-    }
-
-    private DynamicDistributedLogConfiguration getDynConf(String streamName) {
-        Optional<DynamicDistributedLogConfiguration> dynDlConf =
-                streamConfigProvider.getDynamicStreamConfig(streamName);
-        if (dynDlConf.isPresent()) {
-            return dynDlConf.get();
-        } else {
-            return ConfUtils.getConstDynConf(dlConfig);
-        }
-    }
-
-    @Override
-    public boolean allowAcquire(Stream stream) {
-        return acquiredPartitions.addPartition(
-                stream.getPartition(),
-                
stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy());
-    }
-
-    /**
-     * Must be enqueued to an executor to avoid deadlocks (close and 
execute-op both
-     * try to acquire the same read-write lock).
-     */
-    @Override
-    public Future<Void> deleteAndRemoveAsync(final String stream) {
-        final Promise<Void> result = new Promise<Void>();
-        java.util.concurrent.Future<?> scheduleFuture = schedule(new 
Runnable() {
-            @Override
-            public void run() {
-                result.become(doDeleteAndRemoveAsync(stream));
-            }
-        }, 0);
-        if (null == scheduleFuture) {
-            return Future.exception(
-                new ServiceUnavailableException("Couldn't schedule a delete 
task."));
-        }
-        return result;
-    }
-
-    /**
-     * Must be enqueued to an executor to avoid deadlocks (close and 
execute-op both
-     * try to acquire the same read-write lock).
-     */
-    @Override
-    public Future<Void> closeAndRemoveAsync(final String streamName) {
-        final Promise<Void> releasePromise = new Promise<Void>();
-        java.util.concurrent.Future<?> scheduleFuture = schedule(new 
Runnable() {
-            @Override
-            public void run() {
-                releasePromise.become(doCloseAndRemoveAsync(streamName));
-            }
-        }, 0);
-        if (null == scheduleFuture) {
-            return Future.exception(
-                new ServiceUnavailableException("Couldn't schedule a release 
task."));
-        }
-        return releasePromise;
-    }
-
-    @Override
-    public Future<Void> createStreamAsync(final String stream) {
-        final Promise<Void> createPromise = new Promise<Void>();
-        java.util.concurrent.Future<?> scheduleFuture = schedule(new 
Runnable() {
-            @Override
-            public void run() {
-                try {
-                    dlNamespace.createLog(stream);
-                    createPromise.setValue(null);
-                } catch (Exception e) {
-                    createPromise.setException(e);
-                }
-            }
-        }, 0);
-        if (null == scheduleFuture) {
-            return Future.exception(
-                new ServiceUnavailableException("Couldn't schedule a create 
task."));
-        }
-        return createPromise;
-    }
-
-    @Override
-    public void notifyReleased(Stream stream) {
-        acquiredPartitions.removePartition(stream.getPartition());
-        if (acquiredStreams.remove(stream.getStreamName(), stream)) {
-            numAcquired.getAndDecrement();
-        }
-    }
-
-    @Override
-    public void notifyAcquired(Stream stream) {
-        if (null == acquiredStreams.put(stream.getStreamName(), stream)) {
-            numAcquired.getAndIncrement();
-        }
-    }
-
-    @Override
-    public boolean notifyRemoved(Stream stream) {
-        cachedPartitions.removePartition(stream.getPartition());
-        if (streams.remove(stream.getStreamName(), stream)) {
-            numCached.getAndDecrement();
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public Map<String, String> getStreamOwnershipMap(Optional<String> regex) {
-        Map<String, String> ownershipMap = new HashMap<String, String>();
-        for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) {
-            String name = entry.getKey();
-            if (regex.isPresent() && !name.matches(regex.get())) {
-                continue;
-            }
-            Stream stream = entry.getValue();
-            if (null == stream) {
-                continue;
-            }
-            String owner = stream.getOwner();
-            if (null == owner) {
-                ownershipMap.put(name, clientId);
-            }
-        }
-        return ownershipMap;
-    }
-
-    @Override
-    public Stream getStream(String stream) {
-        return streams.get(stream);
-    }
-
-    @Override
-    public Stream getOrCreateStream(String streamName, boolean start) throws 
IOException {
-        Stream stream = streams.get(streamName);
-        if (null == stream) {
-            closeLock.readLock().lock();
-            try {
-                if (closed) {
-                    return null;
-                }
-                DynamicDistributedLogConfiguration dynConf = 
getDynConf(streamName);
-                int maxCachedPartitions = 
dynConf.getMaxCachedPartitionsPerProxy();
-
-                // get partition from the stream name
-                Partition partition = partitionConverter.convert(streamName);
-
-                // add partition to cached map
-                if (!cachedPartitions.addPartition(partition, 
maxCachedPartitions)) {
-                    throw new StreamUnavailableException("Stream " + streamName
-                            + " is not allowed to cache more than " + 
maxCachedPartitions + " partitions");
-                }
-
-                stream = newStream(streamName, dynConf);
-                Stream oldWriter = streams.putIfAbsent(streamName, stream);
-                if (null != oldWriter) {
-                    stream = oldWriter;
-                } else {
-                    numCached.getAndIncrement();
-                    logger.info("Inserted mapping stream name {} -> stream 
{}", streamName, stream);
-                    stream.initialize();
-                    if (start) {
-                        stream.start();
-                    }
-                }
-            } finally {
-                closeLock.readLock().unlock();
-            }
-        }
-        return stream;
-    }
-
-    @Override
-    public Future<List<Void>> closeStreams() {
-        int numAcquired = acquiredStreams.size();
-        int numCached = streams.size();
-        logger.info("Closing all acquired streams : acquired = {}, cached = 
{}.",
-            numAcquired, numCached);
-        Set<Stream> streamsToClose = new HashSet<Stream>();
-        streamsToClose.addAll(streams.values());
-        return closeStreams(streamsToClose, Optional.<RateLimiter>absent());
-    }
-
-    @Override
-    public void scheduleRemoval(final Stream stream, long delayMs) {
-        if (delayMs > 0) {
-            logger.info("Scheduling removal of stream {} from cache after {} 
sec.",
-                    stream.getStreamName(), delayMs);
-        }
-        schedule(new Runnable() {
-            @Override
-            public void run() {
-                if (notifyRemoved(stream)) {
-                    logger.info("Removed cached stream {} after probation.", 
stream.getStreamName());
-                } else {
-                    logger.info("Cached stream {} already removed.", 
stream.getStreamName());
-                }
-            }
-        }, delayMs);
-    }
-
-    @Override
-    public int numAcquired() {
-        return numAcquired.get();
-    }
-
-    @Override
-    public int numCached() {
-        return numCached.get();
-    }
-
-    @Override
-    public boolean isAcquired(String streamName) {
-        return acquiredStreams.containsKey(streamName);
-    }
-
-    @Override
-    public void close() {
-        closeLock.writeLock().lock();
-        try {
-            if (closed) {
-                return;
-            }
-            closed = true;
-        } finally {
-            closeLock.writeLock().unlock();
-        }
-    }
-
-    private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, 
Optional<RateLimiter> rateLimiter) {
-        if (streamsToClose.isEmpty()) {
-            logger.info("No streams to close.");
-            List<Void> emptyList = new ArrayList<Void>();
-            return Future.value(emptyList);
-        }
-        List<Future<Void>> futures = new 
ArrayList<Future<Void>>(streamsToClose.size());
-        for (Stream stream : streamsToClose) {
-            if (rateLimiter.isPresent()) {
-                rateLimiter.get().acquire();
-            }
-            futures.add(stream.requestClose("Close Streams"));
-        }
-        return Future.collect(futures);
-    }
-
-    private Stream newStream(String name, DynamicDistributedLogConfiguration 
streamConf) {
-        return streamFactory.create(name, streamConf, this);
-    }
-
-    public Future<Void> doCloseAndRemoveAsync(final String streamName) {
-        Stream stream = streams.get(streamName);
-        if (null == stream) {
-            logger.info("No stream {} to release.", streamName);
-            return Future.value(null);
-        } else {
-            return stream.requestClose("release ownership");
-        }
-    }
-
-    /**
-     * Dont schedule if we're closed - closeLock is acquired to close, so if 
we acquire the
-     * lock and discover we're not closed, we won't schedule.
-     */
-    private java.util.concurrent.Future<?> schedule(Runnable runnable, long 
delayMs) {
-        closeLock.readLock().lock();
-        try {
-            if (closed) {
-                return null;
-            } else if (delayMs > 0) {
-                return executorService.schedule(runnable, delayMs, 
TimeUnit.MILLISECONDS);
-            } else {
-                return executorService.submit(runnable);
-            }
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    private Future<Void> doDeleteAndRemoveAsync(final String streamName) {
-        Stream stream = streams.get(streamName);
-        if (null == stream) {
-            logger.warn("No stream {} to delete.", streamName);
-            return Future.exception(new UnexpectedException("No stream " + 
streamName + " to delete."));
-        } else {
-            Future<Void> result;
-            logger.info("Deleting stream {}, {}", streamName, stream);
-            try {
-                stream.delete();
-                result = stream.requestClose("Stream Deleted");
-            } catch (IOException e) {
-                logger.error("Failed on removing stream {} : ", streamName, e);
-                result = Future.exception(e);
-            }
-            return result;
-        }
-    }
-
-    @VisibleForTesting
-    public ConcurrentHashMap<String, Stream> getCachedStreams() {
-        return streams;
-    }
-
-    @VisibleForTesting
-    public ConcurrentHashMap<String, Stream> getAcquiredStreams() {
-        return acquiredStreams;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
deleted file mode 100644
index d0b8de4..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-
-/**
- * An operation applied to a stream.
- */
-public interface StreamOp {
-    /**
-     * Execute a stream op with the supplied writer.
-     *
-     * @param writer active writer for applying the change
-     * @param sequencer sequencer used for generating transaction id for 
stream operations
-     * @param txnLock transaction lock to guarantee ordering of transaction id
-     * @return a future satisfied when the operation completes execution
-     */
-    Future<Void> execute(AsyncLogWriter writer,
-                         Sequencer sequencer,
-                         Object txnLock);
-
-    /**
-     * Invoked before the stream op is executed.
-     */
-    void preExecute() throws DLException;
-
-    /**
-     * Return the response header (containing the status code etc.).
-     *
-     * @return A future containing the response header or the exception
-     *      encountered by the op if it failed.
-     */
-    Future<ResponseHeader> responseHeader();
-
-    /**
-     * Abort the operation with the givem exception.
-     */
-    void fail(Throwable t);
-
-    /**
-     * Return the stream name.
-     */
-    String streamName();
-
-    /**
-     * Stopwatch gives the start time of the operation.
-     */
-    Stopwatch stopwatch();
-
-    /**
-     * Compute checksum from arguments.
-     */
-    Long computeChecksum();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
deleted file mode 100644
index f3fc610..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * Encapsulate stream op stats construction to make it easier to access stream
- * op stats consistently from different scopes.
- */
-public class StreamOpStats {
-    private final StatsLogger baseStatsLogger;
-    private final StatsLogger requestStatsLogger;
-    private final StatsLogger recordsStatsLogger;
-    private final StatsLogger requestDeniedStatsLogger;
-    private final StatsLogger streamStatsLogger;
-
-    public StreamOpStats(StatsLogger statsLogger,
-                         StatsLogger perStreamStatsLogger) {
-        this.baseStatsLogger = statsLogger;
-        this.requestStatsLogger = statsLogger.scope("request");
-        this.recordsStatsLogger = statsLogger.scope("records");
-        this.requestDeniedStatsLogger = statsLogger.scope("denied");
-        this.streamStatsLogger = perStreamStatsLogger;
-    }
-
-    public StatsLogger baseStatsLogger(String opName) {
-        return baseStatsLogger;
-    }
-
-    public Counter baseCounter(String opName) {
-        return baseStatsLogger.getCounter(opName);
-    }
-
-    public StatsLogger baseScope(String opName) {
-        return baseStatsLogger.scope(opName);
-    }
-
-    public OpStatsLogger requestLatencyStat(String opName) {
-        return requestStatsLogger.getOpStatsLogger(opName);
-    }
-
-    public StatsLogger requestScope(String scopeName) {
-        return requestStatsLogger.scope(scopeName);
-    }
-
-    public Counter scopedRequestCounter(String opName, String counterName) {
-        return requestScope(opName).getCounter(counterName);
-    }
-
-    public Counter requestCounter(String counterName) {
-        return requestStatsLogger.getCounter(counterName);
-    }
-
-    public Counter requestPendingCounter(String counterName) {
-        return requestCounter(counterName);
-    }
-
-    public Counter requestDeniedCounter(String counterName) {
-        return requestDeniedStatsLogger.getCounter(counterName);
-    }
-
-    public Counter recordsCounter(String counterName) {
-        return recordsStatsLogger.getCounter(counterName);
-    }
-
-    public StatsLogger streamRequestStatsLogger(Partition partition) {
-        return BroadCastStatsLogger.masterslave(
-            streamStatsLogger.scope(partition.getStream()).scope("partition")
-                .scope(partition.getPaddedId()), 
streamStatsLogger.scope(partition.getStream())
-                .scope("aggregate"));
-    }
-
-    public StatsLogger streamRequestScope(Partition partition, String 
scopeName) {
-        return streamRequestStatsLogger(partition).scope(scopeName);
-    }
-
-    public OpStatsLogger streamRequestLatencyStat(Partition partition, String 
opName) {
-        return streamRequestStatsLogger(partition).getOpStatsLogger(opName);
-    }
-
-    public Counter streamRequestCounter(Partition partition, String opName, 
String counterName) {
-        return streamRequestScope(partition, opName).getCounter(counterName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
deleted file mode 100644
index 7a38d14..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Operation to truncate a log stream.
- */
-public class TruncateOp extends AbstractWriteOp {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(TruncateOp.class);
-
-    private final Counter deniedTruncateCounter;
-    private final DLSN dlsn;
-    private final AccessControlManager accessControlManager;
-
-    public TruncateOp(String stream,
-                      DLSN dlsn,
-                      StatsLogger statsLogger,
-                      StatsLogger perStreamStatsLogger,
-                      Long checksum,
-                      Feature checksumDisabledFeature,
-                      AccessControlManager accessControlManager) {
-        super(stream, requestStat(statsLogger, "truncate"), checksum, 
checksumDisabledFeature);
-        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, 
perStreamStatsLogger);
-        this.deniedTruncateCounter = 
streamOpStats.requestDeniedCounter("truncate");
-        this.accessControlManager = accessControlManager;
-        this.dlsn = dlsn;
-    }
-
-    @Override
-    public Long computeChecksum() {
-        return ProtocolUtils.truncateOpCRC32(stream, dlsn);
-    }
-
-    @Override
-    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                              Sequencer sequencer,
-                                              Object txnLock) {
-        if (!stream.equals(writer.getStreamName())) {
-            logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, 
{}", stream, writer.getStreamName());
-            return Future.exception(new IllegalStateException("The stream 
mapping is incorrect, fail the request"));
-        }
-        return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, 
WriteResponse>() {
-            @Override
-            public WriteResponse apply(Boolean v1) {
-                return ResponseUtils.writeSuccess();
-            }
-        });
-    }
-
-    @Override
-    public void preExecute() throws DLException {
-        if (!accessControlManager.allowTruncate(stream)) {
-            deniedTruncateCounter.inc();
-            throw new RequestDeniedException(stream, "truncate");
-        }
-        super.preExecute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
deleted file mode 100644
index c4bdcc2..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.RequestDeniedException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ProtocolUtils;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-
/**
 * Operation to write a single record to a log stream.
 *
 * <p>The payload is copied out of the request buffer at construction time; a
 * listener on the op's result future records latency and byte stats once the
 * response is known.
 */
public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {

    private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);

    // Record payload, fully copied out of the caller's ByteBuffer in the constructor.
    private final byte[] payload;
    // True if the payload is a pre-serialized record set rather than a single record.
    private final boolean isRecordSet;

    // Stats
    private final Counter deniedWriteCounter;
    private final Counter successRecordCounter;
    private final Counter failureRecordCounter;
    private final Counter redirectRecordCounter;
    private final OpStatsLogger latencyStat;
    private final Counter bytes;
    private final Counter writeBytes;

    // DLSN serialization format version used when encoding the response DLSN.
    private final byte dlsnVersion;
    private final AccessControlManager accessControlManager;

    /**
     * Create a write op.
     *
     * @param stream stream to write to
     * @param data record payload; consumed (copied) by this constructor
     * @param statsLogger service-wide stats logger
     * @param perStreamStatsLogger per-stream stats logger
     * @param streamPartitionConverter maps the stream name to a partition for stats scoping
     * @param conf server configuration (not referenced in this class body)
     * @param dlsnVersion DLSN serialization version for the response
     * @param checksum optional request checksum to validate
     * @param isRecordSet whether {@code data} is a serialized record set
     * @param checksumDisabledFeature feature flag disabling checksum validation
     * @param accessControlManager access control used to reject denied writes
     */
    public WriteOp(String stream,
                   ByteBuffer data,
                   StatsLogger statsLogger,
                   StatsLogger perStreamStatsLogger,
                   StreamPartitionConverter streamPartitionConverter,
                   ServerConfiguration conf,
                   byte dlsnVersion,
                   Long checksum,
                   boolean isRecordSet,
                   Feature checksumDisabledFeature,
                   AccessControlManager accessControlManager) {
        super(stream, requestStat(statsLogger, "write"), checksum, checksumDisabledFeature);
        payload = new byte[data.remaining()];
        data.get(payload);
        this.isRecordSet = isRecordSet;

        final Partition partition = streamPartitionConverter.convert(stream);
        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
        this.successRecordCounter = streamOpStats.recordsCounter("success");
        this.failureRecordCounter = streamOpStats.recordsCounter("failure");
        this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
        this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write");
        this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes");
        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write");
        this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes");

        this.dlsnVersion = dlsnVersion;
        this.accessControlManager = accessControlManager;

        // Record latency and byte counts once the overall response is known.
        // Registered after the stat fields above are assigned, since the
        // listener reads them.
        final long size = getPayloadSize();
        result().addEventListener(new FutureEventListener<WriteResponse>() {
            @Override
            public void onSuccess(WriteResponse response) {
                if (response.getHeader().getCode() == StatusCode.SUCCESS) {
                    latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
                    bytes.add(size);
                    writeBytes.add(size);
                } else {
                    // A non-SUCCESS header counts as a failed write for latency stats.
                    latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
                }
            }
            @Override
            public void onFailure(Throwable cause) {
                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
            }
        });
    }

    /** Payload size in bytes. */
    @Override
    public long getPayloadSize() {
      return payload.length;
    }

    /** CRC32 over the stream name and payload. */
    @Override
    public Long computeChecksum() {
        return ProtocolUtils.writeOpCRC32(stream, payload);
    }

    /**
     * Reject the write before execution if access control denies it;
     * otherwise fall through to the base pre-execution (checksum) checks.
     */
    @Override
    public void preExecute() throws DLException {
        if (!accessControlManager.allowWrite(stream)) {
            deniedWriteCounter.inc();
            throw new RequestDeniedException(stream, "write");
        }
        super.preExecute();
    }

    @Override
    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
                                              Sequencer sequencer,
                                              Object txnLock) {
        // Sanity check: the routed writer must belong to this op's stream.
        if (!stream.equals(writer.getStreamName())) {
            logger.error("Write: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
            return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
        }

        long txnId;
        Future<DLSN> writeResult;
        // Transaction id generation and the write itself happen under txnLock
        // so records reach the writer in transaction-id order.
        synchronized (txnLock) {
            txnId = sequencer.nextId();
            LogRecord record = new LogRecord(txnId, payload);
            if (isRecordSet) {
                record.setRecordSet();
            }
            writeResult = writer.write(record);
        }
        return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
            @Override
            public WriteResponse apply(DLSN value) {
                successRecordCounter.inc();
                return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion));
            }
        });
    }

    /**
     * Classify the failure for record counters: redirect responses (FOUND)
     * are tracked separately from real failures.
     */
    @Override
    protected void fail(ResponseHeader header) {
        if (StatusCode.FOUND == header.getCode()) {
            redirectRecordCounter.inc();
        } else {
            failureRecordCounter.inc();
        }
        super.fail(header);
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
deleted file mode 100644
index e411b420..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/WriteOpWithPayload.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
/**
 * A write operation with payload.
 */
public interface WriteOpWithPayload {

    /**
     * Return the payload size in bytes.
     *
     * @return payload size in bytes
     */
    long getPayloadSize();
}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
deleted file mode 100644
index fcaee35..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/AdminOp.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.admin;
-
-import org.apache.distributedlog.exceptions.DLException;
-import com.twitter.util.Future;
-
/**
 * Admin operation interface.
 *
 * @param <RespT> type of the response produced by the operation
 */
public interface AdminOp<RespT> {

    /**
     * Invoked before the stream op is executed.
     *
     * @throws DLException if the operation must be rejected before execution
     */
    void preExecute() throws DLException;

    /**
     * Execute the operation.
     *
     * @return the future represents the response of the operation
     */
    Future<RespT> execute();

}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
 
b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
deleted file mode 100644
index 89a2566..0000000
--- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/admin/CreateOp.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream.admin;
-
-import static 
org.apache.distributedlog.service.stream.AbstractStreamOp.requestStat;
-
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-/**
- * Operation to create log stream.
- */
-public class CreateOp extends StreamAdminOp {
-
-  public CreateOp(String stream,
-                  StatsLogger statsLogger,
-                  StreamManager streamManager,
-                  Long checksum,
-                  Feature checksumEnabledFeature) {
-    super(stream,
-            streamManager,
-            requestStat(statsLogger, "create"),
-            checksum,
-            checksumEnabledFeature);
-  }
-
-  @Override
-  protected Future<WriteResponse> executeOp() {
-    Future<Void> result = streamManager.createStreamAsync(stream);
-    return result.map(new AbstractFunction1<Void, WriteResponse>() {
-      @Override
-      public WriteResponse apply(Void value) {
-        return ResponseUtils.writeSuccess();
-      }
-    });
-  }
-}

Reply via email to