http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
deleted file mode 100644
index 287bd6d..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-import com.twitter.util.TimerTask;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import scala.Function0;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Ordered Scheduler. It is a thread-pool-based {@link 
ScheduledExecutorService} that additionally provides
- * the ability to execute/schedule tasks by <code>key</code>. Hence tasks 
submitted with the same <i>key</i>
- * will be executed in order.
- * <p>
- * The scheduler is comprised of multiple {@link 
MonitoredScheduledThreadPoolExecutor}s. Each
- * {@link MonitoredScheduledThreadPoolExecutor} is a single thread executor. 
Normal task submissions will
- * be submitted to executors in a random manner to guarantee load balancing. 
Keyed task submissions (e.g.
- * {@link OrderedScheduler#apply(Object, Function0)}) will be submitted to a 
dedicated executor based on
- * the hash value of the submitted <i>key</i>.
- *
- * <h3>Metrics</h3>
- *
- * <h4>Per Executor Metrics</h4>
- *
- * Metrics about individual executors are exposed via {@link 
Builder#perExecutorStatsLogger}
- * under <i>`scope`/`name`-executor-`id`-0</i>. `name` is the scheduler name 
provided by {@link Builder#name}
- * while `id` is the index of this executor in the pool. And corresponding 
stats of future pool of
- * that executor are exposed under 
<i>`scope`/`name`-executor-`id`-0/futurepool</i>.
- * <p>
- * See {@link MonitoredScheduledThreadPoolExecutor} and {@link 
MonitoredFuturePool} for per executor metrics
- * exposed.
- *
- * <h4>Aggregated Metrics</h4>
- * <ul>
- * <li>task_pending_time: opstats. Measures the characteristics of the 
time that tasks spend
- * waiting to be executed.
- * <li>task_execution_time: opstats. Measures the characteristics of the 
time that tasks spend
- * executing.
- * <li>futurepool/task_pending_time: opstats. Measures the characteristics 
of the time that tasks spend
- * waiting in the future pool to be executed.
- * <li>futurepool/task_execution_time: opstats. Measures the characteristics 
of the time that tasks spend
- * executing.
- * <li>futurepool/task_enqueue_time: opstats. Measures the characteristics 
of the time that tasks spend
- * being submitted to the future pool.
- * <li>futurepool/tasks_pending: gauge. How many tasks are pending in this 
future pool.
- * </ul>
- */
-public class OrderedScheduler implements ScheduledExecutorService {
-
-    /**
-     * Create a builder to build scheduler.
-     *
-     * @return scheduler builder
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder for {@link OrderedScheduler}.
-     */
-    public static class Builder {
-
-        private String name = "OrderedScheduler";
-        private int corePoolSize = -1;
-        private ThreadFactory threadFactory = null;
-        private boolean traceTaskExecution = false;
-        private long traceTaskExecutionWarnTimeUs = Long.MAX_VALUE;
-        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        private StatsLogger perExecutorStatsLogger = NullStatsLogger.INSTANCE;
-
-        /**
-         * Set the name of this scheduler. It would be used as part of stats 
scope and thread name.
-         *
-         * @param name
-         *          name of the scheduler.
-         * @return scheduler builder
-         */
-        public Builder name(String name) {
-            this.name = name;
-            return this;
-        }
-
-        /**
-         * Set the number of threads to be used in this scheduler.
-         *
-         * @param corePoolSize the number of threads to keep in the pool, even
-         *        if they are idle
-         * @return scheduler builder
-         */
-        public Builder corePoolSize(int corePoolSize) {
-            this.corePoolSize = corePoolSize;
-            return this;
-        }
-
-        /**
-         * Set the thread factory that the scheduler uses to create a new 
thread.
-         *
-         * @param threadFactory the factory to use when the executor
-         *        creates a new thread
-         * @return scheduler builder
-         */
-        public Builder threadFactory(ThreadFactory threadFactory) {
-            this.threadFactory = threadFactory;
-            return this;
-        }
-
-        /**
-         * Enable/Disable exposing task execution stats.
-         *
-         * @param trace
-         *          flag to enable/disable exposing task execution stats.
-         * @return scheduler builder
-         */
-        public Builder traceTaskExecution(boolean trace) {
-            this.traceTaskExecution = trace;
-            return this;
-        }
-
-        /**
-         * Enable/Disable logging slow tasks whose execution time is above 
<code>timeUs</code>.
-         *
-         * @param timeUs
-         *          slow task execution time threshold in us.
-         * @return scheduler builder.
-         */
-        public Builder traceTaskExecutionWarnTimeUs(long timeUs) {
-            this.traceTaskExecutionWarnTimeUs = timeUs;
-            return this;
-        }
-
-        /**
-         * Expose the aggregated stats over <code>statsLogger</code>.
-         *
-         * @param statsLogger
-         *          stats logger to receive aggregated stats.
-         * @return scheduler builder
-         */
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        /**
-         * Expose stats of individual executors over 
<code>perExecutorStatsLogger</code>.
-         * Each executor's stats will be exposed under a sub-scope 
`name`-executor-`id`-0.
-         * `name` is the scheduler name, while `id` is the index of the 
executor in the pool.
-         *
-         * @param perExecutorStatsLogger
-         *          stats logger to receive per executor stats.
-         * @return scheduler builder
-         */
-        public Builder perExecutorStatsLogger(StatsLogger 
perExecutorStatsLogger) {
-            this.perExecutorStatsLogger = perExecutorStatsLogger;
-            return this;
-        }
-
-        /**
-         * Build the ordered scheduler.
-         *
-         * @return ordered scheduler
-         */
-        public OrderedScheduler build() {
-            if (corePoolSize <= 0) {
-                corePoolSize = Runtime.getRuntime().availableProcessors();
-            }
-            if (null == threadFactory) {
-                threadFactory = Executors.defaultThreadFactory();
-            }
-
-            return new OrderedScheduler(
-                    name,
-                    corePoolSize,
-                    threadFactory,
-                    traceTaskExecution,
-                    traceTaskExecutionWarnTimeUs,
-                    statsLogger,
-                    perExecutorStatsLogger);
-        }
-
-    }
-
-    protected final String name;
-    protected final int corePoolSize;
-    protected final MonitoredScheduledThreadPoolExecutor[] executors;
-    protected final MonitoredFuturePool[] futurePools;
-    protected final Random random;
-
-    private OrderedScheduler(String name,
-                             int corePoolSize,
-                             ThreadFactory threadFactory,
-                             boolean traceTaskExecution,
-                             long traceTaskExecutionWarnTimeUs,
-                             StatsLogger statsLogger,
-                             StatsLogger perExecutorStatsLogger) {
-        this.name = name;
-        this.corePoolSize = corePoolSize;
-        this.executors = new 
MonitoredScheduledThreadPoolExecutor[corePoolSize];
-        this.futurePools = new MonitoredFuturePool[corePoolSize];
-        for (int i = 0; i < corePoolSize; i++) {
-            ThreadFactory tf = new ThreadFactoryBuilder()
-                    .setNameFormat(name + "-executor-" + i + "-%d")
-                    .setThreadFactory(threadFactory)
-                    .build();
-            StatsLogger broadcastStatsLogger =
-                    
BroadCastStatsLogger.masterslave(perExecutorStatsLogger.scope("executor-" + i), 
statsLogger);
-            executors[i] = new MonitoredScheduledThreadPoolExecutor(
-                    1, tf, broadcastStatsLogger, traceTaskExecution);
-            futurePools[i] = new MonitoredFuturePool(
-                    new ExecutorServiceFuturePool(executors[i]),
-                    broadcastStatsLogger.scope("futurepool"),
-                    traceTaskExecution,
-                    traceTaskExecutionWarnTimeUs);
-        }
-        this.random = new Random(System.currentTimeMillis());
-    }
-
-    protected MonitoredScheduledThreadPoolExecutor chooseExecutor() {
-        return corePoolSize == 1 ? executors[0] : 
executors[random.nextInt(corePoolSize)];
-    }
-
-    protected MonitoredScheduledThreadPoolExecutor chooseExecutor(Object key) {
-        return corePoolSize == 1 ? executors[0] :
-                executors[MathUtils.signSafeMod(Objects.hashCode(key), 
corePoolSize)];
-    }
-
-    protected FuturePool chooseFuturePool(Object key) {
-        return corePoolSize == 1 ? futurePools[0] :
-                futurePools[MathUtils.signSafeMod(Objects.hashCode(key), 
corePoolSize)];
-    }
-
-    protected FuturePool chooseFuturePool() {
-        return corePoolSize == 1 ? futurePools[0] : 
futurePools[random.nextInt(corePoolSize)];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
-        return chooseExecutor().schedule(command, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
-        return chooseExecutor().schedule(callable, delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
-                                                  long initialDelay, long 
period, TimeUnit unit) {
-        return chooseExecutor().scheduleAtFixedRate(command, initialDelay, 
period, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
-                                                     long initialDelay, long 
delay, TimeUnit unit) {
-        return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, 
delay, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void shutdown() {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            // Unregister gauges
-            executor.unregisterGauges();
-            executor.shutdown();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Runnable> shutdownNow() {
-        List<Runnable> runnables = new ArrayList<Runnable>();
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            runnables.addAll(executor.shutdownNow());
-        }
-        return runnables;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isShutdown() {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            if (!executor.isShutdown()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isTerminated() {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            if (!executor.isTerminated()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit)
-            throws InterruptedException {
-        for (MonitoredScheduledThreadPoolExecutor executor : executors) {
-            if (!executor.awaitTermination(timeout, unit)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-        return chooseExecutor().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-        return chooseExecutor().submit(task, result);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Future<?> submit(Runnable task) {
-        return chooseExecutor().submit(task);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks)
-            throws InterruptedException {
-        return chooseExecutor().invokeAll(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit)
-            throws InterruptedException {
-        return chooseExecutor().invokeAll(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-            throws InterruptedException, ExecutionException {
-        return chooseExecutor().invokeAny(tasks);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-        return chooseExecutor().invokeAny(tasks, timeout, unit);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void execute(Runnable command) {
-        chooseExecutor().execute(command);
-    }
-
-    // Ordered Functions
-
-    /**
-     * Return a future pool used by <code>key</code>.
-     *
-     * @param key
-     *          key to order in the future pool
-     * @return future pool
-     */
-    public FuturePool getFuturePool(Object key) {
-        return chooseFuturePool(key);
-    }
-
-    /**
-     * Execute the <code>function</code> in the executor that is assigned by 
<code>key</code>.
-     *
-     * @see com.twitter.util.Future
-     * @param key key of the <i>function</i> to run
-     * @param function function to run
-     * @return future representing the result of the <i>function</i>
-     */
-    public <T> com.twitter.util.Future<T> apply(Object key, Function0<T> 
function) {
-        return chooseFuturePool(key).apply(function);
-    }
-
-    /**
-     * Execute the <code>function</code> by the scheduler. It is 
submitted to a randomly chosen executor.
-     *
-     * @param function function to run
-     * @return future representing the result of the <i>function</i>
-     */
-    public <T> com.twitter.util.Future<T> apply(Function0<T> function) {
-        return chooseFuturePool().apply(function);
-    }
-
-    public ScheduledFuture<?> schedule(Object key, Runnable command, long 
delay, TimeUnit unit) {
-        return chooseExecutor(key).schedule(command, delay, unit);
-    }
-
-    public ScheduledFuture<?> scheduleAtFixedRate(Object key,
-                                                  Runnable command,
-                                                  long initialDelay,
-                                                  long period,
-                                                  TimeUnit unit) {
-        return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, 
period, unit);
-    }
-
-    public Future<?> submit(Object key, Runnable command) {
-        return chooseExecutor(key).submit(command);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
deleted file mode 100644
index 41c28a3..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-/**
- * A simple limiter interface which tracks acquire/release of permits, for
- * example for tracking outstanding writes.
- */
-public interface PermitLimiter {
-
-    public static PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() {
-        @Override
-        public boolean acquire() {
-            return true;
-        }
-        @Override
-        public void release(int permits) {
-        }
-
-        @Override
-        public void close() {
-
-        }
-    };
-
-    /**
-     * Acquire a permit.
-     *
-     * @return true if successfully acquire a permit, otherwise false.
-     */
-    boolean acquire();
-
-    /**
-     * Release a permit.
-     */
-    void release(int permits);
-
-    /**
-     * Close the resources created by the limiter
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
deleted file mode 100644
index 6a6d574..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-public interface PermitManager {
-
-    public static interface Permit {
-        static final Permit ALLOWED = new Permit() {
-            @Override
-            public boolean isAllowed() {
-                return true;
-            }
-        };
-        boolean isAllowed();
-    }
-
-    public static PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() 
{
-        @Override
-        public Permit acquirePermit() {
-            return Permit.ALLOWED;
-        }
-
-        @Override
-        public void releasePermit(Permit permit) {
-            // nop
-        }
-
-        @Override
-        public boolean allowObtainPermits() {
-            return true;
-        }
-
-        @Override
-        public boolean disallowObtainPermits(Permit permit) {
-            return false;
-        }
-
-        @Override
-        public void close() {
-            // nop
-        }
-
-    };
-
-    /**
-     * Obtain a permit from the permit manager.
-     *
-     * @return permit.
-     */
-    Permit acquirePermit();
-
-    /**
-     * Release a given permit.
-     *
-     * @param permit
-     *          permit to release
-     */
-    void releasePermit(Permit permit);
-
-    /**
-     * Allow obtaining permits.
-     */
-    boolean allowObtainPermits();
-
-    /**
-     * Disallow obtaining permits. Disallow needs to be performed under the 
context
-     * of <i>permit</i>.
-     *
-     * @param permit
-     *          permit context to disallow
-     */
-    boolean disallowObtainPermits(Permit permit);
-
-    /**
-     * Release the resources
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java
deleted file mode 100644
index 3565f98..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.RetryPolicy;
-
-/**
- * Utils for {@link org.apache.bookkeeper.zookeeper.RetryPolicy}
- */
-public class RetryPolicyUtils {
-
-    /**
-     * Infinite retry policy
-     */
-    public static final RetryPolicy DEFAULT_INFINITE_RETRY_POLICY = 
infiniteRetry(200, 2000);
-
-    /**
-     * Create an infinite retry policy with backoff time between 
<i>baseBackoffTimeMs</i> and
-     * <i>maxBackoffTimeMs</i>.
-     *
-     * @param baseBackoffTimeMs base backoff time in milliseconds
-     * @param maxBackoffTimeMs maximum backoff time in milliseconds
-     * @return an infinite retry policy
-     */
-    public static RetryPolicy infiniteRetry(long baseBackoffTimeMs, long 
maxBackoffTimeMs) {
-        return new BoundExponentialBackoffRetryPolicy(baseBackoffTimeMs, 
maxBackoffTimeMs, Integer.MAX_VALUE);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java
deleted file mode 100644
index d139a80..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.util.Function0;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.runtime.BoxedUnit;
-
-/**
- * Acts like a future pool, but collects failed apply calls into a queue to be 
applied
- * in-order on close. This happens either in the close thread or after close 
is called,
- * in the last operation to complete execution.
- * Ops submitted after close will not be scheduled, so it's important to ensure 
no more
- * ops will be applied once close has been called.
- */
-public class SafeQueueingFuturePool<T> {
-
-    static final Logger LOG = 
LoggerFactory.getLogger(SafeQueueingFuturePool.class);
-
-    private boolean closed;
-    private int outstanding;
-    private ConcurrentLinkedQueue<Function0<T>> queue;
-    private FuturePool orderedFuturePool;
-
-    public SafeQueueingFuturePool(FuturePool orderedFuturePool) {
-        this.closed = false;
-        this.outstanding = 0;
-        this.queue = new ConcurrentLinkedQueue<Function0<T>>();
-        this.orderedFuturePool = orderedFuturePool;
-    }
-
-    public synchronized Future<T> apply(final Function0<T> fn) {
-        Preconditions.checkNotNull(fn);
-        if (closed) {
-            return Future.exception(new RejectedExecutionException("Operation 
submitted to closed SafeQueueingFuturePool"));
-        }
-        ++outstanding;
-        queue.add(fn);
-        Future<T> result = orderedFuturePool.apply(new Function0<T>() {
-            @Override
-            public T apply() {
-                return queue.poll().apply();
-            }
-            @Override
-            public String toString() {
-                return fn.toString();
-            }
-        }).ensure(new Function0<BoxedUnit>() {
-            public BoxedUnit apply() {
-                if (decrOutstandingAndCheckDone()) {
-                    applyAll();
-                }
-                return null;
-            }
-        });
-        return result;
-    }
-
-    private synchronized boolean decrOutstandingAndCheckDone() {
-        return --outstanding == 0 && closed;
-    }
-
-    public void close() {
-        final boolean done;
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-            done = (outstanding == 0);
-        }
-        if (done) {
-            applyAll();
-        }
-    }
-
-    private void applyAll() {
-        if (!queue.isEmpty()) {
-            LOG.info("Applying {} items", queue.size());
-        }
-        while (!queue.isEmpty()) {
-            queue.poll().apply();
-        }
-    }
-
-    public synchronized int size() {
-        return queue.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java
deleted file mode 100644
index 9f756f0..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class SchedulerUtils {
-
-    static final Logger logger = LoggerFactory.getLogger(SchedulerUtils.class);
-
-    public static void shutdownScheduler(ExecutorService service, long 
timeout, TimeUnit timeUnit) {
-        if (null == service) {
-            return;
-        }
-        service.shutdown();
-        try {
-            service.awaitTermination(timeout, timeUnit);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted when shutting down scheduler : ", e);
-        }
-        service.shutdownNow();
-    }
-
-    public static void shutdownScheduler(OrderedSafeExecutor service, long 
timeout, TimeUnit timeUnit) {
-        if (null == service) {
-            return;
-        }
-        service.shutdown();
-        try {
-            service.awaitTermination(timeout, timeUnit);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted when shutting down scheduler : ", e);
-        }
-        service.forceShutdown(timeout, timeUnit);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java
deleted file mode 100644
index 7ec50ba..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-/**
- * Sequencer generating transaction ids. Each call to {@link #nextId()}
- * hands out the next id.
- */
-public interface Sequencer {
-
-    /**
-     * Return the next transaction id generated by the sequencer.
-     *
-     * @return next transaction id generated by the sequencer.
-     */
-    long nextId();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
deleted file mode 100644
index 4086a1e..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple counter based {@link PermitLimiter}.
- *
- * <h3>Metrics</h3>
- * <ul>
- * <li> `permits`: gauge. how many permits are acquired right now?
- * <li> `permits`/*: opstats. the characteristics of the number of permits already acquired on each acquire.
- * <li> `acquireFailure`: counter. how many acquires failed? failure means the maximum number of permits
- * had already been reached when trying to acquire.
- * </ul>
- */
-public class SimplePermitLimiter implements PermitLimiter {
-
-    final Counter acquireFailureCounter;
-    final OpStatsLogger permitsMetric;
-    final AtomicInteger permits;
-    final int permitsMax;
-    final boolean darkmode;
-    final Feature disableWriteLimitFeature;
-    private StatsLogger statsLogger = null;
-    private Gauge<Number> permitsGauge = null;
-    private String permitsGaugeLabel = "";
-
-    public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger 
statsLogger,
-                               boolean singleton, Feature 
disableWriteLimitFeature) {
-        this.permits = new AtomicInteger(0);
-        this.permitsMax = permitsMax;
-        this.darkmode = darkmode;
-        this.disableWriteLimitFeature = disableWriteLimitFeature;
-
-        // stats
-        if (singleton) {
-            this.statsLogger = statsLogger;
-            this.permitsGauge = new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-                @Override
-                public Number getSample() {
-                    return permits.get();
-                }
-            };
-            this.permitsGaugeLabel = "permits";
-            statsLogger.registerGauge(permitsGaugeLabel, permitsGauge);
-        }
-        acquireFailureCounter = statsLogger.getCounter("acquireFailure");
-        permitsMetric = statsLogger.getOpStatsLogger("permits");
-    }
-
-    public boolean isDarkmode() {
-        return darkmode || disableWriteLimitFeature.isAvailable();
-    }
-
-    @Override
-    public boolean acquire() {
-        permitsMetric.registerSuccessfulEvent(permits.get());
-        if (permits.incrementAndGet() <= permitsMax || isDarkmode()) {
-            return true;
-        } else {
-            acquireFailureCounter.inc();
-            permits.decrementAndGet();
-            return false;
-        }
-    }
-
-    @Override
-    public void release(int permitsToRelease) {
-        permits.addAndGet(-permitsToRelease);
-    }
-
-    @Override
-    public void close() {
-        unregisterGauge();
-    }
-
-    @VisibleForTesting
-    public int getPermits() {
-        return permits.get();
-    }
-
-    public void unregisterGauge() {
-        if (this.statsLogger != null && this.permitsGauge != null) {
-            this.statsLogger.unregisterGauge(permitsGaugeLabel, permitsGauge);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java
deleted file mode 100644
index 216d5ea..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-/**
- * The {@code Sizable} interface provides the capability of calculating the
- * size of an object.
- */
-public interface Sizable {
-    /**
-     * Calculate the size for this instance.
-     *
-     * @return size of the instance.
-     */
-    long size();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java
deleted file mode 100644
index 96e564e..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.twitter.distributedlog.DistributedLogConstants;
-
-/**
- * Time based sequencer. It generates non-decreasing transaction ids using milliseconds.
- * It isn't thread-safe. The caller takes responsibility for synchronization.
- */
-public class TimeSequencer implements Sequencer {
-
-    // last id handed out (or explicitly set)
-    private long lastId = DistributedLogConstants.INVALID_TXID;
-
-    /**
-     * Overwrite the last id known to this sequencer.
-     *
-     * @param lastId the id to continue from
-     */
-    public void setLastId(long lastId) {
-        this.lastId = lastId;
-    }
-
-    @Override
-    public long nextId() {
-        final long nowMillis = System.currentTimeMillis();
-        if (nowMillis > lastId) {
-            lastId = nowMillis;
-        }
-        // when the clock has not passed the last id, the last id is returned again
-        return lastId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java
deleted file mode 100644
index 422bbda..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.annotations.Beta;
-import com.twitter.util.Future;
-
-/**
- * Util class representing a transaction: a set of {@link Op operations} that
- * are committed or aborted together depending on the outcome of the transaction.
- */
-@Beta
-public interface Transaction<OpResult> {
-
-    /**
-     * An operation executed in a transaction.
-     */
-    interface Op<OpResult> {
-
-        /**
-         * Execute after the transaction succeeds.
-         *
-         * @param r
-         *          result handed to the operation on commit
-         */
-        void commit(OpResult r);
-
-        /**
-         * Execute after the transaction fails.
-         *
-         * @param t
-         *          reason of the failure
-         * @param r
-         *          result handed to the operation on abort
-         */
-        void abort(Throwable t, OpResult r);
-
-    }
-
-    /**
-     * Listener on the result of an {@link com.twitter.distributedlog.util.Transaction.Op}.
-     *
-     * @param <OpResult>
-     *          type of the operation result
-     */
-    interface OpListener<OpResult> {
-
-        /**
-         * Trigger on operation committed.
-         *
-         * @param r
-         *          result to return
-         */
-        void onCommit(OpResult r);
-
-        /**
-         * Trigger on operation aborted.
-         *
-         * @param t
-         *          reason to abort
-         */
-        void onAbort(Throwable t);
-    }
-
-    /**
-     * Add the operation to current transaction.
-     *
-     * @param operation
-     *          operation to execute under current transaction
-     */
-    void addOp(Op<OpResult> operation);
-
-    /**
-     * Execute the current transaction. If the transaction succeeds, all operations will be
-     * committed (via {@link com.twitter.distributedlog.util.Transaction.Op#commit(Object)}).
-     * Otherwise, all operations will be aborted (via {@link Op#abort(Throwable, Object)}).
-     *
-     * @return future representing the result of transaction execution.
-     */
-    Future<Void> execute();
-
-    /**
-     * Abort current transaction. If this is called and the transaction hasn't been executed by
-     * {@link #execute()}, it would abort all operations. If the transaction has been executed,
-     * the behavior is left up to the implementation - if the transaction is cancellable,
-     * {@link #abort(Throwable)} could attempt to cancel it.
-     *
-     * @param reason reason to abort the transaction
-     */
-    void abort(Throwable reason);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
deleted file mode 100644
index fce9bcd..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java
+++ /dev/null
@@ -1,607 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.function.VoidFunctions;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * Basic Utilities.
- */
-public class Utils {
-
-    private static final Logger logger = LoggerFactory.getLogger(Utils.class);
-
-    /**
-     * Current time from some arbitrary time base in the past, counting in
-     * nanoseconds, and not affected by settimeofday or similar system clock
-     * changes. This is appropriate to use when computing how much longer to
-     * wait for an interval to expire.
-     *
-     * @return current time in nanoseconds.
-     */
-    public static long nowInNanos() {
-        // System.nanoTime() is the JVM's clock for measuring elapsed time
-        final long nanos = System.nanoTime();
-        return nanos;
-    }
-
-    /**
-     * Current time from some fixed base time - so useful for cross machine
-     * comparison
-     *
-     * @return current time in milliseconds.
-     */
-    public static long nowInMillis() {
-        // wall-clock time in milliseconds
-        final long millis = System.currentTimeMillis();
-        return millis;
-    }
-
-    /**
-     * Milliseconds elapsed since the time specified. The input is a timestamp in
-     * milliseconds as returned by {@link System#currentTimeMillis()}, not a nanoTime value.
-     *
-     * @param startMsecTime the start of the interval that we are measuring
-     * @return elapsed time in milliseconds.
-     */
-    public static long elapsedMSec(long startMsecTime) {
-        // both operands are wall-clock milliseconds
-        final long nowMsec = System.currentTimeMillis();
-        return nowMsec - startMsecTime;
-    }
-
-    /**
-     * Randomly decide whether to fire, given a percentage.
-     *
-     * @param percent percentage, on a scale of 0 to 100, compared against a random draw
-     * @return true if a random draw from [0, 100) is less than or equal to <code>percent</code>.
-     */
-    public static boolean randomPercent(double percent) {
-        final double draw = 100.0 * Math.random();
-        return draw <= percent;
-    }
-
-    /**
-     * Synchronously create zookeeper path recursively and optimistically. This blocks on
-     * the future returned by the asynchronous variant.
-     *
-     * @see #zkAsyncCreateFullPathOptimistic(ZooKeeperClient, String, byte[], List, CreateMode)
-     * @param zkc Zookeeper client
-     * @param path Zookeeper full path
-     * @param data Zookeeper data
-     * @param acl Acl of the zk path
-     * @param createMode Create mode of zk path
-     * @throws ZooKeeperClient.ZooKeeperConnectionException
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public static void zkCreateFullPathOptimistic(
-        ZooKeeperClient zkc,
-        String path,
-        byte[] data,
-        final List<ACL> acl,
-        final CreateMode createMode)
-        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
-        try {
-            final Future<BoxedUnit> creation = zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode);
-            Await.result(creation);
-        } catch (Exception failure) {
-            // the declared exceptions and runtime exceptions are propagated untouched
-            if (failure instanceof ZooKeeperClient.ZooKeeperConnectionException) {
-                throw (ZooKeeperClient.ZooKeeperConnectionException) failure;
-            }
-            if (failure instanceof KeeperException) {
-                throw (KeeperException) failure;
-            }
-            if (failure instanceof InterruptedException) {
-                throw (InterruptedException) failure;
-            }
-            if (failure instanceof RuntimeException) {
-                throw (RuntimeException) failure;
-            }
-            // anything else is not expected from the creation
-            throw new RuntimeException("Unexpected Exception", failure);
-        }
-    }
-
-    /**
-     * Asynchronously create zookeeper path recursively and optimistically: the path itself is
-     * created first, and only when that fails with NONODE is the parent created (recursively)
-     * before the original creation is retried.
-     *
-     * @param zkc Zookeeper client
-     * @param pathToCreate  Zookeeper full path
-     * @param parentPathShouldNotCreate The recursive creation should stop if this path doesn't exist
-     * @param data Zookeeper data
-     * @param acl Acl of the zk path
-     * @param createMode Create mode of zk path
-     * @param callback Callback
-     * @param ctx Context object
-     */
-    public static void zkAsyncCreateFullPathOptimisticRecursive(
-        final ZooKeeperClient zkc,
-        final String pathToCreate,
-        final Optional<String> parentPathShouldNotCreate,
-        final byte[] data,
-        final List<ACL> acl,
-        final CreateMode createMode,
-        final AsyncCallback.StringCallback callback,
-        final Object ctx) {
-        // handles the outcome of creating pathToCreate itself
-        final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
-            @Override
-            public void processResult(int createRc, String createdPath, Object createCtx, String createdName) {
-                // Work out which parent has to be created first, if any. A nonode means that
-                // the parents may not exist.
-                String parent = null;
-                if (KeeperException.Code.NONODE.intValue() == createRc) {
-                    final int lastSlash = pathToCreate.lastIndexOf('/');
-                    if (lastSlash > 0) {
-                        final String candidate = pathToCreate.substring(0, lastSlash);
-                        final boolean stopHere = parentPathShouldNotCreate.isPresent()
-                                && Objects.equal(parentPathShouldNotCreate.get(), candidate);
-                        if (!stopHere) {
-                            parent = candidate;
-                        }
-                    }
-                }
-                if (null == parent) {
-                    // either not a nonode, or there is no parent we are allowed to create
-                    callback.processResult(createRc, createdPath, createCtx, createdName);
-                    return;
-                }
-
-                // ephemeral nodes can't have children so the parents are always persistent
-                zkAsyncCreateFullPathOptimisticRecursive(zkc, parent, parentPathShouldNotCreate, new byte[0], acl,
-                        CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
-                            @Override
-                            public void processResult(int parentRc, String parentPath,
-                                                      Object parentCtx, String parentName) {
-                                final boolean parentReady =
-                                        KeeperException.Code.OK.intValue() == parentRc
-                                        || KeeperException.Code.NODEEXISTS.intValue() == parentRc;
-                                if (!parentReady) {
-                                    callback.processResult(parentRc, parentPath, parentCtx, parentName);
-                                    return;
-                                }
-                                // succeeded in creating the parent, now create the original path
-                                zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
-                                        data, acl, createMode, callback, parentCtx);
-                            }
-                        }, createCtx);
-            }
-        };
-        try {
-            zkc.get().create(pathToCreate, data, acl, createMode, onCreated, ctx);
-        } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
-            callback.processResult(DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE,
-                    zkce.getMessage(), ctx, pathToCreate);
-        } catch (InterruptedException ie) {
-            callback.processResult(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE,
-                    ie.getMessage(), ctx, pathToCreate);
-        }
-    }
-
-    /**
-     * Asynchronously create zookeeper path recursively and optimistically.
-     *
-     * @param zkc Zookeeper client
-     * @param pathToCreate  Zookeeper full path
-     * @param data Zookeeper data
-     * @param acl Acl of the zk path
-     * @param createMode Create mode of zk path
-     */
-    public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
-        final ZooKeeperClient zkc,
-        final String pathToCreate,
-        final byte[] data,
-        final List<ACL> acl,
-        final CreateMode createMode) {
-        // no parent path is excluded from the recursive creation
-        return zkAsyncCreateFullPathOptimistic(
-                zkc, pathToCreate, Optional.<String>absent(), data, acl, createMode);
-    }
-
-    /**
-     * Asynchronously create zookeeper path recursively and optimistically.
-     *
-     * @param zkc Zookeeper client
-     * @param pathToCreate  Zookeeper full path
-     * @param parentPathShouldNotCreate zookeeper parent path should not be created
-     * @param data Zookeeper data
-     * @param acl Acl of the zk path
-     * @param createMode Create mode of zk path
-     * @return future satisfied when the creation completes, or failed with the matching exception.
-     */
-    public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic(
-        final ZooKeeperClient zkc,
-        final String pathToCreate,
-        final Optional<String> parentPathShouldNotCreate,
-        final byte[] data,
-        final List<ACL> acl,
-        final CreateMode createMode) {
-        final Promise<BoxedUnit> promise = new Promise<BoxedUnit>();
-        // translate the result code of the creation into the state of the promise
-        final AsyncCallback.StringCallback onDone = new AsyncCallback.StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, String name) {
-                handleKeeperExceptionCode(rc, path, promise);
-            }
-        };
-
-        zkAsyncCreateFullPathOptimisticRecursive(
-                zkc, pathToCreate, parentPathShouldNotCreate, data, acl, createMode, onDone, promise);
-
-        return promise;
-    }
-
-    /**
-     * Asynchronously set <code>data</code> on a zookeeper path, creating the path recursively
-     * and optimistically (with that data) when it doesn't exist yet.
-     *
-     * @param zkc Zookeeper client
-     * @param pathToCreate  Zookeeper full path
-     * @param data Zookeeper data
-     * @param acl Acl of the zk path
-     * @param createMode Create mode of zk path
-     * @return future satisfied when the data is set or the path is created.
-     */
-    public static Future<BoxedUnit> zkAsyncCreateFullPathOptimisticAndSetData(
-        final ZooKeeperClient zkc,
-        final String pathToCreate,
-        final byte[] data,
-        final List<ACL> acl,
-        final CreateMode createMode) {
-        final Promise<BoxedUnit> promise = new Promise<BoxedUnit>();
-
-        // completes the promise with the outcome of the fallback creation
-        final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
-            @Override
-            public void processResult(int createRc, String createdPath, Object createCtx, String createdName) {
-                handleKeeperExceptionCode(createRc, createdPath, promise);
-            }
-        };
-        // handles the outcome of the initial attempt to set the data
-        final AsyncCallback.StatCallback onDataSet = new AsyncCallback.StatCallback() {
-            @Override
-            public void processResult(int setRc, String setPath, Object setCtx, Stat stat) {
-                if (KeeperException.Code.NONODE.intValue() == setRc) {
-                    // the path doesn't exist: create it, carrying the data
-                    zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, Optional.<String>absent(),
-                            data, acl, createMode, onCreated, promise);
-                } else {
-                    handleKeeperExceptionCode(setRc, setPath, promise);
-                }
-            }
-        };
-
-        try {
-            // -1 as the expected version matches any version of the znode
-            zkc.get().setData(pathToCreate, data, -1, onDataSet, promise);
-        } catch (Exception exc) {
-            promise.setException(exc);
-        }
-
-        return promise;
-    }
-
-    private static void handleKeeperExceptionCode(int rc, String 
pathOrMessage, Promise<BoxedUnit> result) {
-        if (KeeperException.Code.OK.intValue() == rc) {
-            result.setValue(BoxedUnit.UNIT);
-        } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE 
== rc) {
-            result.setException(new 
ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage));
-        } else if 
(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
-            result.setException(new DLInterruptedException(pathOrMessage));
-        } else {
-            
result.setException(KeeperException.create(KeeperException.Code.get(rc), 
pathOrMessage));
-        }
-    }
-
-    /**
-     * Retrieve data from zookeeper <code>path</code>, obtaining the zookeeper handle from
-     * <code>zkc</code> first. A failure to obtain the handle is returned as a failed future.
-     */
-    public static Future<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, String path, boolean watch) {
-        final ZooKeeper handle;
-        try {
-            handle = zkc.get();
-        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
-            return Future.exception(FutureUtils.zkException(connectionFailure, path));
-        } catch (InterruptedException interruption) {
-            return Future.exception(FutureUtils.zkException(interruption, path));
-        }
-        return zkGetData(handle, path, watch);
-    }
-
-    /**
-     * Retrieve data from zookeeper <code>path</code>.
-     *
-     * @param path
-     *          zookeeper path to retrieve data
-     * @param watch
-     *          whether to watch the path
-     * @return future representing the versioned value. null version or null 
value means path doesn't exist.
-     */
-    public static Future<Versioned<byte[]>> zkGetData(ZooKeeper zk, String 
path, boolean watch) {
-        final Promise<Versioned<byte[]>> promise = new 
Promise<Versioned<byte[]>>();
-        zk.getData(path, watch, new AsyncCallback.DataCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, byte[] 
data, Stat stat) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    if (null == stat) {
-                        promise.setValue(new Versioned<byte[]>(null, null));
-                    } else {
-                        promise.setValue(new Versioned<byte[]>(data, new 
ZkVersion(stat.getVersion())));
-                    }
-                } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                    promise.setValue(new Versioned<byte[]>(null, null));
-                } else {
-                    
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-                }
-            }
-        }, null);
-        return promise;
-    }
-
-    /**
-     * Set <code>data</code> to zookeeper <code>path</code>, obtaining the zookeeper handle from
-     * <code>zkc</code> first. A failure to obtain the handle is returned as a failed future.
-     */
-    public static Future<ZkVersion> zkSetData(ZooKeeperClient zkc, String path, byte[] data, ZkVersion version) {
-        final ZooKeeper handle;
-        try {
-            handle = zkc.get();
-        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
-            return Future.exception(FutureUtils.zkException(connectionFailure, path));
-        } catch (InterruptedException interruption) {
-            return Future.exception(FutureUtils.zkException(interruption, path));
-        }
-        return zkSetData(handle, path, data, version);
-    }
-
-    /**
-     * Asynchronously set <code>data</code> to zookeeper <code>path</code>.
-     *
-     * @param zk
-     *          zookeeper client
-     * @param path
-     *          path to set data
-     * @param data
-     *          data to set
-     * @param version
-     *          version used to set data
-     * @return future representing the version after this operation; it fails with a
-     *         keeper exception when the return code is not OK.
-     */
-    public static Future<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) {
-        final Promise<ZkVersion> writePromise = new Promise<ZkVersion>();
-        final AsyncCallback.StatCallback onWritten = new AsyncCallback.StatCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, Stat stat) {
-                if (KeeperException.Code.OK.intValue() != rc) {
-                    writePromise.updateIfEmpty(new Throw<ZkVersion>(
-                            KeeperException.create(KeeperException.Code.get(rc))));
-                } else {
-                    writePromise.updateIfEmpty(new Return<ZkVersion>(new ZkVersion(stat.getVersion())));
-                }
-            }
-        };
-        zk.setData(path, data, version.getZnodeVersion(), onWritten, null);
-        return writePromise;
-    }
-
-    /**
-     * Delete the given <i>path</i> from zookeeper, using the zookeeper handle obtained
-     * from <code>zkc</code>. A failure to obtain the handle is returned as a failed future
-     * rather than thrown.
-     *
-     * @param zkc
-     *          zookeeper client to obtain the handle from
-     * @param path
-     *          path to delete
-     * @param version
-     *          version the delete is issued with
-     * @return future representing the completion of the delete.
-     */
-    public static Future<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) {
-        final ZooKeeper handle;
-        try {
-            handle = zkc.get();
-        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
-            return Future.exception(FutureUtils.zkException(connectionFailure, path));
-        } catch (InterruptedException interrupted) {
-            return Future.exception(FutureUtils.zkException(interrupted, path));
-        }
-        return zkDelete(handle, path, version);
-    }
-
-    /**
-     * Asynchronously delete the given <i>path</i> from zookeeper.
-     *
-     * @param zk
-     *          zookeeper client
-     * @param path
-     *          path to delete
-     * @param version
-     *          version the delete is issued with
-     * @return future that is satisfied once the delete succeeds; it fails with a keeper
-     *         exception when the return code is not OK.
-     */
-    public static Future<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) {
-        final Promise<Void> deletePromise = new Promise<Void>();
-        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                if (KeeperException.Code.OK.intValue() != rc) {
-                    deletePromise.updateIfEmpty(new Throw<Void>(
-                            KeeperException.create(KeeperException.Code.get(rc))));
-                } else {
-                    deletePromise.updateIfEmpty(new Return<Void>(null));
-                }
-            }
-        };
-        zk.delete(path, version.getZnodeVersion(), onDeleted, null);
-        return deletePromise;
-    }
-
-    /**
-     * Delete the given <i>path</i> from zookeeper, treating a missing node as a
-     * non-error outcome.
-     *
-     * @param zkc
-     *          zookeeper client
-     * @param path
-     *          path to delete
-     * @param version
-     *          version the delete is issued with
-     * @return future representing if the delete is successful. Return true if the node is deleted,
-     * false if the node doesn't exist, otherwise future will throw exception
-     *
-     */
-    public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
-        final ZooKeeper handle;
-        try {
-            handle = zkc.get();
-        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
-            return Future.exception(FutureUtils.zkException(connectionFailure, path));
-        } catch (InterruptedException interrupted) {
-            return Future.exception(FutureUtils.zkException(interrupted, path));
-        }
-        final Promise<Boolean> deletePromise = new Promise<Boolean>();
-        final AsyncCallback.VoidCallback onDeleted = new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    deletePromise.setValue(true);
-                    return;
-                }
-                if (KeeperException.Code.NONODE.intValue() == rc) {
-                    // nothing to delete
-                    deletePromise.setValue(false);
-                    return;
-                }
-                deletePromise.setException(KeeperException.create(KeeperException.Code.get(rc)));
-            }
-        };
-        handle.delete(path, version.getZnodeVersion(), onDeleted, null);
-        return deletePromise;
-    }
-
-    /**
-     * Asynchronously close the given closeable.
-     *
-     * @param closeable
-     *          closeable to close; a null closeable yields an already completed future
-     * @param swallowIOException
-     *          when true the close future is passed through <code>FutureUtils.ignore</code>
-     *          (presumably suppressing the close failure -- confirm against FutureUtils)
-     * @return future representing the close result
-     */
-    public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable,
-                                          boolean swallowIOException) {
-        if (null == closeable) {
-            return Future.Void();
-        }
-        if (!swallowIOException) {
-            return closeable.asyncClose();
-        }
-        return FutureUtils.ignore(closeable.asyncClose());
-    }
-
-    /**
-     * Sync zookeeper client on given <i>path</i>. The call blocks the calling thread
-     * until the sync callback fires.
-     *
-     * @param zkc
-     *          zookeeper client
-     * @param path
-     *          path to sync
-     * @return zookeeper client after sync
-     * @throws IOException when the zookeeper handle can not be obtained, when the thread is
-     *          interrupted while waiting (as DLInterruptedException), or when the sync
-     *          returns a non-OK code (as ZKException)
-     */
-    public static ZooKeeper sync(ZooKeeperClient zkc, String path) throws IOException {
-        ZooKeeper zk;
-        try {
-            zk = zkc.get();
-        } catch (InterruptedException e) {
-            // the message used to claim the interruption happened while "checking if log exists",
-            // which was copied from elsewhere and did not describe this operation.
-            throw new DLInterruptedException("Interrupted on getting zookeeper client to sync " + path, e);
-        }
-        final CountDownLatch syncLatch = new CountDownLatch(1);
-        final AtomicInteger syncResult = new AtomicInteger(0);
-        zk.sync(path, new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                // publish the return code before releasing the waiting thread
-                syncResult.set(rc);
-                syncLatch.countDown();
-            }
-        }, null);
-        try {
-            syncLatch.await();
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Interrupted on syncing zookeeper connection", e);
-        }
-        if (KeeperException.Code.OK.intValue() != syncResult.get()) {
-            throw new ZKException("Error syncing zookeeper connection ",
-                    KeeperException.Code.get(syncResult.get()));
-        }
-        return zk;
-    }
-
-    /**
-     * Close a closeable, swallowing any {@link IOException} raised while closing.
-     *
-     * @param closeable
-     *          closeable to close; null is a no-op
-     */
-    public static void close(@Nullable Closeable closeable) {
-        if (null != closeable) {
-            try {
-                Closeables.close(closeable, true);
-            } catch (IOException ignored) {
-                // no-op. the exception is swallowed.
-            }
-        }
-    }
-
-    /**
-     * Close an async closeable and wait for the close to complete.
-     *
-     * @param closeable
-     *          closeable to close; null is a no-op
-     * @throws IOException when waiting on the close future fails
-     */
-    public static void close(@Nullable AsyncCloseable closeable)
-            throws IOException {
-        if (null != closeable) {
-            FutureUtils.result(closeable.asyncClose());
-        }
-    }
-
-    /**
-     * Close an async closeable and wait for the close to complete, swallowing any
-     * {@link IOException} raised along the way.
-     *
-     * @param closeable
-     *          closeable to close; null is a no-op
-     */
-    public static void closeQuietly(@Nullable AsyncCloseable closeable) {
-        if (null != closeable) {
-            try {
-                FutureUtils.result(closeable.asyncClose());
-            } catch (IOException ignored) {
-                // no-op. the exception is swallowed.
-            }
-        }
-    }
-
-    /**
-     * Close the closeables in sequence, without ignoring errors during closing.
-     *
-     * @param executorService
-     *          executor to execute closeable
-     * @param closeables
-     *          closeables to close
-     * @return future represents the close future
-     */
-    public static Future<Void> closeSequence(ExecutorService executorService,
-                                             AsyncCloseable... closeables) {
-        final boolean ignoreCloseError = false;
-        return closeSequence(executorService, ignoreCloseError, closeables);
-    }
-
-    /**
-     * Close the closeables in sequence, optionally ignoring errors during closing.
-     * Null entries are replaced by <code>AsyncCloseable.NULL</code> before processing.
-     *
-     * @param executorService executor to execute closeable
-     * @param ignoreCloseError whether to ignore errors during closing
-     * @param closeables list of closeables
-     * @return future represents the close future.
-     */
-    public static Future<Void> closeSequence(ExecutorService executorService,
-                                             boolean ignoreCloseError,
-                                             AsyncCloseable... closeables) {
-        final List<AsyncCloseable> toClose = Lists.newArrayListWithExpectedSize(closeables.length);
-        for (AsyncCloseable candidate : closeables) {
-            toClose.add(null == candidate ? AsyncCloseable.NULL : candidate);
-        }
-        return FutureUtils.processList(
-                toClose,
-                ignoreCloseError ? AsyncCloseable.CLOSE_FUNC_IGNORE_ERRORS : AsyncCloseable.CLOSE_FUNC,
-                executorService).map(VoidFunctions.LIST_TO_VOID_FUNC);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java
deleted file mode 100644
index 193b814..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Utils
- */
-package com.twitter.distributedlog.util;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
deleted file mode 100644
index 78292e9..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-
-import javax.annotation.Nullable;
-
-/**
- * Default zookeeper operation. It takes no action of its own on committing or aborting
- * beyond notifying the optional listener.
- */
-public class DefaultZKOp extends ZKOp {
-
-    // listener notified on commit/abort; may be null, in which case nothing is notified
-    private final OpListener<Void> listener;
-
-    private DefaultZKOp(Op op, @Nullable OpListener<Void> opListener) {
-        super(op);
-        this.listener = opListener;
-    }
-
-    /**
-     * Create a default zookeeper operation.
-     *
-     * @param op the zookeeper operation to wrap
-     * @param listener listener to notify on commit or abort
-     * @return the wrapped operation
-     */
-    public static DefaultZKOp of(Op op, OpListener<Void> listener) {
-        return new DefaultZKOp(op, listener);
-    }
-
-    @Override
-    protected void commitOpResult(OpResult opResult) {
-        if (null == listener) {
-            return;
-        }
-        // the operation result itself is not forwarded to the listener
-        listener.onCommit(null);
-    }
-
-    @Override
-    protected void abortOpResult(Throwable t, OpResult opResult) {
-        if (null == listener) {
-            return;
-        }
-        listener.onAbort(t);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
deleted file mode 100644
index 78ff0a2..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.util.PermitManager;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Permit manager that limits the number of outstanding permits with a semaphore.
- * Permits can be switched on and off, either explicitly or in reaction to zookeeper
- * session state events; each switch advances an epoch that is recorded in the permits
- * handed out. (The original description reads: manager to control all the log segments rolling.)
- */
-public class LimitedPermitManager implements PermitManager, Runnable, Watcher {
-
-    static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class);
-
-    static enum PermitState {
-        ALLOWED, DISALLOWED, DISABLED
-    }
-
-    /**
-     * Permit that remembers the epoch it was issued in.
-     */
-    class EpochPermit implements Permit {
-
-        final PermitState state;
-        final int epoch;
-
-        EpochPermit(PermitState state) {
-            this.state = state;
-            // snapshot of the manager's epoch at creation time
-            this.epoch = LimitedPermitManager.this.epoch.get();
-        }
-
-        int getEpoch() {
-            return epoch;
-        }
-
-        @Override
-        public boolean isAllowed() {
-            return PermitState.ALLOWED == state;
-        }
-    }
-
-    boolean enablePermits = true;
-    // null when no concurrency limit is configured (concurrency <= 0)
-    final Semaphore semaphore;
-    final int period;
-    final TimeUnit timeUnit;
-    final ScheduledExecutorService executorService;
-    final AtomicInteger epoch = new AtomicInteger(0);
-    private final StatsLogger statsLogger;
-    private final Gauge<Number> outstandingGauge;
-
-    public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit,
-                                ScheduledExecutorService executorService) {
-        this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE);
-    }
-
-    public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit,
-            ScheduledExecutorService executorService, StatsLogger statsLogger) {
-        this.semaphore = concurrency > 0 ? new Semaphore(concurrency) : null;
-        this.period = period;
-        this.timeUnit = timeUnit;
-        this.executorService = executorService;
-        this.statsLogger = statsLogger;
-        this.outstandingGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                if (null == semaphore) {
-                    return 0;
-                }
-                // permits currently handed out and not yet released
-                return concurrency - semaphore.availablePermits();
-            }
-        };
-        this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge);
-    }
-
-    @Override
-    public synchronized Permit acquirePermit() {
-        final PermitState granted;
-        if (!enablePermits) {
-            granted = PermitState.DISABLED;
-        } else if (null == semaphore || semaphore.tryAcquire()) {
-            granted = PermitState.ALLOWED;
-        } else {
-            granted = PermitState.DISALLOWED;
-        }
-        return new EpochPermit(granted);
-    }
-
-    @Override
-    public synchronized void releasePermit(Permit permit) {
-        if (null == semaphore || !permit.isAllowed()) {
-            return;
-        }
-        if (period <= 0) {
-            semaphore.release();
-            return;
-        }
-        try {
-            // defer the release by the configured period; run() performs it
-            executorService.schedule(this, period, timeUnit);
-        } catch (RejectedExecutionException ree) {
-            LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." +
-                    " Release it immediately : ", timeUnit.toMillis(period), ree);
-            semaphore.release();
-        }
-    }
-
-    @Override
-    public synchronized boolean disallowObtainPermits(Permit permit) {
-        if (!(permit instanceof EpochPermit)) {
-            return false;
-        }
-        // the epoch is advanced whether or not the permit's epoch matches
-        final boolean sameEpoch = epoch.getAndIncrement() == ((EpochPermit) permit).getEpoch();
-        if (!sameEpoch) {
-            return false;
-        }
-        this.enablePermits = false;
-        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
-        return true;
-    }
-
-    @Override
-    public void close() {
-        unregisterGauge();
-    }
-
-    @Override
-    public synchronized boolean allowObtainPermits() {
-        forceSetAllowPermits(true);
-        return true;
-    }
-
-    synchronized void forceSetAllowPermits(boolean allowPermits) {
-        epoch.getAndIncrement();
-        this.enablePermits = allowPermits;
-        LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
-    }
-
-    /**
-     * Scheduled task body: gives back one permit to the semaphore.
-     */
-    @Override
-    public void run() {
-        semaphore.release();
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (!event.getType().equals(Event.EventType.None)) {
-            return;
-        }
-        switch (event.getState()) {
-        case SyncConnected:
-            forceSetAllowPermits(true);
-            break;
-        case Disconnected:
-        case Expired:
-            forceSetAllowPermits(false);
-            break;
-        default:
-            break;
-        }
-    }
-
-    public void unregisterGauge() {
-        if (null == this.statsLogger || null == this.outstandingGauge) {
-            return;
-        }
-        this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java
deleted file mode 100644
index 5675574..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.util.Transaction;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-
-import javax.annotation.Nullable;
-
-/**
- * ZooKeeper Transaction Operation. Wraps a zookeeper {@link Op} and routes the untyped
- * commit/abort callbacks of {@link Transaction.Op} to typed {@link OpResult} hooks.
- */
-public abstract class ZKOp implements Transaction.Op<Object> {
-
-    protected final Op op;
-
-    protected ZKOp(Op op) {
-        this.op = op;
-    }
-
-    /**
-     * @return the wrapped zookeeper operation
-     */
-    public Op getOp() {
-        return op;
-    }
-
-    @Override
-    public void commit(Object r) {
-        assert(r instanceof OpResult);
-        commitOpResult((OpResult) r);
-    }
-
-    /**
-     * Commit the operation with result <i>opResult</i>.
-     *
-     * @param opResult the result of operation
-     */
-    protected abstract void commitOpResult(OpResult opResult);
-
-    @Override
-    public void abort(Throwable t, Object r) {
-        // The result is optional on abort (abortOpResult accepts null), and
-        // "null instanceof OpResult" is false, so only type-check a supplied result.
-        assert(null == r || r instanceof OpResult);
-        abortOpResult(t, (OpResult) r);
-    }
-
-    /**
-     * Abort the operation with exception <i>t</i> and result <i>opResult</i>.
-     *
-     * @param t the reason to abort the operation
-     * @param opResult the result of operation
-     */
-    protected abstract void abortOpResult(Throwable t,
-                                          @Nullable OpResult opResult);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java
deleted file mode 100644
index 57f9aa3..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.OpResult;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * ZooKeeper Transaction. Collects {@link ZKOp}s and submits their zookeeper operations
- * in a single <code>multi</code> call; when the call returns, every collected op is
- * committed or aborted and the result future is satisfied accordingly.
- */
-public class ZKTransaction implements Transaction<Object>, AsyncCallback.MultiCallback {
-
-    private final ZooKeeperClient zkc;
-    private final List<ZKOp> ops = Lists.newArrayList();
-    // zookeeper operations, kept in the same order as ops
-    private final List<org.apache.zookeeper.Op> zkOps = Lists.newArrayList();
-    private final Promise<Void> result = new Promise<Void>();
-    // flipped once by whichever of execute/abort runs first
-    private final AtomicBoolean done = new AtomicBoolean(false);
-
-    public ZKTransaction(ZooKeeperClient zkc) {
-        this.zkc = zkc;
-    }
-
-    @Override
-    public void addOp(Op<Object> operation) {
-        if (done.get()) {
-            throw new IllegalStateException("Add an operation to a finished transaction");
-        }
-        assert(operation instanceof ZKOp);
-        final ZKOp zkOperation = (ZKOp) operation;
-        ops.add(zkOperation);
-        zkOps.add(zkOperation.getOp());
-    }
-
-    @Override
-    public Future<Void> execute() {
-        if (done.compareAndSet(false, true)) {
-            try {
-                zkc.get().multi(zkOps, this, result);
-            } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
-                result.setException(FutureUtils.zkException(connectionFailure, ""));
-            } catch (InterruptedException interrupted) {
-                result.setException(FutureUtils.zkException(interrupted, ""));
-            }
-        }
-        // already executed or aborted: the same future is handed back
-        return result;
-    }
-
-    @Override
-    public void abort(Throwable cause) {
-        if (done.compareAndSet(false, true)) {
-            for (int idx = 0; idx < ops.size(); idx++) {
-                ops.get(idx).abortOpResult(cause, null);
-            }
-            FutureUtils.setException(result, cause);
-        }
-    }
-
-    @Override
-    public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
-        if (KeeperException.Code.OK.intValue() != rc) {
-            // transaction failed: abort every op with its result when one is available
-            final KeeperException failure = KeeperException.create(KeeperException.Code.get(rc));
-            for (int idx = 0; idx < ops.size(); idx++) {
-                final ZKOp failedOp = ops.get(idx);
-                final OpResult opResult = (null == results) ? null : results.get(idx);
-                failedOp.abortOpResult(failure, opResult);
-            }
-            FutureUtils.setException(result, failure);
-            return;
-        }
-        // transaction succeed
-        for (int idx = 0; idx < ops.size(); idx++) {
-            ops.get(idx).commitOpResult(results.get(idx));
-        }
-        FutureUtils.setValue(result, null);
-    }
-}


Reply via email to