http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java deleted file mode 100644 index 8e4a8be..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/FutureUtils.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.util; - -import com.google.common.base.Stopwatch; -import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.exceptions.BKTransmitException; -import org.apache.distributedlog.exceptions.LockingException; -import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.exceptions.ZKException; -import org.apache.distributedlog.stats.OpStatsListener; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureCancelledException; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import com.twitter.util.Try; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Utilities to process future - */ -public class FutureUtils { - - private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class); - - public static class OrderedFutureEventListener<R> - implements FutureEventListener<R> { - - public static <R> OrderedFutureEventListener<R> of( - FutureEventListener<R> listener, - OrderedScheduler scheduler, - Object key) { - return new OrderedFutureEventListener<R>(scheduler, key, listener); - } - - private final OrderedScheduler scheduler; - private final Object key; - private 
final FutureEventListener<R> listener; - - private OrderedFutureEventListener(OrderedScheduler scheduler, - Object key, - FutureEventListener<R> listener) { - this.scheduler = scheduler; - this.key = key; - this.listener = listener; - } - - @Override - public void onSuccess(final R value) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - listener.onSuccess(value); - } - }); - } - - @Override - public void onFailure(final Throwable cause) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - listener.onFailure(cause); - } - }); - } - } - - public static class FutureEventListenerRunnable<R> - implements FutureEventListener<R> { - - public static <R> FutureEventListenerRunnable<R> of( - FutureEventListener<R> listener, - ExecutorService executorService) { - return new FutureEventListenerRunnable<R>(executorService, listener); - } - - private final ExecutorService executorService; - private final FutureEventListener<R> listener; - - private FutureEventListenerRunnable(ExecutorService executorService, - FutureEventListener<R> listener) { - this.executorService = executorService; - this.listener = listener; - } - - @Override - public void onSuccess(final R value) { - executorService.submit(new Runnable() { - @Override - public void run() { - listener.onSuccess(value); - } - }); - } - - @Override - public void onFailure(final Throwable cause) { - executorService.submit(new Runnable() { - @Override - public void run() { - listener.onFailure(cause); - } - }); - } - } - - private static class ListFutureProcessor<T, R> - extends Function<Throwable, BoxedUnit> - implements FutureEventListener<R>, Runnable { - - private volatile boolean interrupted = false; - private final Iterator<T> itemsIter; - private final Function<T, Future<R>> processFunc; - private final Promise<List<R>> promise; - private final List<R> results; - private final ExecutorService callbackExecutor; - - ListFutureProcessor(List<T> items, - Function<T, 
Future<R>> processFunc, - ExecutorService callbackExecutor) { - this.itemsIter = items.iterator(); - this.processFunc = processFunc; - this.promise = new Promise<List<R>>(); - this.promise.setInterruptHandler(this); - this.results = new ArrayList<R>(); - this.callbackExecutor = callbackExecutor; - } - - @Override - public BoxedUnit apply(Throwable cause) { - interrupted = true; - return BoxedUnit.UNIT; - } - - @Override - public void onSuccess(R value) { - results.add(value); - if (null == callbackExecutor) { - run(); - } else { - callbackExecutor.submit(this); - } - } - - @Override - public void onFailure(final Throwable cause) { - if (null == callbackExecutor) { - promise.setException(cause); - } else { - callbackExecutor.submit(new Runnable() { - @Override - public void run() { - promise.setException(cause); - } - }); - } - } - - @Override - public void run() { - if (interrupted) { - logger.debug("ListFutureProcessor is interrupted."); - return; - } - if (!itemsIter.hasNext()) { - promise.setValue(results); - return; - } - processFunc.apply(itemsIter.next()).addEventListener(this); - } - } - - /** - * Process the list of items one by one using the process function <i>processFunc</i>. - * The process will be stopped immediately if it fails on processing any one. - * - * @param collection list of items - * @param processFunc process function - * @param callbackExecutor executor to process the item - * @return future presents the list of processed results - */ - public static <T, R> Future<List<R>> processList(List<T> collection, - Function<T, Future<R>> processFunc, - @Nullable ExecutorService callbackExecutor) { - ListFutureProcessor<T, R> processor = - new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor); - if (null != callbackExecutor) { - callbackExecutor.submit(processor); - } else { - processor.run(); - } - return processor.promise; - } - - /** - * Add a event listener over <i>result</i> for collecting the operation stats. 
- * - * @param result result to listen on - * @param opStatsLogger stats logger to record operations stats - * @param stopwatch stop watch to time operation - * @param <T> - * @return result after registered the event listener - */ - public static <T> Future<T> stats(Future<T> result, - OpStatsLogger opStatsLogger, - Stopwatch stopwatch) { - return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch)); - } - - /** - * Await for the result of the future and thrown bk related exceptions. - * - * @param result future to wait for - * @return the result of future - * @throws BKException when exceptions are thrown by the future. If there is unkown exceptions - * thrown from the future, the exceptions will be wrapped into - * {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}. - */ - public static <T> T bkResult(Future<T> result) throws BKException { - try { - return Await.result(result); - } catch (BKException bke) { - throw bke; - } catch (InterruptedException ie) { - throw BKException.create(BKException.Code.InterruptedException); - } catch (Exception e) { - logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e); - throw BKException.create(BKException.Code.UnexpectedConditionException); - } - } - - /** - * Return the bk exception return code for a <i>throwable</i>. - * - * @param throwable the cause of the exception - * @return the bk exception return code. if the exception isn't bk exceptions, - * it would return {@link BKException.Code#UnexpectedConditionException}. - */ - public static int bkResultCode(Throwable throwable) { - if (throwable instanceof BKException) { - return ((BKException)throwable).getCode(); - } - return BKException.Code.UnexpectedConditionException; - } - - /** - * Wait for the result until it completes. 
- * - * @param result result to wait - * @return the result - * @throws IOException when encountered exceptions on the result - */ - public static <T> T result(Future<T> result) throws IOException { - return result(result, Duration.Top()); - } - - /** - * Wait for the result for a given <i>duration</i>. - * <p>If the result is not ready within `duration`, an IOException will thrown wrapping with - * corresponding {@link com.twitter.util.TimeoutException}. - * - * @param result result to wait - * @param duration duration to wait - * @return the result - * @throws IOException when encountered exceptions on the result or waiting for the result. - */ - public static <T> T result(Future<T> result, Duration duration) - throws IOException { - try { - return Await.result(result, duration); - } catch (KeeperException ke) { - throw new ZKException("Encountered zookeeper exception on waiting result", ke); - } catch (BKException bke) { - throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode()); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw new DLInterruptedException("Interrupted on waiting result", ie); - } catch (Exception e) { - throw new IOException("Encountered exception on waiting result", e); - } - } - - /** - * Wait for the result of a lock operation. - * - * @param result result to wait - * @param lockPath path of the lock - * @return the result - * @throws LockingException when encountered exceptions on the result of lock operation - */ - public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException { - try { - return Await.result(result); - } catch (LockingException le) { - throw le; - } catch (Exception e) { - throw new LockingException(lockPath, "Encountered exception on locking ", e); - } - } - - /** - * Convert the <i>throwable</i> to zookeeper related exceptions. 
- * - * @param throwable cause - * @param path zookeeper path - * @return zookeeper related exceptions - */ - public static Throwable zkException(Throwable throwable, String path) { - if (throwable instanceof KeeperException) { - return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable); - } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) { - return new ZKException("Encountered zookeeper connection loss on " + path, - KeeperException.Code.CONNECTIONLOSS); - } else if (throwable instanceof InterruptedException) { - return new DLInterruptedException("Interrupted on operating " + path, throwable); - } else { - return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable); - } - } - - /** - * Cancel the future. It would interrupt the future. - * - * @param future future to cancel - */ - public static <T> void cancel(Future<T> future) { - future.raise(new FutureCancelledException()); - } - - /** - * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period. - * If the promise has been satisfied before raising, it won't change the state of the promise. 
- * - * @param promise promise to raise exception - * @param timeout timeout period - * @param unit timeout period unit - * @param cause cause to raise - * @param scheduler scheduler to execute raising exception - * @param key the submit key used by the scheduler - * @return the promise applied with the raise logic - */ - public static <T> Promise<T> within(final Promise<T> promise, - final long timeout, - final TimeUnit unit, - final Throwable cause, - final OrderedScheduler scheduler, - final Object key) { - if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) { - return promise; - } - // schedule a timeout to raise timeout exception - final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() { - @Override - public void run() { - if (!promise.isDefined() && FutureUtils.setException(promise, cause)) { - logger.info("Raise exception", cause); - } - } - }, timeout, unit); - // when the promise is satisfied, cancel the timeout task - promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() { - @Override - public BoxedUnit apply(Try<T> value) { - if (!task.cancel(true)) { - logger.debug("Failed to cancel the timeout task"); - } - return BoxedUnit.UNIT; - } - }); - return promise; - } - - /** - * Satisfy the <i>promise</i> with provide value in an ordered scheduler. - * <p>If the promise was already satisfied, nothing will be changed. - * - * @param promise promise to satisfy - * @param value value to satisfy - * @param scheduler scheduler to satisfy the promise with provided value - * @param key the submit key of the ordered scheduler - */ - public static <T> void setValue(final Promise<T> promise, - final T value, - OrderedScheduler scheduler, - Object key) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - setValue(promise, value); - } - }); - } - - /** - * Satisfy the <i>promise</i> with provide value. 
- * <p>If the promise was already satisfied, nothing will be changed. - * - * @param promise promise to satisfy - * @param value value to satisfy - * @return true if successfully satisfy the future. false if the promise has been satisfied. - */ - public static <T> boolean setValue(Promise<T> promise, T value) { - boolean success = promise.updateIfEmpty(new Return<T>(value)); - if (!success) { - logger.info("Result set multiple times. Value = '{}', New = 'Return({})'", - promise.poll(), value); - } - return success; - } - - /** - * Satisfy the <i>promise</i> with provided <i>cause</i> in an ordered scheduler. - * - * @param promise promise to satisfy - * @param throwable cause to satisfy - * @param scheduler the scheduler to satisfy the promise - * @param key submit key of the ordered scheduler - */ - public static <T> void setException(final Promise<T> promise, - final Throwable cause, - OrderedScheduler scheduler, - Object key) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - setException(promise, cause); - } - }); - } - - /** - * Satisfy the <i>promise</i> with provided <i>cause</i>. - * - * @param promise promise to satisfy - * @param cause cause to satisfy - * @return true if successfully satisfy the future. false if the promise has been satisfied. - */ - public static <T> boolean setException(Promise<T> promise, Throwable cause) { - boolean success = promise.updateIfEmpty(new Throw<T>(cause)); - if (!success) { - logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'", - promise.poll(), cause); - } - return success; - } - - /** - * Ignore exception from the <i>future</i>. 
- * - * @param future the original future - * @return a transformed future ignores exceptions - */ - public static <T> Promise<Void> ignore(Future<T> future) { - return ignore(future, null); - } - - /** - * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions - * - * @param future the original future - * @param errorMsg the error message to log on exceptions - * @return a transformed future ignores exceptions - */ - public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) { - final Promise<Void> promise = new Promise<Void>(); - future.addEventListener(new FutureEventListener<T>() { - @Override - public void onSuccess(T value) { - setValue(promise, null); - } - - @Override - public void onFailure(Throwable cause) { - if (null != errorMsg) { - logger.error(errorMsg, cause); - } - setValue(promise, null); - } - }); - return promise; - } - - /** - * Create transmit exception from transmit result. - * - * @param transmitResult - * transmit result (basically bk exception code) - * @return transmit exception - */ - public static BKTransmitException transmitException(int transmitResult) { - return new BKTransmitException("Failed to write to bookkeeper; Error is (" - + transmitResult + ") " - + BKException.getMessage(transmitResult), transmitResult); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java deleted file mode 100644 index 3372476..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredFuturePool.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.util; - -import com.google.common.base.Stopwatch; - -import com.twitter.util.FuturePool; -import com.twitter.util.FuturePool$; -import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import scala.runtime.BoxedUnit; -import scala.Function0; - -/** - * {@link FuturePool} with exposed stats. This class is exposing following stats for helping understanding - * the healthy of this thread pool executor. - * <h3>Metrics</h3> - * Stats are only exposed when <code>traceTaskExecution</code> is true. - * <ul> - * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on waiting - * being executed. - * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on executing. - * <li>task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on submitting. - * <li>tasks_pending: gauge. how many tasks are pending in this future pool. 
- * </ul> - */ -public class MonitoredFuturePool implements FuturePool { - static final Logger LOG = LoggerFactory.getLogger(MonitoredFuturePool.class); - - private final FuturePool futurePool; - - private final StatsLogger statsLogger; - private final OpStatsLogger taskPendingTime; - private final OpStatsLogger taskExecutionTime; - private final OpStatsLogger taskEnqueueTime; - private final Counter taskPendingCounter; - - private final boolean traceTaskExecution; - private final long traceTaskExecutionWarnTimeUs; - - class TimedFunction0<T> extends com.twitter.util.Function0<T> { - private final Function0<T> function0; - private Stopwatch pendingStopwatch = Stopwatch.createStarted(); - - TimedFunction0(Function0<T> function0) { - this.function0 = function0; - this.pendingStopwatch = Stopwatch.createStarted(); - } - - @Override - public T apply() { - taskPendingTime.registerSuccessfulEvent(pendingStopwatch.elapsed(TimeUnit.MICROSECONDS)); - Stopwatch executionStopwatch = Stopwatch.createStarted(); - T result = function0.apply(); - taskExecutionTime.registerSuccessfulEvent(executionStopwatch.elapsed(TimeUnit.MICROSECONDS)); - long elapsed = executionStopwatch.elapsed(TimeUnit.MICROSECONDS); - if (elapsed > traceTaskExecutionWarnTimeUs) { - LOG.info("{} took too long {} microseconds", function0.toString(), elapsed); - } - return result; - } - } - - /** - * Create a future pool with stats exposed. 
- * - * @param futurePool underlying future pool to execute futures - * @param statsLogger stats logger to receive exposed stats - * @param traceTaskExecution flag to enable/disable exposing stats about task execution - * @param traceTaskExecutionWarnTimeUs flag to enable/disable logging slow tasks - * whose execution time is above this value - */ - public MonitoredFuturePool(FuturePool futurePool, - StatsLogger statsLogger, - boolean traceTaskExecution, - long traceTaskExecutionWarnTimeUs) { - this.futurePool = futurePool; - this.traceTaskExecution = traceTaskExecution; - this.traceTaskExecutionWarnTimeUs = traceTaskExecutionWarnTimeUs; - this.statsLogger = statsLogger; - this.taskPendingTime = statsLogger.getOpStatsLogger("task_pending_time"); - this.taskExecutionTime = statsLogger.getOpStatsLogger("task_execution_time"); - this.taskEnqueueTime = statsLogger.getOpStatsLogger("task_enqueue_time"); - this.taskPendingCounter = statsLogger.getCounter("tasks_pending"); - } - - @Override - public <T> Future<T> apply(Function0<T> function0) { - if (traceTaskExecution) { - taskPendingCounter.inc(); - Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted(); - Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0)); - taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS)); - futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - taskPendingCounter.dec(); - return null; - } - }); - return futureResult; - } else { - return futurePool.apply(function0); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java 
b/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java deleted file mode 100644 index 3121a19..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.MathUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * {@link ScheduledThreadPoolExecutor} with exposed stats. This class is exposing following stats for - * helping understanding the healthy of this thread pool executor. - * <h3>Metrics</h3> - * <ul> - * <li>pending_tasks: gauge. 
how many tasks are pending in this executor.
 * <li>completed_tasks: gauge. how many tasks are completed in this executor.
 * <li>total_tasks: gauge. how many tasks are submitted to this executor.
 * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
 * waiting being executed.
 * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
 * executing.
 * </ul>
 */
public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
    static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class);

    /**
     * Runnable wrapper recording how long the task waited (from construction to start)
     * and how long it executed.
     */
    private class TimedRunnable implements Runnable {

        final Runnable runnable;
        final long enqueueNanos;

        TimedRunnable(Runnable runnable) {
            this.runnable = runnable;
            this.enqueueNanos = MathUtils.nowInNano();
        }

        @Override
        public void run() {
            long startNanos = MathUtils.nowInNano();
            long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
            taskPendingStats.registerSuccessfulEvent(pendingMicros);
            try {
                runnable.run();
            } finally {
                // recorded whether or not the task threw
                long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
                taskExecutionStats.registerSuccessfulEvent(executionMicros);
            }
        }

        @Override
        public String toString() {
            return runnable.toString();
        }

        @Override
        public int hashCode() {
            return runnable.hashCode();
        }
    }

    /**
     * Callable wrapper recording how long the task waited (from construction to start)
     * and how long it executed.
     */
    private class TimedCallable<T> implements Callable<T> {

        final Callable<T> task;
        final long enqueueNanos;

        TimedCallable(Callable<T> task) {
            this.task = task;
            this.enqueueNanos = MathUtils.nowInNano();
        }

        @Override
        public T call() throws Exception {
            long startNanos = MathUtils.nowInNano();
            long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
            taskPendingStats.registerSuccessfulEvent(pendingMicros);
            try {
                return task.call();
            } finally {
                // recorded whether or not the task threw
                long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
                taskExecutionStats.registerSuccessfulEvent(executionMicros);
            }
        }
    }

    protected final boolean traceTaskExecution;
    protected final OpStatsLogger taskExecutionStats;
    protected final OpStatsLogger taskPendingStats;
    protected final StatsLogger statsLogger;
    // Gauges and their labels
    private static final String pendingTasksGaugeLabel = "pending_tasks";
    private final Gauge<Number> pendingTasksGauge;
    private static final String completedTasksGaugeLabel = "completed_tasks";
    protected final Gauge<Number> completedTasksGauge;
    private static final String totalTasksGaugeLabel = "total_tasks";
    protected final Gauge<Number> totalTasksGauge;

    /**
     * Create the executor and register its gauges with <i>statsLogger</i>.
     *
     * @param corePoolSize number of core threads, passed to {@link ScheduledThreadPoolExecutor}
     * @param threadFactory factory used to create worker threads
     * @param statsLogger stats logger receiving the gauges and opstats
     * @param traceTaskExecution when true, tasks passed to the <code>submit</code> methods are
     *                           wrapped so that their pending and execution times are recorded
     */
    public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
                                                ThreadFactory threadFactory,
                                                StatsLogger statsLogger,
                                                boolean traceTaskExecution) {
        super(corePoolSize, threadFactory);
        this.traceTaskExecution = traceTaskExecution;
        this.statsLogger = statsLogger;
        this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
        this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
        this.pendingTasksGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return getQueue().size();
            }
        };
        this.completedTasksGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return getCompletedTaskCount();
            }
        };
        this.totalTasksGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return getTaskCount();
            }
        };

        // outstanding tasks
        this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
        // completed tasks
        this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
        // total tasks: must be the total gauge, which is also the one unregisterGauges() removes
        this.statsLogger.registerGauge(totalTasksGaugeLabel, totalTasksGauge);
    }

    private Runnable timedRunnable(Runnable r) {
        return traceTaskExecution ? new TimedRunnable(r) : r;
    }

    private <T> Callable<T> timedCallable(Callable<T> task) {
        return traceTaskExecution ? new TimedCallable<T>(task) : task;
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(timedRunnable(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(timedRunnable(task), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(timedCallable(task));
    }

    /**
     * Log failures of executed tasks: both exceptions captured inside a completed
     * {@link Future} and exceptions passed in by the executor as <i>t</i>.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // An exception captured inside the future never reaches the uncaught exception
        // handler on its own, so it is passed to the handler explicitly.
        Throwable hiddenThrowable = extractThrowable(r);
        if (hiddenThrowable != null) {
            logAndHandle(hiddenThrowable, true);
        }

        // The executor re-throws exceptions thrown by the task to the uncaught exception handler
        // so we don't need to pass the exception to the handler explicitly
        if (null != t) {
            logAndHandle(t, false);
        }
    }

    /**
     * Log <i>t</i> and optionally hand it to the default uncaught exception handler.
     * The executor re-throws exceptions thrown by the task to the uncaught exception handler
     * so we only need to do anything if uncaught exception handler has not been set.
     *
     * @param t the failure to report
     * @param passToHandler whether to invoke the default uncaught exception handler, if one is set
     */
    private void logAndHandle(Throwable t, boolean passToHandler) {
        // read once: the default handler can be replaced concurrently
        Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler();
        if (handler == null) {
            LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
        } else {
            LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
            if (passToHandler) {
                handler.uncaughtException(Thread.currentThread(), t);
            }
        }
    }


    /**
     * Extract the exception (throwable) inside the ScheduledFutureTask.
     *
     * @param runnable - The runnable that was executed
     * @return exception enclosed in the Runnable if any; null otherwise
     */
    private Throwable extractThrowable(Runnable runnable) {
        // Check for exceptions wrapped by FutureTask.
        // We do this by calling get(), which will cause it to throw any saved exception.
        // Check for isDone to prevent blocking
        if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) {
            try {
                ((Future<?>) runnable).get();
            } catch (CancellationException e) {
                LOG.debug("Task {} cancelled", runnable, e.getCause());
            } catch (InterruptedException e) {
                LOG.debug("Task {} was interrupted", runnable, e);
            } catch (ExecutionException e) {
                return e.getCause();
            }
        }

        return null;
    }

    /**
     * Unregister the three gauges registered by the constructor.
     */
    void unregisterGauges() {
        this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
        this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
        this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java deleted file mode 100644 index ad1ba4e..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java +++ /dev/null @@ -1,490 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -import com.google.common.base.Objects; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.distributedlog.stats.BroadCastStatsLogger; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; -import com.twitter.util.Time; -import com.twitter.util.Timer; -import com.twitter.util.TimerTask; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.MathUtils; -import scala.Function0; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing - * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i> - * will be executed in order. - * <p> - * The scheduler is comprised of multiple {@link MonitoredScheduledThreadPoolExecutor}s. Each - * {@link MonitoredScheduledThreadPoolExecutor} is a single thread executor. 
Normal task submissions will - * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g - * {@link OrderedScheduler#apply(Object, Function0)} will be submitted to a dedicated executor based on - * the hash value of submit <i>key</i>. - * - * <h3>Metrics</h3> - * - * <h4>Per Executor Metrics</h4> - * - * Metrics about individual executors are exposed via {@link Builder#perExecutorStatsLogger} - * under <i>`scope`/`name`-executor-`id`-0</i>. `name` is the scheduler name provided by {@link Builder#name} - * while `id` is the index of this executor in the pool. And corresponding stats of future pool of - * that executor are exposed under <i>`scope`/`name`-executor-`id`-0/futurepool</i>. - * <p> - * See {@link MonitoredScheduledThreadPoolExecutor} and {@link MonitoredFuturePool} for per executor metrics - * exposed. - * - * <h4>Aggregated Metrics</h4> - * <ul> - * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on - * waiting being executed. - * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on - * executing. - * <li>futurepool/task_pending_time: opstats. measuring the characteristics about the time that tasks spent - * on waiting in future pool being executed. - * <li>futurepool/task_execution_time: opstats. measuring the characteristics about the time that tasks spent - * on executing. - * <li>futurepool/task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on - * submitting to future pool. - * <li>futurepool/tasks_pending: gauge. how many tasks are pending in this future pool. - * </ul> - */ -public class OrderedScheduler implements ScheduledExecutorService { - - /** - * Create a builder to build scheduler. - * - * @return scheduler builder - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder for {@link OrderedScheduler}. 
- */ - public static class Builder { - - private String name = "OrderedScheduler"; - private int corePoolSize = -1; - private ThreadFactory threadFactory = null; - private boolean traceTaskExecution = false; - private long traceTaskExecutionWarnTimeUs = Long.MAX_VALUE; - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - private StatsLogger perExecutorStatsLogger = NullStatsLogger.INSTANCE; - - /** - * Set the name of this scheduler. It would be used as part of stats scope and thread name. - * - * @param name - * name of the scheduler. - * @return scheduler builder - */ - public Builder name(String name) { - this.name = name; - return this; - } - - /** - * Set the number of threads to be used in this scheduler. - * - * @param corePoolSize the number of threads to keep in the pool, even - * if they are idle - * @return scheduler builder - */ - public Builder corePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - return this; - } - - /** - * Set the thread factory that the scheduler uses to create a new thread. - * - * @param threadFactory the factory to use when the executor - * creates a new thread - * @return scheduler builder - */ - public Builder threadFactory(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - return this; - } - - /** - * Enable/Disable exposing task execution stats. - * - * @param trace - * flag to enable/disable exposing task execution stats. - * @return scheduler builder - */ - public Builder traceTaskExecution(boolean trace) { - this.traceTaskExecution = trace; - return this; - } - - /** - * Enable/Disable logging slow tasks whose execution time is above <code>timeUs</code>. - * - * @param timeUs - * slow task execution time threshold in us. - * @return scheduler builder. - */ - public Builder traceTaskExecutionWarnTimeUs(long timeUs) { - this.traceTaskExecutionWarnTimeUs = timeUs; - return this; - } - - /** - * Expose the aggregated stats over <code>statsLogger</code>. 
- * - * @param statsLogger - * stats logger to receive aggregated stats. - * @return scheduler builder - */ - public Builder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - /** - * Expose stats of individual executors over <code>perExecutorStatsLogger</code>. - * Each executor's stats will be exposed under a sub-scope `name`-executor-`id`-0. - * `name` is the scheduler name, while `id` is the index of the scheduler in the pool. - * - * @param perExecutorStatsLogger - * stats logger to receive per executor stats. - * @return scheduler builder - */ - public Builder perExecutorStatsLogger(StatsLogger perExecutorStatsLogger) { - this.perExecutorStatsLogger = perExecutorStatsLogger; - return this; - } - - /** - * Build the ordered scheduler. - * - * @return ordered scheduler - */ - public OrderedScheduler build() { - if (corePoolSize <= 0) { - corePoolSize = Runtime.getRuntime().availableProcessors(); - } - if (null == threadFactory) { - threadFactory = Executors.defaultThreadFactory(); - } - - return new OrderedScheduler( - name, - corePoolSize, - threadFactory, - traceTaskExecution, - traceTaskExecutionWarnTimeUs, - statsLogger, - perExecutorStatsLogger); - } - - } - - protected final String name; - protected final int corePoolSize; - protected final MonitoredScheduledThreadPoolExecutor[] executors; - protected final MonitoredFuturePool[] futurePools; - protected final Random random; - - private OrderedScheduler(String name, - int corePoolSize, - ThreadFactory threadFactory, - boolean traceTaskExecution, - long traceTaskExecutionWarnTimeUs, - StatsLogger statsLogger, - StatsLogger perExecutorStatsLogger) { - this.name = name; - this.corePoolSize = corePoolSize; - this.executors = new MonitoredScheduledThreadPoolExecutor[corePoolSize]; - this.futurePools = new MonitoredFuturePool[corePoolSize]; - for (int i = 0; i < corePoolSize; i++) { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat(name + "-executor-" + 
i + "-%d") - .setThreadFactory(threadFactory) - .build(); - StatsLogger broadcastStatsLogger = - BroadCastStatsLogger.masterslave(perExecutorStatsLogger.scope("executor-" + i), statsLogger); - executors[i] = new MonitoredScheduledThreadPoolExecutor( - 1, tf, broadcastStatsLogger, traceTaskExecution); - futurePools[i] = new MonitoredFuturePool( - new ExecutorServiceFuturePool(executors[i]), - broadcastStatsLogger.scope("futurepool"), - traceTaskExecution, - traceTaskExecutionWarnTimeUs); - } - this.random = new Random(System.currentTimeMillis()); - } - - protected MonitoredScheduledThreadPoolExecutor chooseExecutor() { - return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)]; - } - - protected MonitoredScheduledThreadPoolExecutor chooseExecutor(Object key) { - return corePoolSize == 1 ? executors[0] : - executors[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)]; - } - - protected FuturePool chooseFuturePool(Object key) { - return corePoolSize == 1 ? futurePools[0] : - futurePools[MathUtils.signSafeMod(Objects.hashCode(key), corePoolSize)]; - } - - protected FuturePool chooseFuturePool() { - return corePoolSize == 1 ? 
futurePools[0] : futurePools[random.nextInt(corePoolSize)]; - } - - /** - * {@inheritDoc} - */ - @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { - return chooseExecutor().schedule(command, delay, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { - return chooseExecutor().schedule(callable, delay, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, - long initialDelay, long period, TimeUnit unit) { - return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, - long initialDelay, long delay, TimeUnit unit) { - return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public void shutdown() { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - // Unregister gauges - executor.unregisterGauges(); - executor.shutdown(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<Runnable> shutdownNow() { - List<Runnable> runnables = new ArrayList<Runnable>(); - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - runnables.addAll(executor.shutdownNow()); - } - return runnables; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isShutdown() { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - if (!executor.isShutdown()) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isTerminated() { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - if (!executor.isTerminated()) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) 
- throws InterruptedException { - for (MonitoredScheduledThreadPoolExecutor executor : executors) { - if (!executor.awaitTermination(timeout, unit)) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public <T> Future<T> submit(Callable<T> task) { - return chooseExecutor().submit(task); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> Future<T> submit(Runnable task, T result) { - return chooseExecutor().submit(task, result); - } - - /** - * {@inheritDoc} - */ - @Override - public Future<?> submit(Runnable task) { - return chooseExecutor().submit(task); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - return chooseExecutor().invokeAll(tasks); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return chooseExecutor().invokeAll(tasks, timeout, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException { - return chooseExecutor().invokeAny(tasks); - } - - /** - * {@inheritDoc} - */ - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return chooseExecutor().invokeAny(tasks, timeout, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public void execute(Runnable command) { - chooseExecutor().execute(command); - } - - // Ordered Functions - - /** - * Return a future pool used by <code>key</code>. 
- * - * @param key - * key to order in the future pool - * @return future pool - */ - public FuturePool getFuturePool(Object key) { - return chooseFuturePool(key); - } - - /** - * Execute the <code>function</code> in the executor that assigned by <code>key</code>. - * - * @see com.twitter.util.Future - * @param key key of the <i>function</i> to run - * @param function function to run - * @return future representing the result of the <i>function</i> - */ - public <T> com.twitter.util.Future<T> apply(Object key, Function0<T> function) { - return chooseFuturePool(key).apply(function); - } - - /** - * Execute the <code>function</code> by the scheduler. It would be submitted to any executor randomly. - * - * @param function function to run - * @return future representing the result of the <i>function</i> - */ - public <T> com.twitter.util.Future<T> apply(Function0<T> function) { - return chooseFuturePool().apply(function); - } - - public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) { - return chooseExecutor(key).schedule(command, delay, unit); - } - - public ScheduledFuture<?> scheduleAtFixedRate(Object key, - Runnable command, - long initialDelay, - long period, - TimeUnit unit) { - return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit); - } - - public Future<?> submit(Object key, Runnable command) { - return chooseExecutor(key).submit(command); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java deleted file mode 100644 index 15394dc..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitLimiter.java +++ 
/dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -/** - * A simple limiter interface which tracks acquire/release of permits, for - * example for tracking outstanding writes. - */ -public interface PermitLimiter { - - public static PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() { - @Override - public boolean acquire() { - return true; - } - @Override - public void release(int permits) { - } - - @Override - public void close() { - - } - }; - - /** - * Acquire a permit. - * - * @return true if successfully acquire a permit, otherwise false. - */ - boolean acquire(); - - /** - * Release a permit. 
- */ - void release(int permits); - - /** - * Close the resources created by the limiter - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java deleted file mode 100644 index 24c7860..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/PermitManager.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.util; - -public interface PermitManager { - - public static interface Permit { - static final Permit ALLOWED = new Permit() { - @Override - public boolean isAllowed() { - return true; - } - }; - boolean isAllowed(); - } - - public static PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() { - @Override - public Permit acquirePermit() { - return Permit.ALLOWED; - } - - @Override - public void releasePermit(Permit permit) { - // nop - } - - @Override - public boolean allowObtainPermits() { - return true; - } - - @Override - public boolean disallowObtainPermits(Permit permit) { - return false; - } - - @Override - public void close() { - // nop - } - - }; - - /** - * Obetain a permit from permit manager. - * - * @return permit. - */ - Permit acquirePermit(); - - /** - * Release a given permit. - * - * @param permit - * permit to release - */ - void releasePermit(Permit permit); - - /** - * Allow obtaining permits. - */ - boolean allowObtainPermits(); - - /** - * Disallow obtaining permits. Disallow needs to be performed under the context - * of <i>permit</i>. 
- * - * @param permit - * permit context to disallow - */ - boolean disallowObtainPermits(Permit permit); - - /** - * Release the resources - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java deleted file mode 100644 index a467d26..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SafeQueueingFuturePool.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.util; - -import com.google.common.base.Preconditions; - -import com.twitter.util.Function0; -import com.twitter.util.FuturePool; -import com.twitter.util.Future; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - -import scala.runtime.BoxedUnit; - -/** - * Acts like a future pool, but collects failed apply calls into a queue to be applied - * in-order on close. This happens either in the close thread or after close is called, - * in the last operation to complete execution. - * Ops submitted after close will not be scheduled, so its important to ensure no more - * ops will be applied once close has been called. - */ -public class SafeQueueingFuturePool<T> { - - static final Logger LOG = LoggerFactory.getLogger(SafeQueueingFuturePool.class); - - private boolean closed; - private int outstanding; - private ConcurrentLinkedQueue<Function0<T>> queue; - private FuturePool orderedFuturePool; - - public SafeQueueingFuturePool(FuturePool orderedFuturePool) { - this.closed = false; - this.outstanding = 0; - this.queue = new ConcurrentLinkedQueue<Function0<T>>(); - this.orderedFuturePool = orderedFuturePool; - } - - public synchronized Future<T> apply(final Function0<T> fn) { - Preconditions.checkNotNull(fn); - if (closed) { - return Future.exception(new RejectedExecutionException("Operation submitted to closed SafeQueueingFuturePool")); - } - ++outstanding; - queue.add(fn); - Future<T> result = orderedFuturePool.apply(new Function0<T>() { - @Override - public T apply() { - return queue.poll().apply(); - } - @Override - public String toString() { - return fn.toString(); - } - }).ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - if (decrOutstandingAndCheckDone()) { - applyAll(); - } - return null; - } - }); - return result; - } - - private 
synchronized boolean decrOutstandingAndCheckDone() { - return --outstanding == 0 && closed; - } - - public void close() { - final boolean done; - synchronized (this) { - if (closed) { - return; - } - closed = true; - done = (outstanding == 0); - } - if (done) { - applyAll(); - } - } - - private void applyAll() { - if (!queue.isEmpty()) { - LOG.info("Applying {} items", queue.size()); - } - while (!queue.isEmpty()) { - queue.poll().apply(); - } - } - - public synchronized int size() { - return queue.size(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java deleted file mode 100644 index 66e382c..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SchedulerUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.util; - -import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -public class SchedulerUtils { - - static final Logger logger = LoggerFactory.getLogger(SchedulerUtils.class); - - public static void shutdownScheduler(ExecutorService service, long timeout, TimeUnit timeUnit) { - if (null == service) { - return; - } - service.shutdown(); - try { - service.awaitTermination(timeout, timeUnit); - } catch (InterruptedException e) { - logger.warn("Interrupted when shutting down scheduler : ", e); - } - service.shutdownNow(); - } - - public static void shutdownScheduler(OrderedSafeExecutor service, long timeout, TimeUnit timeUnit) { - if (null == service) { - return; - } - service.shutdown(); - try { - service.awaitTermination(timeout, timeUnit); - } catch (InterruptedException e) { - logger.warn("Interrupted when shutting down scheduler : ", e); - } - service.forceShutdown(timeout, timeUnit); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java deleted file mode 100644 index ab8de35..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sequencer.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -/** - * Sequencer generating transaction id. - */ -public interface Sequencer { - - /** - * Return next transaction id generated by the sequencer. - * - * @return next transaction id generated by the sequencer. - */ - long nextId(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java index 767ddf6..3697b3f 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -25,8 +25,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.distributedlog.common.util.PermitLimiter; /** * Simple counter based {@link PermitLimiter}. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java deleted file mode 100644 index 2f606e2..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Sizable.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -/** - * The {@code Sizable} interface is to provide the capability of calculating size - * of any objects. 
- */ -public interface Sizable { - /** - * Calculate the size for this instance. - * - * @return size of the instance. - */ - long size(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java index 69dfdbe..5bc5af2 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/TimeSequencer.java @@ -18,6 +18,7 @@ package org.apache.distributedlog.util; import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.common.util.Sequencer; /** * Time based sequencer. It generated non-decreasing transaction id using milliseconds. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java index 3a623dc..d90a7f8 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/Transaction.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,7 +18,7 @@ package org.apache.distributedlog.util; import com.google.common.annotations.Beta; -import com.twitter.util.Future; +import java.util.concurrent.CompletableFuture; /** * Util class represents a transaction @@ -44,7 +44,7 @@ public interface Transaction<OpResult> { } /** - * Listener on the result of an {@link org.apache.distributedlog.util.Transaction.Op}. + * Listener on the result of an {@link Transaction.Op}. * * @param <OpResult> */ @@ -77,12 +77,12 @@ public interface Transaction<OpResult> { /** * Execute the current transaction. If the transaction succeed, all operations will be - * committed (via {@link org.apache.distributedlog.util.Transaction.Op#commit(Object)}. + * committed (via {@link Transaction.Op#commit(Object)}. * Otherwise, all operations will be aborted (via {@link Op#abort(Throwable, Object)}). * * @return future representing the result of transaction execution. */ - Future<Void> execute(); + CompletableFuture<Void> execute(); /** * Abort current transaction. If this is called and the transaction haven't been executed by