http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java deleted file mode 100644 index 287bd6d..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/OrderedScheduler.java +++ /dev/null @@ -1,490 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.util; - -import com.google.common.base.Objects; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.stats.BroadCastStatsLogger; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; -import com.twitter.util.Time; -import com.twitter.util.Timer; -import com.twitter.util.TimerTask; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.MathUtils; -import scala.Function0; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing - * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i> - * will be executed in order. - * <p> - * The scheduler is comprised of multiple {@link MonitoredScheduledThreadPoolExecutor}s. Each - * {@link MonitoredScheduledThreadPoolExecutor} is a single thread executor. Normal task submissions will - * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g - * {@link OrderedScheduler#apply(Object, Function0)} will be submitted to a dedicated executor based on - * the hash value of submit <i>key</i>. 
- * - * <h3>Metrics</h3> - * - * <h4>Per Executor Metrics</h4> - * - * Metrics about individual executors are exposed via {@link Builder#perExecutorStatsLogger} - * under <i>`scope`/`name`-executor-`id`-0</i>. `name` is the scheduler name provided by {@link Builder#name} - * while `id` is the index of this executor in the pool. And corresponding stats of future pool of - * that executor are exposed under <i>`scope`/`name`-executor-`id`-0/futurepool</i>. - * <p> - * See {@link MonitoredScheduledThreadPoolExecutor} and {@link MonitoredFuturePool} for per executor metrics - * exposed. - * - * <h4>Aggregated Metrics</h4> - * <ul> - * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on - * waiting being executed. - * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on - * executing. - * <li>futurepool/task_pending_time: opstats. measuring the characteristics about the time that tasks spent - * on waiting in future pool being executed. - * <li>futurepool/task_execution_time: opstats. measuring the characteristics about the time that tasks spent - * on executing. - * <li>futurepool/task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on - * submitting to future pool. - * <li>futurepool/tasks_pending: gauge. how many tasks are pending in this future pool. - * </ul> - */ -public class OrderedScheduler implements ScheduledExecutorService { - - /** - * Create a builder to build scheduler. - * - * @return scheduler builder - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder for {@link OrderedScheduler}. 
- */ - public static class Builder { - - private String name = "OrderedScheduler"; - private int corePoolSize = -1; - private ThreadFactory threadFactory = null; - private boolean traceTaskExecution = false; - private long traceTaskExecutionWarnTimeUs = Long.MAX_VALUE; - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - private StatsLogger perExecutorStatsLogger = NullStatsLogger.INSTANCE; - - /** - * Set the name of this scheduler. It would be used as part of stats scope and thread name. - * - * @param name - * name of the scheduler. - * @return scheduler builder - */ - public Builder name(String name) { - this.name = name; - return this; - } - - /** - * Set the number of threads to be used in this scheduler. - * - * @param corePoolSize the number of threads to keep in the pool, even - * if they are idle - * @return scheduler builder - */ - public Builder corePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - return this; - } - - /** - * Set the thread factory that the scheduler uses to create a new thread. - * - * @param threadFactory the factory to use when the executor - * creates a new thread - * @return scheduler builder - */ - public Builder threadFactory(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - return this; - } - - /** - * Enable/Disable exposing task execution stats. - * - * @param trace - * flag to enable/disable exposing task execution stats. - * @return scheduler builder - */ - public Builder traceTaskExecution(boolean trace) { - this.traceTaskExecution = trace; - return this; - } - - /** - * Enable/Disable logging slow tasks whose execution time is above <code>timeUs</code>. - * - * @param timeUs - * slow task execution time threshold in us. - * @return scheduler builder. - */ - public Builder traceTaskExecutionWarnTimeUs(long timeUs) { - this.traceTaskExecutionWarnTimeUs = timeUs; - return this; - } - - /** - * Expose the aggregated stats over <code>statsLogger</code>. 
- * - * @param statsLogger - * stats logger to receive aggregated stats. - * @return scheduler builder - */ - public Builder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - /** - * Expose stats of individual executors over <code>perExecutorStatsLogger</code>. - * Each executor's stats will be exposed under a sub-scope `name`-executor-`id`-0. - * `name` is the scheduler name, while `id` is the index of the scheduler in the pool. - * - * @param perExecutorStatsLogger - * stats logger to receive per executor stats. - * @return scheduler builder - */ - public Builder perExecutorStatsLogger(StatsLogger perExecutorStatsLogger) { - this.perExecutorStatsLogger = perExecutorStatsLogger; - return this; - } - - /** - * Build the ordered scheduler. - * - * @return ordered scheduler - */ - public OrderedScheduler build() { - if (corePoolSize <= 0) { - corePoolSize = Runtime.getRuntime().availableProcessors(); - } - if (null == threadFactory) { - threadFactory = Executors.defaultThreadFactory(); - } - - return new OrderedScheduler( - name, - corePoolSize, - threadFactory, - traceTaskExecution, - traceTaskExecutionWarnTimeUs, - statsLogger, - perExecutorStatsLogger); - } - - } - - protected final String name; - protected final int corePoolSize; - protected final MonitoredScheduledThreadPoolExecutor[] executors; - protected final MonitoredFuturePool[] futurePools; - protected final Random random; - - private OrderedScheduler(String name, - int corePoolSize, - ThreadFactory threadFactory, - boolean traceTaskExecution, - long traceTaskExecutionWarnTimeUs, - StatsLogger statsLogger, - StatsLogger perExecutorStatsLogger) { - this.name = name; - this.corePoolSize = corePoolSize; - this.executors = new MonitoredScheduledThreadPoolExecutor[corePoolSize]; - this.futurePools = new MonitoredFuturePool[corePoolSize]; - for (int i = 0; i < corePoolSize; i++) { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat(name + "-executor-" + 
i + "-%d") - .setThreadFactory(threadFactory) - .build(); - StatsLogger broadcastStatsLogger = - BroadCastStatsLogger.masterslave(perExecutorStatsLogger.scope("executor-" + i), statsLogger); - executors[i] = new MonitoredScheduledThreadPoolExecutor( - 1, tf, broadcastStatsLogger, traceTaskExecution); - futurePools[i] = new MonitoredFuturePool( - new ExecutorServiceFuturePool(executors[i]), - broadcastStatsLogger.scope("futurepool"), - traceTaskExecution, - traceTaskExecutionWarnTimeUs); - } - this.random = new Random(System.currentTimeMillis()); - } - - protected MonitoredScheduledThreadPoolExecutor chooseExecutor() { - return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)]; - } - - protected MonitoredScheduledThreadPoolExecutor chooseExecutor(Object key) { - return corePoolSize == 1 ? executors[0] : - executors[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)]; - } - - protected FuturePool chooseFuturePool(Object key) { - return corePoolSize == 1 ? futurePools[0] : - futurePools[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)]; - } - - protected FuturePool chooseFuturePool() { - return corePoolSize == 1 ? 
futurePools[0] : futurePools[random.nextInt(corePoolSize)]; - } - - /** - * {@inheritDoc} - */ - @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { - return chooseExecutor().schedule(command, delay, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { - return chooseExecutor().schedule(callable, delay, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, - long initialDelay, long period, TimeUnit unit) { - return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, - long initialDelay, long delay, TimeUnit unit) { - return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public void shutdown() { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - // Unregister gauges - executor.unregisterGauges(); - executor.shutdown(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<Runnable> shutdownNow() { - List<Runnable> runnables = new ArrayList<Runnable>(); - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - runnables.addAll(executor.shutdownNow()); - } - return runnables; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isShutdown() { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - if (!executor.isShutdown()) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isTerminated() { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - if (!executor.isTerminated()) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) 
- throws InterruptedException { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - if (!executor.awaitTermination(timeout, unit)) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public <T> Future<T> submit(Callable<T> task) { - return chooseExecutor().submit(task); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> Future<T> submit(Runnable task, T result) { - return chooseExecutor().submit(task, result); - } - - /** - * {@inheritDoc} - */ - @Override - public Future<?> submit(Runnable task) { - return chooseExecutor().submit(task); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - return chooseExecutor().invokeAll(tasks); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return chooseExecutor().invokeAll(tasks, timeout, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException { - return chooseExecutor().invokeAny(tasks); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return chooseExecutor().invokeAny(tasks, timeout, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public void execute(Runnable command) { - chooseExecutor().execute(command); - } - - // Ordered Functions - - /** - * Return a future pool used by <code>key</code>. 
- * - * @param key - * key to order in the future pool - * @return future pool - */ - public FuturePool getFuturePool(Object key) { - return chooseFuturePool(key); - } - - /** - * Execute the <code>function</code> in the executor that assigned by <code>key</code>. - * - * @see com.twitter.util.Future - * @param key key of the <i>function</i> to run - * @param function function to run - * @return future representing the result of the <i>function</i> - */ - public <T> com.twitter.util.Future<T> apply(Object key, Function0<T> function) { - return chooseFuturePool(key).apply(function); - } - - /** - * Execute the <code>function</code> by the scheduler. It would be submitted to any executor randomly. - * - * @param function function to run - * @return future representing the result of the <i>function</i> - */ - public <T> com.twitter.util.Future<T> apply(Function0<T> function) { - return chooseFuturePool().apply(function); - } - - public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) { - return chooseExecutor(key).schedule(command, delay, unit); - } - - public ScheduledFuture<?> scheduleAtFixedRate(Object key, - Runnable command, - long initialDelay, - long period, - TimeUnit unit) { - return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit); - } - - public Future<?> submit(Object key, Runnable command) { - return chooseExecutor(key).submit(command); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java deleted file mode 100644 index 41c28a3..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -/** - * A simple limiter interface which tracks acquire/release of permits, for - * example for tracking outstanding writes. - */ -public interface PermitLimiter { - - public static PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() { - @Override - public boolean acquire() { - return true; - } - @Override - public void release(int permits) { - } - - @Override - public void close() { - - } - }; - - /** - * Acquire a permit. - * - * @return true if successfully acquire a permit, otherwise false. 
- */ - boolean acquire(); - - /** - * Release a permit. - */ - void release(int permits); - - /** - * Close the resources created by the limiter - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java deleted file mode 100644 index 6a6d574..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.util; - -public interface PermitManager { - - public static interface Permit { - static final Permit ALLOWED = new Permit() { - @Override - public boolean isAllowed() { - return true; - } - }; - boolean isAllowed(); - } - - public static PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() { - @Override - public Permit acquirePermit() { - return Permit.ALLOWED; - } - - @Override - public void releasePermit(Permit permit) { - // nop - } - - @Override - public boolean allowObtainPermits() { - return true; - } - - @Override - public boolean disallowObtainPermits(Permit permit) { - return false; - } - - @Override - public void close() { - // nop - } - - }; - - /** - * Obetain a permit from permit manager. - * - * @return permit. - */ - Permit acquirePermit(); - - /** - * Release a given permit. - * - * @param permit - * permit to release - */ - void releasePermit(Permit permit); - - /** - * Allow obtaining permits. - */ - boolean allowObtainPermits(); - - /** - * Disallow obtaining permits. Disallow needs to be performed under the context - * of <i>permit</i>. - * - * @param permit - * permit context to disallow - */ - boolean disallowObtainPermits(Permit permit); - - /** - * Release the resources - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java deleted file mode 100644 index 3565f98..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/RetryPolicyUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; - -/** - * Utils for {@link org.apache.bookkeeper.zookeeper.RetryPolicy} - */ -public class RetryPolicyUtils { - - /** - * Infinite retry policy - */ - public static final RetryPolicy DEFAULT_INFINITE_RETRY_POLICY = infiniteRetry(200, 2000); - - /** - * Create an infinite retry policy with backoff time between <i>baseBackOffTimeMs</i> and - * <i>maxBackoffTimeMs</i>. 
- * - * @param baseBackoffTimeMs base backoff time in milliseconds - * @param maxBackoffTimeMs maximum backoff time in milliseconds - * @return an infinite retry policy - */ - public static RetryPolicy infiniteRetry(long baseBackoffTimeMs, long maxBackoffTimeMs) { - return new BoundExponentialBackoffRetryPolicy(baseBackoffTimeMs, maxBackoffTimeMs, Integer.MAX_VALUE); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java deleted file mode 100644 index d139a80..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SafeQueueingFuturePool.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.util; - -import com.google.common.base.Preconditions; - -import com.twitter.util.Function0; -import com.twitter.util.FuturePool; -import com.twitter.util.Future; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - -import scala.runtime.BoxedUnit; - -/** - * Acts like a future pool, but collects failed apply calls into a queue to be applied - * in-order on close. This happens either in the close thread or after close is called, - * in the last operation to complete execution. - * Ops submitted after close will not be scheduled, so its important to ensure no more - * ops will be applied once close has been called. - */ -public class SafeQueueingFuturePool<T> { - - static final Logger LOG = LoggerFactory.getLogger(SafeQueueingFuturePool.class); - - private boolean closed; - private int outstanding; - private ConcurrentLinkedQueue<Function0<T>> queue; - private FuturePool orderedFuturePool; - - public SafeQueueingFuturePool(FuturePool orderedFuturePool) { - this.closed = false; - this.outstanding = 0; - this.queue = new ConcurrentLinkedQueue<Function0<T>>(); - this.orderedFuturePool = orderedFuturePool; - } - - public synchronized Future<T> apply(final Function0<T> fn) { - Preconditions.checkNotNull(fn); - if (closed) { - return Future.exception(new RejectedExecutionException("Operation submitted to closed SafeQueueingFuturePool")); - } - ++outstanding; - queue.add(fn); - Future<T> result = orderedFuturePool.apply(new Function0<T>() { - @Override - public T apply() { - return queue.poll().apply(); - } - @Override - public String toString() { - return fn.toString(); - } - }).ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - if (decrOutstandingAndCheckDone()) { - applyAll(); - } - return null; - } - }); - return result; - } - - private 
synchronized boolean decrOutstandingAndCheckDone() { - return --outstanding == 0 && closed; - } - - public void close() { - final boolean done; - synchronized (this) { - if (closed) { - return; - } - closed = true; - done = (outstanding == 0); - } - if (done) { - applyAll(); - } - } - - private void applyAll() { - if (!queue.isEmpty()) { - LOG.info("Applying {} items", queue.size()); - } - while (!queue.isEmpty()) { - queue.poll().apply(); - } - } - - public synchronized int size() { - return queue.size(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java deleted file mode 100644 index 9f756f0..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SchedulerUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.util; - -import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -public class SchedulerUtils { - - static final Logger logger = LoggerFactory.getLogger(SchedulerUtils.class); - - public static void shutdownScheduler(ExecutorService service, long timeout, TimeUnit timeUnit) { - if (null == service) { - return; - } - service.shutdown(); - try { - service.awaitTermination(timeout, timeUnit); - } catch (InterruptedException e) { - logger.warn("Interrupted when shutting down scheduler : ", e); - } - service.shutdownNow(); - } - - public static void shutdownScheduler(OrderedSafeExecutor service, long timeout, TimeUnit timeUnit) { - if (null == service) { - return; - } - service.shutdown(); - try { - service.awaitTermination(timeout, timeUnit); - } catch (InterruptedException e) { - logger.warn("Interrupted when shutting down scheduler : ", e); - } - service.forceShutdown(timeout, timeUnit); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java deleted file mode 100644 index 7ec50ba..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sequencer.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -/** - * Sequencer generating transaction id. - */ -public interface Sequencer { - - /** - * Return next transaction id generated by the sequencer. - * - * @return next transaction id generated by the sequencer. - */ - long nextId(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java deleted file mode 100644 index 4086a1e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.google.common.annotations.VisibleForTesting; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple counter based {@link PermitLimiter}. - * - * <h3>Metrics</h3> - * <ul> - * <li> `permits`: gauge. how many permits are acquired right now? - * <li> `permits`/*: opstats. the characteristics about number of permits already acquired on each acquires. - * <li> `acquireFailure`: counter. how many acquires failed? failure means it already reached maximum permits - * when trying to acquire. 
- * </ul> - */ -public class SimplePermitLimiter implements PermitLimiter { - - final Counter acquireFailureCounter; - final OpStatsLogger permitsMetric; - final AtomicInteger permits; - final int permitsMax; - final boolean darkmode; - final Feature disableWriteLimitFeature; - private StatsLogger statsLogger = null; - private Gauge<Number> permitsGauge = null; - private String permitsGaugeLabel = ""; - - public SimplePermitLimiter(boolean darkmode, int permitsMax, StatsLogger statsLogger, - boolean singleton, Feature disableWriteLimitFeature) { - this.permits = new AtomicInteger(0); - this.permitsMax = permitsMax; - this.darkmode = darkmode; - this.disableWriteLimitFeature = disableWriteLimitFeature; - - // stats - if (singleton) { - this.statsLogger = statsLogger; - this.permitsGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - @Override - public Number getSample() { - return permits.get(); - } - }; - this.permitsGaugeLabel = "permits"; - statsLogger.registerGauge(permitsGaugeLabel, permitsGauge); - } - acquireFailureCounter = statsLogger.getCounter("acquireFailure"); - permitsMetric = statsLogger.getOpStatsLogger("permits"); - } - - public boolean isDarkmode() { - return darkmode || disableWriteLimitFeature.isAvailable(); - } - - @Override - public boolean acquire() { - permitsMetric.registerSuccessfulEvent(permits.get()); - if (permits.incrementAndGet() <= permitsMax || isDarkmode()) { - return true; - } else { - acquireFailureCounter.inc(); - permits.decrementAndGet(); - return false; - } - } - - @Override - public void release(int permitsToRelease) { - permits.addAndGet(-permitsToRelease); - } - - @Override - public void close() { - unregisterGauge(); - } - - @VisibleForTesting - public int getPermits() { - return permits.get(); - } - - public void unregisterGauge() { - if (this.statsLogger != null && this.permitsGauge != null) { - this.statsLogger.unregisterGauge(permitsGaugeLabel, permitsGauge); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java deleted file mode 100644 index 216d5ea..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Sizable.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -/** - * The {@code Sizable} interface is to provide the capability of calculating size - * of any objects. - */ -public interface Sizable { - /** - * Calculate the size for this instance. - * - * @return size of the instance. 
- */ - long size(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java deleted file mode 100644 index 96e564e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/TimeSequencer.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.twitter.distributedlog.DistributedLogConstants; - -/** - * Time based sequencer. It generates non-decreasing transaction ids from the current time in milliseconds. - * It isn't thread-safe. The caller is responsible for synchronization. 
- */ -public class TimeSequencer implements Sequencer { - - private long lastId = DistributedLogConstants.INVALID_TXID; - - public void setLastId(long lastId) { - this.lastId = lastId; - } - - @Override - public long nextId() { - lastId = Math.max(lastId, System.currentTimeMillis()); - return lastId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java deleted file mode 100644 index 422bbda..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Transaction.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.google.common.annotations.Beta; -import com.twitter.util.Future; - -/** - * Util class represents a transaction - */ -@Beta -public interface Transaction<OpResult> { - - /** - * An operation executed in a transaction. 
- */ - interface Op<OpResult> { - - /** - * Execute after the transaction succeeds. - * - * @param r - * result of this operation - */ - void commit(OpResult r); - - /** - * Execute after the transaction fails. - * - * @param t - * reason the transaction failed - * @param r - * result of this operation - */ - void abort(Throwable t, OpResult r); - - } - - /** - * Listener on the result of an {@link com.twitter.distributedlog.util.Transaction.Op}. - * - * @param <OpResult> - * type of the operation result - */ - interface OpListener<OpResult> { - - /** - * Trigger on operation committed. - * - * @param r - * result to return - */ - void onCommit(OpResult r); - - /** - * Trigger on operation aborted. - * - * @param t - * reason to abort - */ - void onAbort(Throwable t); - } - - /** - * Add the operation to the current transaction. - * - * @param operation - * operation to execute under current transaction - */ - void addOp(Op<OpResult> operation); - - /** - * Execute the current transaction. If the transaction succeeds, all operations will be - * committed (via {@link com.twitter.distributedlog.util.Transaction.Op#commit(Object)}). - * Otherwise, all operations will be aborted (via {@link Op#abort(Throwable, Object)}). - * - * @return future representing the result of transaction execution. - */ - Future<Void> execute(); - - /** - * Abort the current transaction. If this is called and the transaction hasn't been executed by - * {@link #execute()}, it would abort all operations. If the transaction has been executed, - * the behavior is left up to the implementation - if the transaction is cancellable, {@link #abort(Throwable)} - * could attempt to cancel it. 
- * - * @param reason reason to abort the transaction - */ - void abort(Throwable reason); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java deleted file mode 100644 index fce9bcd..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java +++ /dev/null @@ -1,607 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.util; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nullable; - -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.function.VoidFunctions; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -/** - * Basic Utilities. - */ -public class Utils { - - private static final Logger logger = LoggerFactory.getLogger(Utils.class); - - /** - * Current time from some arbitrary time base in the past, counting in - * nanoseconds, and not affected by settimeofday or similar system clock - * changes. This is appropriate to use when computing how much longer to - * wait for an interval to expire. - * - * @return current time in nanoseconds. 
- */ - public static long nowInNanos() { - return System.nanoTime(); - } - - /** - * Current time from some fixed base time - so useful for cross machine - * comparison - * - * @return current time in milliseconds. - */ - public static long nowInMillis() { - return System.currentTimeMillis(); - } - - /** - * Milliseconds elapsed since the time specified. The input must be a wall-clock timestamp - * in milliseconds (comparable to {@link System#currentTimeMillis()}), not a - * {@link System#nanoTime()} value: it is subtracted directly from the current time in - * milliseconds and no unit conversion is performed. - * - * @param startMsecTime the start of the interval that we are measuring, in milliseconds - * @return elapsed time in milliseconds. - */ - public static long elapsedMSec(long startMsecTime) { - return (System.currentTimeMillis() - startMsecTime); - } - - public static boolean randomPercent(double percent) { - return (Math.random() * 100.0) <= percent; - } - - /** - * Synchronously create zookeeper path recursively and optimistically. It blocks on the - * future returned by the asynchronous variant. - * - * @see #zkAsyncCreateFullPathOptimistic(ZooKeeperClient, String, byte[], List, CreateMode) - * @param zkc Zookeeper client - * @param path Zookeeper full path - * @param data Zookeeper data - * @param acl Acl of the zk path - * @param createMode Create mode of zk path - * @throws ZooKeeperClient.ZooKeeperConnectionException rethrown unchanged if the asynchronous create fails with it - * @throws KeeperException rethrown unchanged if the asynchronous create fails with it - * @throws InterruptedException rethrown unchanged if raised while waiting for the result - * @throws RuntimeException rethrown unchanged; any other checked exception is wrapped in a - * RuntimeException with message "Unexpected Exception" - */ - public static void zkCreateFullPathOptimistic( - ZooKeeperClient zkc, - String path, - byte[] data, - final List<ACL> acl, - final CreateMode createMode) - throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { - try { - Await.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode)); - } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - throw zkce; - } catch (KeeperException ke) { - throw ke; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException rte) { - throw rte; - } catch (Exception exc) { - throw new RuntimeException("Unexpected Exception", exc); - } - } - - /** - * Asynchronously create zookeeper path recursively and 
optimistically. - * - * @param zkc Zookeeper client - * @param pathToCreate Zookeeper full path - * @param parentPathShouldNotCreate The recursive creation should stop if this path doesn't exist - * @param data Zookeeper data - * @param acl Acl of the zk path - * @param createMode Create mode of zk path - * @param callback Callback - * @param ctx Context object - */ - public static void zkAsyncCreateFullPathOptimisticRecursive( - final ZooKeeperClient zkc, - final String pathToCreate, - final Optional<String> parentPathShouldNotCreate, - final byte[] data, - final List<ACL> acl, - final CreateMode createMode, - final AsyncCallback.StringCallback callback, - final Object ctx) { - try { - zkc.get().create(pathToCreate, data, acl, createMode, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - - if (rc != KeeperException.Code.NONODE.intValue()) { - callback.processResult(rc, path, ctx, name); - return; - } - - // Since we got a nonode, it means that my parents may not exist - // ephemeral nodes can't have children so Create mode is always - // persistent parents - int lastSlash = pathToCreate.lastIndexOf('/'); - if (lastSlash <= 0) { - callback.processResult(rc, path, ctx, name); - return; - } - String parent = pathToCreate.substring(0, lastSlash); - if (parentPathShouldNotCreate.isPresent() && Objects.equal(parentPathShouldNotCreate.get(), parent)) { - // we should stop here - callback.processResult(rc, path, ctx, name); - return; - } - zkAsyncCreateFullPathOptimisticRecursive(zkc, parent, parentPathShouldNotCreate, new byte[0], acl, - CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NODEEXISTS.intValue()) { - // succeeded in creating the parent, now create the original path - 
zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate, - data, acl, createMode, callback, ctx); - } else { - callback.processResult(rc, path, ctx, name); - } - } - }, ctx); - } - }, ctx); - } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - callback.processResult(DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE, zkce.getMessage(), ctx, pathToCreate); - } catch (InterruptedException ie) { - callback.processResult(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE, ie.getMessage(), ctx, pathToCreate); - } - } - - /** - * Asynchronously create zookeeper path recursively and optimistically. - * - * @param zkc Zookeeper client - * @param pathToCreate Zookeeper full path - * @param data Zookeeper data - * @param acl Acl of the zk path - * @param createMode Create mode of zk path - */ - public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic( - final ZooKeeperClient zkc, - final String pathToCreate, - final byte[] data, - final List<ACL> acl, - final CreateMode createMode) { - Optional<String> parentPathShouldNotCreate = Optional.absent(); - return zkAsyncCreateFullPathOptimistic( - zkc, - pathToCreate, - parentPathShouldNotCreate, - data, - acl, - createMode); - } - - /** - * Asynchronously create zookeeper path recursively and optimistically - * - * @param zkc Zookeeper client - * @param pathToCreate Zookeeper full path - * @param parentPathShouldNotCreate zookeeper parent path should not be created - * @param data Zookeeper data - * @param acl Acl of the zk path - * @param createMode Create mode of zk path - */ - public static Future<BoxedUnit> zkAsyncCreateFullPathOptimistic( - final ZooKeeperClient zkc, - final String pathToCreate, - final Optional<String> parentPathShouldNotCreate, - final byte[] data, - final List<ACL> acl, - final CreateMode createMode) { - final Promise<BoxedUnit> result = new Promise<BoxedUnit>(); - - zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, 
parentPathShouldNotCreate, - data, acl, createMode, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - handleKeeperExceptionCode(rc, path, result); - } - }, result); - - return result; - } - - /** - * Asynchronously create zookeeper path recursively and optimistically. - * - * @param zkc Zookeeper client - * @param pathToCreate Zookeeper full path - * @param data Zookeeper data - * @param acl Acl of the zk path - * @param createMode Create mode of zk path - */ - public static Future<BoxedUnit> zkAsyncCreateFullPathOptimisticAndSetData( - final ZooKeeperClient zkc, - final String pathToCreate, - final byte[] data, - final List<ACL> acl, - final CreateMode createMode) { - final Promise<BoxedUnit> result = new Promise<BoxedUnit>(); - - try { - zkc.get().setData(pathToCreate, data, -1, new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (rc != KeeperException.Code.NONODE.intValue()) { - handleKeeperExceptionCode(rc, path, result); - return; - } - - Optional<String> parentPathShouldNotCreate = Optional.absent(); - zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate, - data, acl, createMode, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - handleKeeperExceptionCode(rc, path, result); - } - }, result); - } - }, result); - } catch (Exception exc) { - result.setException(exc); - } - - return result; - } - - private static void handleKeeperExceptionCode(int rc, String pathOrMessage, Promise<BoxedUnit> result) { - if (KeeperException.Code.OK.intValue() == rc) { - result.setValue(BoxedUnit.UNIT); - } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) { - result.setException(new ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage)); - } else if 
(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { - result.setException(new DLInterruptedException(pathOrMessage)); - } else { - result.setException(KeeperException.create(KeeperException.Code.get(rc), pathOrMessage)); - } - } - - public static Future<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, String path, boolean watch) { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } - return zkGetData(zk, path, watch); - } - - /** - * Retrieve data from zookeeper <code>path</code>. - * - * @param path - * zookeeper path to retrieve data - * @param watch - * whether to watch the path - * @return future representing the versioned value. null version or null value means path doesn't exist. - */ - public static Future<Versioned<byte[]>> zkGetData(ZooKeeper zk, String path, boolean watch) { - final Promise<Versioned<byte[]>> promise = new Promise<Versioned<byte[]>>(); - zk.getData(path, watch, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - if (null == stat) { - promise.setValue(new Versioned<byte[]>(null, null)); - } else { - promise.setValue(new Versioned<byte[]>(data, new ZkVersion(stat.getVersion()))); - } - } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(new Versioned<byte[]>(null, null)); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - return promise; - } - - public static Future<ZkVersion> zkSetData(ZooKeeperClient zkc, String path, byte[] data, ZkVersion version) { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return 
Future.exception(FutureUtils.zkException(e, path)); - } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } - return zkSetData(zk, path, data, version); - } - - /** - * Set <code>data</code> to zookeeper <code>path</code>. - * - * @param zk - * zookeeper client - * @param path - * path to set data - * @param data - * data to set - * @param version - * version used to set data - * @return future representing the version after this operation. - */ - public static Future<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) { - final Promise<ZkVersion> promise = new Promise<ZkVersion>(); - zk.setData(path, data, version.getZnodeVersion(), new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - promise.updateIfEmpty(new Return<ZkVersion>(new ZkVersion(stat.getVersion()))); - return; - } - promise.updateIfEmpty(new Throw<ZkVersion>( - KeeperException.create(KeeperException.Code.get(rc)))); - return; - } - }, null); - return promise; - } - - public static Future<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } - return zkDelete(zk, path, version); - } - - /** - * Delete the given <i>path</i> from zookeeper. - * - * @param zk - * zookeeper client - * @param path - * path to delete - * @param version - * version used to set data - * @return future representing the version after this operation. 
- */ - public static Future<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) { - final Promise<Void> promise = new Promise<Void>(); - zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (KeeperException.Code.OK.intValue() == rc) { - promise.updateIfEmpty(new Return<Void>(null)); - return; - } - promise.updateIfEmpty(new Throw<Void>( - KeeperException.create(KeeperException.Code.get(rc)))); - return; - } - }, null); - return promise; - } - - /** - * Delete the given <i>path</i> from zookeeper. - * - * @param zkc - * zookeeper client - * @param path - * path to delete - * @param version - * version used to set data - * @return future representing if the delete is successful. Return true if the node is deleted, - * false if the node doesn't exist, otherwise future will throw exception - * - */ - public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, path)); - } - final Promise<Boolean> promise = new Promise<Boolean>(); - zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (KeeperException.Code.OK.intValue() == rc ) { - promise.setValue(true); - } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(false); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - return promise; - } - - public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable, - boolean swallowIOException) { - if (null == closeable) { - return Future.Void(); - } else if (swallowIOException) { - 
return FutureUtils.ignore(closeable.asyncClose()); - } else { - return closeable.asyncClose(); - } - } - - /** - * Sync zookeeper client on given <i>path</i>. - * - * @param zkc - * zookeeper client - * @param path - * path to sync - * @return zookeeper client after sync - * @throws IOException - */ - public static ZooKeeper sync(ZooKeeperClient zkc, String path) throws IOException { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on checking if log " + path + " exists", e); - } - final CountDownLatch syncLatch = new CountDownLatch(1); - final AtomicInteger syncResult = new AtomicInteger(0); - zk.sync(path, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - syncResult.set(rc); - syncLatch.countDown(); - } - }, null); - try { - syncLatch.await(); - } catch (InterruptedException e) { - throw new DLInterruptedException("Interrupted on syncing zookeeper connection", e); - } - if (KeeperException.Code.OK.intValue() != syncResult.get()) { - throw new ZKException("Error syncing zookeeper connection ", - KeeperException.Code.get(syncResult.get())); - } - return zk; - } - - /** - * Close a closeable. - * - * @param closeable - * closeable to close - */ - public static void close(@Nullable Closeable closeable) { - if (null == closeable) { - return; - } - try { - Closeables.close(closeable, true); - } catch (IOException e) { - // no-op. the exception is swallowed. - } - } - - /** - * Close an async closeable. - * - * @param closeable - * closeable to close - */ - public static void close(@Nullable AsyncCloseable closeable) - throws IOException { - if (null == closeable) { - return; - } - FutureUtils.result(closeable.asyncClose()); - } - - /** - * Close an async closeable. 
- * - * @param closeable - * closeable to close - */ - public static void closeQuietly(@Nullable AsyncCloseable closeable) { - if (null == closeable) { - return; - } - try { - FutureUtils.result(closeable.asyncClose()); - } catch (IOException e) { - // no-op. the exception is swallowed. - } - } - - /** - * Close the closeables in sequence. - * - * @param closeables - * closeables to close - * @return future represents the close future - */ - public static Future<Void> closeSequence(ExecutorService executorService, - AsyncCloseable... closeables) { - return closeSequence(executorService, false, closeables); - } - - /** - * Close the closeables in sequence and ignore errors during closing. - * - * @param executorService executor to execute closeable - * @param ignoreCloseError whether to ignore errors during closing - * @param closeables list of closeables - * @return future represents the close future. - */ - public static Future<Void> closeSequence(ExecutorService executorService, - boolean ignoreCloseError, - AsyncCloseable... closeables) { - List<AsyncCloseable> closeableList = Lists.newArrayListWithExpectedSize(closeables.length); - for (AsyncCloseable closeable : closeables) { - if (null == closeable) { - closeableList.add(AsyncCloseable.NULL); - } else { - closeableList.add(closeable); - } - } - return FutureUtils.processList( - closeableList, - ignoreCloseError ? 
AsyncCloseable.CLOSE_FUNC_IGNORE_ERRORS : AsyncCloseable.CLOSE_FUNC, - executorService).map(VoidFunctions.LIST_TO_VOID_FUNC); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java deleted file mode 100644 index 193b814..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/** - * DistributedLog Utils - */ -package com.twitter.distributedlog.util; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java deleted file mode 100644 index 78292e9..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/DefaultZKOp.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.zk; - -import com.twitter.distributedlog.util.Transaction.OpListener; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; - -import javax.annotation.Nullable; - -/** - * Default zookeeper operation. It takes no action of its own on committing or aborting, - * other than notifying the optional listener. 
- */ -public class DefaultZKOp extends ZKOp { - - public static DefaultZKOp of(Op op, OpListener<Void> listener) { - return new DefaultZKOp(op, listener); - } - - private final OpListener<Void> listener; - - private DefaultZKOp(Op op, @Nullable OpListener<Void> opListener) { - super(op); - this.listener = opListener; - } - - @Override - protected void commitOpResult(OpResult opResult) { - if (null != listener) { - listener.onCommit(null); - } - } - - @Override - protected void abortOpResult(Throwable t, OpResult opResult) { - if (null != listener) { - listener.onAbort(t); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java deleted file mode 100644 index 78ff0a2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/LimitedPermitManager.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.zk; - -import com.twitter.distributedlog.util.PermitManager; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Manager to control all the log segments rolling. - */ -public class LimitedPermitManager implements PermitManager, Runnable, Watcher { - - static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class); - - static enum PermitState { - ALLOWED, DISALLOWED, DISABLED - } - - class EpochPermit implements Permit { - - final PermitState state; - final int epoch; - - EpochPermit(PermitState state) { - this.state = state; - this.epoch = LimitedPermitManager.this.epoch.get(); - } - - int getEpoch() { - return epoch; - } - - @Override - public boolean isAllowed() { - return PermitState.ALLOWED == state; - } - } - - boolean enablePermits = true; - final Semaphore semaphore; - final int period; - final TimeUnit timeUnit; - final ScheduledExecutorService executorService; - final AtomicInteger epoch = new AtomicInteger(0); - private StatsLogger statsLogger = null; - private Gauge<Number> outstandingGauge = null; - - public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit, - ScheduledExecutorService executorService) { - this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE); - } - - public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit, - 
ScheduledExecutorService executorService, StatsLogger statsLogger) { - if (concurrency > 0) { - this.semaphore = new Semaphore(concurrency); - } else { - this.semaphore = null; - } - this.period = period; - this.timeUnit = timeUnit; - this.executorService = executorService; - this.statsLogger = statsLogger; - this.outstandingGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return null == semaphore ? 0 : concurrency - semaphore.availablePermits(); - } - }; - this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge); - } - - @Override - synchronized public Permit acquirePermit() { - if (!enablePermits) { - return new EpochPermit(PermitState.DISABLED); - } - if (null != semaphore) { - return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) : - new EpochPermit(PermitState.DISALLOWED); - } else { - return new EpochPermit(PermitState.ALLOWED); - } - } - - @Override - synchronized public void releasePermit(Permit permit) { - if (null != semaphore && permit.isAllowed()) { - if (period <= 0) { - semaphore.release(); - } else { - try { - executorService.schedule(this, period, timeUnit); - } catch (RejectedExecutionException ree) { - LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." 
+ - " Release it immediately : ", timeUnit.toMillis(period), ree); - semaphore.release(); - } - } - } - } - - @Override - synchronized public boolean disallowObtainPermits(Permit permit) { - if (!(permit instanceof EpochPermit)) { - return false; - } - if (epoch.getAndIncrement() == ((EpochPermit)permit).getEpoch()) { - this.enablePermits = false; - LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get()); - return true; - } else { - return false; - } - } - - @Override - public void close() { - unregisterGauge(); - } - - @Override - synchronized public boolean allowObtainPermits() { - forceSetAllowPermits(true); - return true; - } - - synchronized void forceSetAllowPermits(boolean allowPermits) { - epoch.getAndIncrement(); - this.enablePermits = allowPermits; - LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get()); - } - - @Override - public void run() { - semaphore.release(); - } - - @Override - public void process(WatchedEvent event) { - if (event.getType().equals(Event.EventType.None)) { - switch (event.getState()) { - case SyncConnected: - forceSetAllowPermits(true); - break; - case Disconnected: - forceSetAllowPermits(false); - break; - case Expired: - forceSetAllowPermits(false); - break; - default: - break; - } - } - } - - public void unregisterGauge() { - if(this.statsLogger != null && this.outstandingGauge != null) { - this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java deleted file mode 100644 index 5675574..0000000 --- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKOp.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.zk; - -import com.twitter.distributedlog.util.Transaction; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; - -import javax.annotation.Nullable; - -/** - * ZooKeeper Transaction Operation - */ -public abstract class ZKOp implements Transaction.Op<Object> { - - protected final Op op; - - protected ZKOp(Op op) { - this.op = op; - } - - public Op getOp() { - return op; - } - - @Override - public void commit(Object r) { - assert(r instanceof OpResult); - commitOpResult((OpResult) r); - } - - protected abstract void commitOpResult(OpResult opResult); - - @Override - public void abort(Throwable t, Object r) { - assert(r instanceof OpResult); - abortOpResult(t, (OpResult) r); - } - - /** - * Abort the operation with exception <i>t</i> and result <i>opResult</i>. 
- * - * @param t the reason to abort the operation - * @param opResult the result of operation - */ - protected abstract void abortOpResult(Throwable t, - @Nullable OpResult opResult); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java deleted file mode 100644 index 57f9aa3..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKTransaction.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.zk; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.OpResult; - -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * ZooKeeper Transaction - */ -public class ZKTransaction implements Transaction<Object>, AsyncCallback.MultiCallback { - - private final ZooKeeperClient zkc; - private final List<ZKOp> ops; - private final List<org.apache.zookeeper.Op> zkOps; - private final Promise<Void> result; - private final AtomicBoolean done = new AtomicBoolean(false); - - public ZKTransaction(ZooKeeperClient zkc) { - this.zkc = zkc; - this.ops = Lists.newArrayList(); - this.zkOps = Lists.newArrayList(); - this.result = new Promise<Void>(); - } - - @Override - public void addOp(Op<Object> operation) { - if (done.get()) { - throw new IllegalStateException("Add an operation to a finished transaction"); - } - assert(operation instanceof ZKOp); - ZKOp zkOp = (ZKOp) operation; - this.ops.add(zkOp); - this.zkOps.add(zkOp.getOp()); - } - - @Override - public Future<Void> execute() { - if (!done.compareAndSet(false, true)) { - return result; - } - try { - zkc.get().multi(zkOps, this, result); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - result.setException(FutureUtils.zkException(e, "")); - } catch (InterruptedException e) { - result.setException(FutureUtils.zkException(e, "")); - } - return result; - } - - @Override - public void abort(Throwable cause) { - if (!done.compareAndSet(false, true)) { - return; - } - for (int i = 0; i < ops.size(); i++) { - ops.get(i).abortOpResult(cause, null); - } - FutureUtils.setException(result, cause); - } - - @Override - public 
void processResult(int rc, String path, Object ctx, List<OpResult> results) { - if (KeeperException.Code.OK.intValue() == rc) { // transaction succeed - for (int i = 0; i < ops.size(); i++) { - ops.get(i).commitOpResult(results.get(i)); - } - FutureUtils.setValue(result, null); - } else { - KeeperException ke = KeeperException.create(KeeperException.Code.get(rc)); - for (int i = 0; i < ops.size(); i++) { - ops.get(i).abortOpResult(ke, null != results ? results.get(i) : null); - } - FutureUtils.setException(result, ke); - } - } -}