http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java b/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java new file mode 100644 index 0000000..243cf33 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java @@ -0,0 +1,53 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +import com.google.common.base.Preconditions; + +/** + * Joins a task queue with an executor service, to add control over when + * tasks are actually made available for execution. + * + * @author Srinivasan Rajagopal + */ +public class QueueDrainer<T extends Runnable> implements Runnable { + + private final Executor taskExecutor; + private final BlockingQueue<T> blockingQueue; + + /** + * Creates a QueueDrainer that associates the queue with an executorService. + * + * @param taskExecutor Executor to execute a task if present. + * @param blockingQueue Queue to poll if there is a runnable to execute. 
+ */ + public QueueDrainer(Executor taskExecutor, BlockingQueue<T> blockingQueue) { + this.taskExecutor = Preconditions.checkNotNull(taskExecutor); + this.blockingQueue = Preconditions.checkNotNull(blockingQueue); + } + + /** + * Picks tasks from the Queue to execute if present else no-op. + */ + @Override + public void run() { + Runnable command = blockingQueue.poll(); + if (command != null) { + taskExecutor.execute(command); + } + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/Random.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/Random.java b/commons/src/main/java/org/apache/aurora/common/util/Random.java new file mode 100644 index 0000000..a1f1496 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/Random.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.base.Preconditions; + +/** + * An interface to define the common functionality that is required for generating random values. + * + * @author William Farner + */ +public interface Random { + + /** + * @see java.util.Random#nextDouble() + */ + public double nextDouble(); + + /** + * @see java.util.Random#nextInt(int) + */ + public int nextInt(int n); + + /** + * A Random that wraps a java.util.Random. + */ + static class SystemRandom implements Random { + private final java.util.Random rand; + + public SystemRandom(java.util.Random rand) { + this.rand = Preconditions.checkNotNull(rand); + } + + @Override + public double nextDouble() { + return rand.nextDouble(); + } + + @Override + public int nextInt(int n) { + return rand.nextInt(n); + } + } + + // Utility class. 
+ public static class Util { + private Util() {} + + /** + * Creates a new Random based off the default system Random. + * @return A new default Random. + */ + public static Random newDefaultRandom() { + return new SystemRandom(new java.util.Random()); + } + + /** + * Adapts a java.util.Random into a Random. + * + * @param rand The java.util.Random to adapt. + * @return A new Random. + */ + public static Random fromSystemRandom(java.util.Random rand) { + return new SystemRandom(rand); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java b/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java new file mode 100644 index 0000000..3e4de30 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java @@ -0,0 +1,88 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +//************************************************************************ +// +// Summize +// +// This work protected by US Copyright Law and contains proprietary and +// confidential trade secrets. +// +// (c) Copyright 2006 Summize, ALL RIGHTS RESERVED. 
/**
 * Linearly rescales values from a source range {@code [minA, maxA]} onto a target range
 * {@code [minB, maxB]}. The original author notes that values are expected to be positive;
 * nothing in this class enforces that.
 *
 * @author Abdur Chowdhury
 */
public class RangeNormalizer {

  private final double sourceMin;
  private final double sourceSpan;
  private final double targetMin;
  private final double targetSpan;
  private final double targetMid;

  /**
   * Creates a normalizer for a fixed pair of ranges.
   *
   * @param minA Lower bound of the source range.
   * @param maxA Upper bound of the source range.
   * @param minB Lower bound of the target range.
   * @param maxB Upper bound of the target range.
   */
  public RangeNormalizer(double minA, double maxA, double minB, double maxB) {
    sourceMin = minA;
    sourceSpan = maxA - minA;
    targetMin = minB;
    targetSpan = maxB - minB;
    targetMid = minB + (targetSpan / 2.0);
  }

  /**
   * Maps {@code value} from the source range onto the target range.
   *
   * @param value The value to rescale.
   * @return The rescaled value, or the midpoint of the target range when the source range has
   *     zero width.
   */
  public double normalize(double value) {
    if (sourceSpan == 0) {
      // No input range to interpolate over: fall back to the middle of the target range.
      return targetMid;
    }

    double fraction = (value - sourceMin) / sourceSpan;
    return fraction * targetSpan + targetMin;
  }

  /**
   * One-shot variant of {@link #normalize(double)} that takes both ranges as arguments.
   * Unlike the instance method, a zero-width source range yields {@code minB} rather than the
   * midpoint of the target range.
   *
   * @param value The value to rescale.
   * @param minA Lower bound of the source range.
   * @param maxA Upper bound of the source range.
   * @param minB Lower bound of the target range.
   * @param maxB Upper bound of the target range.
   * @return The rescaled value, or {@code minB} when {@code minA == maxA}.
   */
  public static double normalize(double value, double minA, double maxA, double minB, double maxB) {
    if (minA == maxA) {
      // Avoid dividing by zero; return something inside the target range instead.
      return minB;
    }

    double fraction = (value - minA) / (maxA - minA);
    return fraction * (maxB - minB) + minB;
  }

  /**
   * Rounds {@code rating} to the nearest integer and then snaps it onto a coarser set of steps:
   * 2 becomes 1, 4 becomes 3, 6 becomes 5, 8 becomes 7 and 9 becomes 10. Every other rounded
   * value is returned unchanged.
   *
   * @param rating The rating to snap.
   * @return The stepped rating as a float.
   */
  public static float normalizeToStepDistribution(double rating) {
    int stepped = (int) Math.round(rating);

    switch (stepped) {
      case 2:
      case 4:
      case 6:
      case 8:
        stepped -= 1;
        break;
      case 9:
        stepped = 10;
        break;
      default:
        break;
    }

    return (float) stepped;
  }
}
---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/RateLimitedCommandExecutor.java b/commons/src/main/java/org/apache/aurora/common/util/RateLimitedCommandExecutor.java new file mode 100644 index 0000000..05b3c5f --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/RateLimitedCommandExecutor.java @@ -0,0 +1,88 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * CommandExecutor that invokes {@code queueDrainer} with a best-effort + * mechanism to execute with a fixed interval between requests of {@code + * intervalBetweenRequests}. 
+ * + * @author Srinivasan Rajagopal + */ +public class RateLimitedCommandExecutor implements CommandExecutor { + + private static final Logger LOG = Logger.getLogger(RateLimitedCommandExecutor.class.getName()); + + private final BlockingQueue<RetryingRunnable<?>> blockingQueue; + + /** + * Create a CommandExecutor that executes enquequed tasks in the task + * executor with specified interval between executions. + * + * @param taskExecutor executor for periodic execution of enqueued tasks. + * @param intervalBetweenRequests interval between requests to rate limit + * request rate. + * @param queueDrainer A runnable that is responsible for draining the queue. + * @param blockingQueue Queue to keep outstanding work in. + */ + public RateLimitedCommandExecutor( + ScheduledExecutorService taskExecutor, + Amount<Long, Time> intervalBetweenRequests, + Runnable queueDrainer, + BlockingQueue<RetryingRunnable<?>> blockingQueue) { + + checkNotNull(taskExecutor); + checkNotNull(intervalBetweenRequests); + checkArgument(intervalBetweenRequests.as(Time.MILLISECONDS) > 0); + checkNotNull(queueDrainer); + this.blockingQueue = checkNotNull(blockingQueue); + taskExecutor.scheduleWithFixedDelay( + getSafeRunner(queueDrainer), + 0, + intervalBetweenRequests.as(Time.MILLISECONDS), + TimeUnit.MILLISECONDS); + } + + private static Runnable getSafeRunner(final Runnable runnable) { + return new Runnable() { + @Override public void run() { + try { + runnable.run(); + } catch (RuntimeException t) { + LOG.log(Level.INFO, " error processing task " + runnable); + } + } + }; + } + + @Override + public <E extends Exception> void execute(String name, ExceptionalCommand<E> task, + Class<E> exceptionClass, int numTries, Amount<Long, Time> retryDelay) { + blockingQueue.add(new RetryingRunnable<E>(name, task, exceptionClass, + numTries, retryDelay, this)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java 
---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java b/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java new file mode 100644 index 0000000..2f66d13 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java @@ -0,0 +1,131 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Throwables; + +import org.apache.commons.lang.builder.ToStringBuilder; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A runnable task that is retried in a user-configurable fashion. + * + * @param <E> The type of exception that the ExceptionalCommand throws. 
+ * + * @author Utkarsh Srivastava + */ +public class RetryingRunnable<E extends Exception> implements Runnable { + private final String name; + private final int tryNum; + private final int numTries; + private final Amount<Long, Time> retryDelay; + private final ExceptionalCommand<E> task; + private final CommandExecutor commandExecutor; + private final Class<E> exceptionClass; + + private static final Logger LOG = Logger.getLogger(RetryingRunnable.class.getName()); + + /** + * Create a Task with name {@code name} that executes at most {@code numTries} + * in case of failure with an interval of {@code retryDelay} between attempts. + * + * @param name Human readable name for this task. + * @param task the task to execute. + * @param exceptionClass class of the exception thrown by the task. + * @param numTries the total number of times to try. + * @param retryDelay the delay between successive tries. + * @param commandExecutor Executor to resubmit retries to. + * @param tryNum the seq number of this try. + */ + public RetryingRunnable( + String name, + ExceptionalCommand<E> task, + Class<E> exceptionClass, + int numTries, + Amount<Long, Time> retryDelay, + CommandExecutor commandExecutor, + int tryNum) { + + this.name = checkNotNull(name); + this.task = checkNotNull(task); + this.exceptionClass = checkNotNull(exceptionClass); + this.retryDelay = checkNotNull(retryDelay); + this.commandExecutor = checkNotNull(commandExecutor); + checkArgument(numTries > 0); + this.tryNum = tryNum; + this.numTries = numTries; + } + + /** + * Create a Task with name {@code name} that executes at most {@code numTries} + * in case of failure with an interval of {@code retryDelay} between attempts + * and sets tryNum to be the first (=1). + * + * @param name Human readable name for this task. + * @param task the task to execute. + * @param exceptionClass class of the exception thrown by the task. + * @param numTries the total number of times to try. 
+ * @param retryDelay the delay between successive tries. + * @param commandExecutor Executor to resubmit retries to. + */ + public RetryingRunnable( + String name, + ExceptionalCommand<E> task, + Class<E> exceptionClass, + int numTries, + Amount<Long, Time> retryDelay, + CommandExecutor commandExecutor) { + + this(name, task, exceptionClass, numTries, retryDelay, commandExecutor, /*tryNum=*/ 1); + } + + @Override + public void run() { + try { + task.execute(); + } catch (Exception e) { + if (e.getClass().isAssignableFrom(exceptionClass)) { + if (tryNum < numTries) { + commandExecutor.execute(name, task, exceptionClass, numTries - 1, retryDelay); + } else { + LOG.log(Level.INFO, "Giving up on task: " + name + " " + + "after " + "trying " + numTries + " times" + ".", e); + } + } else { + LOG.log(Level.INFO, "Giving up on task: " + name + " after trying " + + numTries + " times. " + "due to unhandled exception ", e); + throw Throwables.propagate(e); + } + } + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("name", name) + .append("tryNum", tryNum) + .append("numTries", numTries) + .append("retryDelay", retryDelay) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/Sampler.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/Sampler.java b/commons/src/main/java/org/apache/aurora/common/util/Sampler.java new file mode 100644 index 0000000..d42a269 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/Sampler.java @@ -0,0 +1,54 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.base.Preconditions; + +/** + * A sampler that implements logic for fractional random selection. + * + * @author William Farner + */ +public class Sampler { + + private final Random rand; + private final double threshold; + + /** + * Creates a new sampler using the default system {@link Random}. + * + * @param selectPercent Percentage to randomly select, must be between 0 and 100 (inclusive). + */ + public Sampler(float selectPercent) { + this(selectPercent, Random.Util.newDefaultRandom()); + } + + /** + * Creates a new sampler using the provided {@link Random}. + * + * @param selectPercent Percentage to randoml select, must be between 0 and 100 (inclusive). + * @param rand The random utility to use for generating random numbers. 
+ */ + public Sampler(float selectPercent, Random rand) { + Preconditions.checkArgument((selectPercent >= 0) && (selectPercent <= 100), + "Invalid selectPercent value: " + selectPercent); + + this.threshold = selectPercent / 100; + this.rand = Preconditions.checkNotNull(rand); + } + + public boolean select() { + return rand.nextDouble() < threshold; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/StartWatch.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/StartWatch.java b/commons/src/main/java/org/apache/aurora/common/util/StartWatch.java new file mode 100644 index 0000000..4d19ed6 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/StartWatch.java @@ -0,0 +1,48 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +//************************************************************************ +// +// Summize +// +// This work protected by US Copyright Law and contains proprietary and +// confidential trade secrets. +// +// (c) Copyright 2007 Summize, ALL RIGHTS RESERVED. 
+// +//************************************************************************ + +package org.apache.aurora.common.util; + +import org.apache.commons.lang.time.StopWatch; + +public class StartWatch extends StopWatch { + public StartWatch() { + super(); + } + + public void start() { + _started = true; + super.start(); + } + + public void resume() { + if (!_started) { + start(); + } else { + super.resume(); + } + } + + private boolean _started = false; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/Stat.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/Stat.java b/commons/src/main/java/org/apache/aurora/common/util/Stat.java new file mode 100644 index 0000000..2fec9d9 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/Stat.java @@ -0,0 +1,351 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +//*************************************************************** +// + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.text.NumberFormat; + +/** + * This class is designed to provide basic statistics collection. 
/**
 * This class is designed to provide basic statistics collection.
 * For each instance of this object statistics can be added to it,
 * then the sum, mean, std dev, min and max can be gathered at the
 * end. To reuse this object, a clear method can be called to reset
 * the statistics.
 *
 * <p>Only {@link #addNumber(double)} and {@link #clear()} are synchronized; the getters are not.
 */
public class Stat implements Serializable {

  // Field names are part of the Java-serialized form of this class; do not rename them.
  private static final long serialVersionUID = 1L;
  private double _max = 0;
  private double _min = Double.MAX_VALUE;
  private long _number = 0;
  private double _mean = 0;
  private double _stdDev = 0;
  private double _sum = 0;
  private double _sumOfSq;

  /**
   * Add a number to the statistics collector.
   * doubles are used for all collections.
   *
   * @param x number added to the statistics.
   */
  public void addNumber(int x) {
    addNumber((double) x);
  }

  /**
   * Add a number to the statistics collector.
   * doubles are used for all collections.
   *
   * @param x number added to the statistics.
   */
  public void addNumber(float x) {
    addNumber((double) x);
  }

  /**
   * Add a number to the statistics collector.
   * doubles are used for all collections.
   *
   * @param x number added to the statistics.
   */
  public synchronized void addNumber(double x) {
    // The first sample always becomes both max and min. Comparing against the initial
    // placeholders alone would report a max of 0 for all-negative input.
    boolean first = _number == 0;
    if (first || _max < x) {
      _max = x;
    }
    if (first || _min > x) {
      _min = x;
    }

    _sum += x;
    _sumOfSq += (x * x);
    _number++;
  }

  /**
   * Clear the statistics counters, returning this object to its freshly constructed state.
   */
  public synchronized void clear() {
    _max = 0;
    _min = Double.MAX_VALUE;
    _number = 0;
    _mean = 0;
    _stdDev = 0;
    _sum = 0;
    _sumOfSq = 0;
  }

  /**
   * Create a string representation of the
   * statistics collected so far. NOTE this
   * is formatted and may not suit all needs
   * and thus the user should just call the
   * needed methods to get mean, std dev, etc.
   * and format the data as needed.
   *
   * @return String Java string formatted output of results.
   */
  @Override
  public String toString() {
    return toString(false);
  }

  /**
   * Create a string representation of the
   * statistics collected so far. The results
   * are formatted in percentage format if
   * passed in true, otherwise the results
   * are the same as the toString call. NOTE this
   * is formatted and may not suit all needs
   * and thus the user should just call the
   * needed methods to get mean, std dev, etc.
   * and format the data as needed.
   *
   * @param percent Format as percentages if set to true.
   * @return String Java string formatted output of results.
   */
  public String toString(boolean percent) {
    calculate();
    NumberFormat nf = NumberFormat.getInstance();
    nf.setMaximumFractionDigits(4);

    if (_number > 1) {
      StringBuilder results = new StringBuilder();
      if (percent) {
        results.append("Number:").append(nf.format(_number * 100)).append("%");
        results.append(" Max:").append(nf.format(_max * 100)).append("%");
        results.append(" Min:").append(nf.format(_min * 100)).append("%");
        results.append(" Mean:").append(nf.format(_mean * 100)).append("%");
      } else {
        results.append("Number:").append(nf.format(_number));
        results.append(" Max:").append(nf.format(_max));
        results.append(" Min:").append(nf.format(_min));
        results.append(" Mean:").append(nf.format(_mean));
      }

      results.append(" Sum:").append(nf.format(_sum));
      results.append(" STD:").append(nf.format(_stdDev));
      return results.toString();
    } else if (_number == 1) {
      // With a single sample the sum is that sample's value.
      if (percent) {
        return ("Number:" + nf.format(_sum * 100) + "%");
      } else {
        return ("Number:" + nf.format(_sum));
      }
    } else {
      return ("Number: N/A");
    }
  }

  // Refreshes the cached mean and standard deviation from the running sums.
  private void calculate() {
    getMean();
    getStandardDev();
  }

  /**
   * Get the max data element added to the statistics
   * object so far.
   *
   * @return double - Maximum entry added so far; 0 if nothing has been added.
   */
  public double getMax() {
    return _max;
  }

  /**
   * Get the min data element added to the statistics
   * object so far.
   *
   * @return double - Min entry added so far; {@link Double#MAX_VALUE} if nothing has been added.
   */
  public double getMin() {
    return _min;
  }

  /**
   * Get the number of data elements added to the statistics
   * object so far.
   *
   * @return long - Number of entries added so far.
   */
  public long getNumberOfElements() {
    return _number;
  }

  /**
   * Get the average or mean of data elements added to the
   * statistics object so far.
   *
   * @return double - Mean of entries added so far.
   */
  public double getMean() {
    if (_number > 0) {
      _mean = _sum / _number;
    }
    return _mean;
  }

  /**
   * Get the ratio of the sum of elements divided by the number
   * of elements added * 100
   *
   * @return double - Percent of entries added so far.
   */
  public double getPercent() {
    // Scale a copy: the cached mean itself must stay a plain mean, since it is also what
    // gets serialized by writeToDataOutput.
    return getMean() * 100;
  }

  /**
   * Get the sum of data elements added to the
   * statistics object so far.
   *
   * @return double - Sum of entries added so far.
   */
  public double getSum() {
    return _sum;
  }

  /**
   * Get the sum of the squares of the data elements added
   * to the statistics object so far.
   *
   * @return double - Sum of the squares of the entries added so far.
   */
  public double getSumOfSq() {
    return _sumOfSq;
  }

  /**
   * Get the (sample) standard deviation of the data elements added
   * to the statistics object so far.
   *
   * @return double - Standard deviation of the entries added so far; only recomputed when
   *     more than one entry has been added.
   */
  public double getStandardDev() {
    if (_number > 1) {
      double variance = (_sumOfSq - ((_sum * _sum) / _number)) / (_number - 1);
      // Rounding can push a near-zero variance slightly negative; clamp so sqrt is not NaN.
      _stdDev = Math.sqrt(Math.max(0.0, variance));
    }
    return _stdDev;
  }

  /**
   * Read the data from the InputStream so it can be used to populate
   * the current objects state.
   *
   * @param in java.io.InputStream to read from.
   * @throws IOException if reading from the stream fails.
   */
  public void readFromDataInput(InputStream in) throws IOException {
    DataInput di = new DataInputStream(in);
    readFromDataInput(di);
  }

  /**
   * Read the data from the DataInput so it can be used to populate
   * the current objects state.
   *
   * @param in java.io.DataInput to read from.
   * @throws IOException if reading from the input fails.
   */
  public void readFromDataInput(DataInput in) throws IOException {
    _max = in.readDouble();
    _min = in.readDouble();
    _number = in.readLong();
    _mean = in.readDouble();
    _stdDev = in.readDouble();
    _sum = in.readDouble();
    _sumOfSq = in.readDouble();
  }

  /**
   * Write the data to the output stream so it can be streamed to
   * another process, wire or storage medium in a format that another Stat
   * object can read.
   *
   * @param out java.io.OutputStream to write to.
   * @throws IOException if writing to the stream fails.
   */
  public void writeToDataOutput(OutputStream out) throws IOException {
    DataOutput dout = new DataOutputStream(out);
    writeToDataOutput(dout);
  }

  /**
   * Write the data to the data output object so it can be written to
   * another process, wire or storage medium in a format that another Stat
   * object can read.
   *
   * @param out java.io.DataOutput to write to.
   * @throws IOException if writing to the output fails.
   */
  public void writeToDataOutput(DataOutput out) throws IOException {
    // Bring the derived fields up to date so the written mean/std dev match the sums.
    calculate();
    out.writeDouble(_max);
    out.writeDouble(_min);
    out.writeLong(_number);
    out.writeDouble(_mean);
    out.writeDouble(_stdDev);
    out.writeDouble(_sum);
    out.writeDouble(_sumOfSq);
  }
}
b/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java @@ -0,0 +1,583 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +import org.apache.commons.lang.builder.HashCodeBuilder; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.base.Closures; +import org.apache.aurora.common.base.ExceptionalSupplier; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank; + +/** + * Represents a state machine that is not necessarily a Finite State Machine. 
+ * The caller may configure the state machine to permit only known state transitions, or to only + * disallow known state transitions (and permit unknown transitions). + * + * @param <T> THe type of objects that the caller uses to represent states. + * + * TODO(William Farner): Consider merging the stats-tracking ala PipelineStats into this. + */ +public class StateMachine<T> { + private static final Logger LOG = Logger.getLogger(StateMachine.class.getName()); + + private final String name; + + // Stores mapping from states to the states that the machine is allowed to transition into. + private final Multimap<T, T> stateTransitions; + + private final Closure<Transition<T>> transitionCallback; + private final boolean throwOnBadTransition; + + private volatile T currentState; + private final Lock readLock; + private final Lock writeLock; + + + private StateMachine(String name, + T initialState, + Multimap<T, T> stateTransitions, + Closure<Transition<T>> transitionCallback, + boolean throwOnBadTransition) { + this.name = name; + this.currentState = initialState; + this.stateTransitions = stateTransitions; + this.transitionCallback = transitionCallback; + this.throwOnBadTransition = throwOnBadTransition; + + ReadWriteLock stateLock = new ReentrantReadWriteLock(true /* fair */); + readLock = stateLock.readLock(); + writeLock = stateLock.writeLock(); + } + + /** + * Gets the name of this state machine. + * + * @return The state machine name. + */ + public String getName() { + return name; + } + + /** + * Fetches the state that the machine is currently in. + * + * @return Current state. + */ + public T getState() { + return currentState; + } + + /** + * Checks that the current state is the {@code expectedState} and throws if it is not. + * + * @param expectedState The expected state + * @throws IllegalStateException if the current state is not the {@code expectedState}. 
+ */ + public void checkState(T expectedState) { + checkState(ImmutableSet.of(expectedState)); + } + + /** + * Checks that the current state is one of the {@code allowedStates} and throws if it is not. + * + * @param allowedStates The allowed states, must not be empty. + * @throws IllegalArgumentException if {@code allowedStates} is empty. + * @throws IllegalStateException if the current state is not one of the {@code allowedStates}. + */ + public void checkState(Set<T> allowedStates) { + checkNotNull(allowedStates); + checkArgument(!allowedStates.isEmpty(), "At least one possible state must be provided."); + + readLock.lock(); + try { + if (!allowedStates.contains(currentState)) { + throw new IllegalStateException( + String.format("In state %s, expected to be in %s.", currentState, allowedStates)); + } + } finally { + readLock.unlock(); + } + } + + /** + * Executes the supplied {@code work} if the state machine is in the {@code expectedState}, + * postponing any concurrently requested {@link #transition(Object)} until after the execution of + * the work. + * + * @param expectedState The expected state the work should be performed in. + * @param work The work to perform in the {@code expectedState}. + * @param <O> The type returned by the unit of work. + * @param <E> The type of exception that may be thrown by the unit of work. + * @return The result of the unit of work if the current state is the {@code expectedState}. + * @throws IllegalStateException if the current state is not the {@code expectedState}. + * @throws E if the unit of work throws. + */ + public <O, E extends Exception> O doInState(T expectedState, ExceptionalSupplier<O, E> work) + throws E { + + checkNotNull(expectedState); + checkNotNull(work); + + readLock.lock(); + try { + checkState(expectedState); + return work.get(); + } finally { + readLock.unlock(); + } + } + + /** + * Transitions the machine into state {@code nextState}. + * + * @param nextState The state to move into. + * @throws IllegalStateTransitionException If the state transition is not allowed and this + * machine is configured to throw on bad transitions. 
+ * @return {@code true} if the transition was allowed, {@code false} otherwise. + */ + public boolean transition(T nextState) throws IllegalStateTransitionException { + boolean transitionAllowed = false; + + T currentCopy = currentState; + + writeLock.lock(); + try { + if (stateTransitions.containsEntry(currentState, nextState)) { + currentState = nextState; + transitionAllowed = true; + } else if (throwOnBadTransition) { + throw new IllegalStateTransitionException( + String.format("State transition from %s to %s is not allowed.", currentState, + nextState)); + } + } finally { + writeLock.unlock(); + } + + transitionCallback.execute(new Transition<T>(currentCopy, nextState, transitionAllowed)); + return transitionAllowed; + } + + public static class IllegalStateTransitionException extends IllegalStateException { + public IllegalStateTransitionException(String msg) { + super(msg); + } + } + + /** + * Convenience method to create a builder object. + * + * @param <T> Type of builder to create. + * @param name Name of the state machine to create a builder for. + * @return New builder. + */ + public static <T> Builder<T> builder(String name) { + return new Builder<T>(name); + } + + /** + * A state and its allowed transitions (if any) and (optional) callback. + * + * @param <T> State type. + */ + public static class Rule<T> { + private final T from; + private final Set<T> to; + private final Closure<Transition<T>> callback; + + private Rule(T from) { + this(from, ImmutableSet.<T>of()); + } + + private Rule(T from, Set<T> to) { + this(from, to, Closures.<Transition<T>>noop()); + } + + private Rule(T from, Set<T> to, Closure<Transition<T>> callback) { + this.from = checkNotNull(from); + this.to = checkNotNull(to); + this.callback = checkNotNull(callback); + } + + /** + * Associates a callback to be triggered after any attempt to transition from this state is + * made. + * + * @param callback Callback to signal. 
+ * @return A new rule that is identical to this rule, but with the provided + * callback + */ + public Rule<T> withCallback(Closure<Transition<T>> callback) { + return new Rule<T>(from, to, callback); + } + + /** + * A helper class when building a transition rule, to define the allowed transitions. + * + * @param <T> State type. + */ + public static class AllowedTransition<T> { + private final Rule<T> rule; + + private AllowedTransition(Rule<T> rule) { + this.rule = rule; + } + + /** + * Associates a single allowed transition with this state. + * + * @param state Allowed transition state. + * @return A new rule that is identical to the original, but only allowing a transition to the + * provided state. + */ + public Rule<T> to(T state) { + return new Rule<T>(rule.from, ImmutableSet.<T>of(state), rule.callback); + } + + /** + * Associates multiple transitions with this state. + * + * @param state An allowed transition state. + * @param additionalStates Additional states that may be transitioned to. + * @return A new rule that is identical to the original, but only allowing a transition to the + * provided states. + */ + public Rule<T> to(T state, T... additionalStates) { + // Forward the rule's callback, matching the single-state overload. + return new Rule<T>(rule.from, + ImmutableSet.copyOf(Lists.asList(state, additionalStates)), + rule.callback); + } + + /** + * Allows no transitions to be performed from this state. + * + * @return The original rule. + */ + public Rule<T> noTransitions() { + return rule; + } + } + + /** + * Creates a new transition rule. + * + * @param state State to create and associate transitions with. + * @param <T> State type. + * @return A new transition rule builder. + */ + public static <T> AllowedTransition<T> from(T state) { + return new AllowedTransition<T>(new Rule<T>(state)); + } + } + + /** + * Builder to create a state machine. 
+ * + * @param <T> + */ + public static class Builder<T> { + private final String name; + private T initialState; + private final Multimap<T, T> stateTransitions = HashMultimap.create(); + private final List<Closure<Transition<T>>> transitionCallbacks = Lists.newArrayList(); + private boolean throwOnBadTransition = true; + + public Builder(String name) { + this.name = checkNotBlank(name); + } + + /** + * Sets the initial state for the state machine. + * + * @param state Initial state. + * @return A reference to the builder. + */ + public Builder<T> initialState(T state) { + checkNotNull(state); + initialState = state; + return this; + } + + /** + * Adds a state and its allowed transitions. + * + * @param rule The state and transition rule to add. + * @return A reference to the builder. + */ + public Builder<T> addState(Rule<T> rule) { + return addState(rule.callback, rule.from, rule.to); + } + + /** + * Adds a state and its allowed transitions. + * At least one transition state must be added, it is not necessary to explicitly add states + * that have no allowed transitions (terminal states). + * + * @param callback Callback to notify of any transition attempted from the state. + * @param state State to add. + * @param transitionStates Allowed transitions from {@code state}. + * @return A reference to the builder. + */ + public Builder<T> addState(Closure<Transition<T>> callback, T state, + Set<T> transitionStates) { + checkNotNull(callback); + checkNotNull(state); + + Preconditions.checkArgument(Iterables.all(transitionStates, Predicates.notNull())); + + stateTransitions.putAll(state, transitionStates); + + @SuppressWarnings("unchecked") + Predicate<Transition<T>> filter = Transition.from(state); + onTransition(filter, callback); + return this; + } + + /** + * Varargs version of {@link #addState(Closure, Object, java.util.Set)}. + * + * @param callback Callback to notify of any transition attempted from the state. + * @param state State to add. 
+ * @param transitionStates Allowed transitions from {@code state}. + * @return A reference to the builder. + */ + public Builder<T> addState(Closure<Transition<T>> callback, T state, + T... transitionStates) { + Set<T> states = ImmutableSet.copyOf(transitionStates); + Preconditions.checkArgument(Iterables.all(states, Predicates.notNull())); + + return addState(callback, state, states); + } + + /** + * Adds a state and its allowed transitions. + * At least one transition state must be added, it is not necessary to explicitly add states + * that have no allowed transitions (terminal states). + * + * @param state State to add. + * @param transitionStates Allowed transitions from {@code state}. + * @return A reference to the builder. + */ + public Builder<T> addState(T state, T... transitionStates) { + return addState(Closures.<Transition<T>>noop(), state, transitionStates); + } + + private void onTransition(Predicate<Transition<T>> transitionFilter, + Closure<Transition<T>> handler) { + onAnyTransition(Closures.filter(transitionFilter, handler)); + } + + /** + * Adds a callback to be executed for every state transition, including invalid transitions + * that are attempted. + * + * @param handler Callback to notify of transition attempts. + * @return A reference to the builder. + */ + public Builder<T> onAnyTransition(Closure<Transition<T>> handler) { + transitionCallbacks.add(handler); + return this; + } + + /** + * Adds a log message for every state transition that is attempted. + * + * @return A reference to the builder. + */ + public Builder<T> logTransitions() { + return onAnyTransition(new Closure<Transition<T>>() { + @Override public void execute(Transition<T> transition) { + LOG.info(name + " state machine transition " + transition); + } + }); + } + + /** + * Allows the caller to specify whether {@link IllegalStateTransitionException} should be thrown + * when a bad state transition is attempted (the default behavior). 
+ * + * @param throwOnBadTransition Whether an exception should be thrown when a bad state transition + * is attempted. + * @return A reference to the builder. + */ + public Builder<T> throwOnBadTransition(boolean throwOnBadTransition) { + this.throwOnBadTransition = throwOnBadTransition; + return this; + } + + /** + * Builds the state machine. + * + * @return A reference to the prepared state machine. + */ + public StateMachine<T> build() { + Preconditions.checkState(initialState != null, "Initial state must be specified."); + checkArgument(!stateTransitions.isEmpty(), "No state transitions were specified."); + return new StateMachine<T>(name, + initialState, + stateTransitions, + Closures.combine(transitionCallbacks), + throwOnBadTransition); + } + } + + /** + * Representation of a state transition. + * + * @param <T> State type. + */ + public static class Transition<T> { + private final T from; + private final T to; + private final boolean allowed; + + public Transition(T from, T to, boolean allowed) { + this.from = checkNotNull(from); + this.to = checkNotNull(to); + this.allowed = allowed; + } + + private static <T> Function<Transition<T>, T> from() { + return new Function<Transition<T>, T>() { + @Override public T apply(Transition<T> transition) { + return transition.from; + } + }; + } + + private static <T> Function<Transition<T>, T> to() { + return new Function<Transition<T>, T>() { + @Override public T apply(Transition<T> transition) { + return transition.to; + } + }; + } + + private static <T> Predicate<Transition<T>> oneSideFilter( + Function<Transition<T>, T> extractor, final T... states) { + checkArgument(Iterables.all(Arrays.asList(states), Predicates.notNull())); + + return Predicates.compose(Predicates.in(ImmutableSet.copyOf(states)), extractor); + } + + /** + * Creates a predicate that returns {@code true} for transitions from the given states. + * + * @param states States to filter on. + * @param <T> State type. + * @return A from-state filter. 
+ */ + public static <T> Predicate<Transition<T>> from(final T... states) { + return oneSideFilter(Transition.<T>from(), states); + } + + /** + * Creates a predicate that returns {@code true} for transitions to the given states. + * + * @param states States to filter on. + * @param <T> State type. + * @return A to-state filter. + */ + public static <T> Predicate<Transition<T>> to(final T... states) { + return oneSideFilter(Transition.<T>to(), states); + } + + /** + * Creates a predicate that returns {@code true} for a specific state transition. + * + * @param from From state. + * @param to To state. + * @param <T> State type. + * @return A state transition filter. + */ + public static <T> Predicate<Transition<T>> transition(final T from, final T to) { + @SuppressWarnings("unchecked") + Predicate<Transition<T>> fromFilter = from(from); + @SuppressWarnings("unchecked") + Predicate<Transition<T>> toFilter = to(to); + return Predicates.and(fromFilter, toFilter); + } + + public T getFrom() { + return from; + } + + public T getTo() { + return to; + } + + public boolean isAllowed() { + return allowed; + } + + /** + * Checks whether this transition represents a state change, which means that the 'to' state is + * not equal to the 'from' state, and the transition is allowed. + * + * @return {@code true} if the state was changed, {@code false} otherwise. 
+ */ + public boolean isValidStateChange() { + return isAllowed() && !from.equals(to); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Transition)) { + return false; + } + + if (o == this) { + return true; + } + + Transition<?> other = (Transition) o; + return from.equals(other.from) && to.equals(other.to); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(from) + .append(to) + .toHashCode(); + } + + @Override + public String toString() { + String str = from.toString() + " -> " + to.toString(); + if (!isAllowed()) { + str += " (not allowed)"; + } + return str; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/Timer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/Timer.java b/commons/src/main/java/org/apache/aurora/common/util/Timer.java new file mode 100644 index 0000000..15602ac --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/Timer.java @@ -0,0 +1,71 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import org.apache.aurora.common.base.Commands; +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.base.ExceptionalSupplier; +import org.apache.aurora.common.stats.SlidingStats; + +/** + * A utility for timing blocks of code. 
+ * + * <p>TODO(John Sirois): consider instead: + * <T, E extends Exception> Pair<T, Long> doTimed(ExceptionalSupplier<T, E> timedWork) throws E + * or a subinterface of Command/Closure/Supplier/Function that exposes a timing method as other ways + * to factor in timing. + * + * @author John Sirois + */ +public final class Timer { + + /** + * Times the block of code encapsulated by {@code timedWork}, recording the result in {@code stat}. + * + * @param stat the stat to record the timing with + * @param timedWork the code to time + * @param <E> the type of exception {@code timedWork} may throw + * @throws E if {@code timedWork} throws + */ + public static <E extends Exception> void doTimed(SlidingStats stat, + final ExceptionalCommand<E> timedWork) throws E { + doTimed(stat, Commands.asSupplier(timedWork)); + } + + /** + * Times the block of code encapsulated by {@code timedWork}, recording the result in {@code stat}. + * + * @param stat the stat to record the timing with + * @param timedWork the code to time + * @param <T> the type of result {@code timedWork} returns + * @param <E> the type of exception {@code timedWork} may throw + * @return the result of {@code timedWork} if it completes normally + * @throws E if {@code timedWork} throws + */ + public static <T, E extends Exception> T doTimed(SlidingStats stat, + ExceptionalSupplier<T, E> timedWork) throws E { + StartWatch timer = new StartWatch(); + timer.start(); + try { + return timedWork.get(); + } finally { + // Runs on both normal and exceptional completion, so the timing is always accumulated. + timer.stop(); + stat.accumulate(timer.getTime()); + } + } + + private Timer() { + // utility + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/TruncatedBinaryBackoff.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/TruncatedBinaryBackoff.java b/commons/src/main/java/org/apache/aurora/common/util/TruncatedBinaryBackoff.java new file mode 100644 index 
0000000..fd74b9f --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/TruncatedBinaryBackoff.java @@ -0,0 +1,74 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * A BackoffStrategy that implements truncated binary exponential backoff. + */ +public class TruncatedBinaryBackoff implements BackoffStrategy { + private final long initialBackoffMs; + private final long maxBackoffIntervalMs; + private final boolean stopAtMax; + + /** + * Creates a new TruncatedBinaryBackoff that will start by backing off for {@code initialBackoff} + * and then back off for twice as long each time it is called until reaching the + * {@code maxBackoff}, after which every backoff waits for that amount of time. When + * {@code stopAtMax} is set, shouldContinue() returns false once the max has been reached. 
+ * + * @param initialBackoff the initial amount of time to backoff + * @param maxBackoff the maximum amount of time to backoff + * @param stopAtMax whether shouldContinue() returns false when the max is reached + * @throws IllegalArgumentException if {@code initialBackoff} is not positive or + * {@code maxBackoff} is less than {@code initialBackoff} + */ + public TruncatedBinaryBackoff(Amount<Long, Time> initialBackoff, + Amount<Long, Time> maxBackoff, boolean stopAtMax) { + Preconditions.checkNotNull(initialBackoff); + Preconditions.checkNotNull(maxBackoff); + Preconditions.checkArgument(initialBackoff.getValue() > 0); + Preconditions.checkArgument(maxBackoff.compareTo(initialBackoff) >= 0); + initialBackoffMs = initialBackoff.as(Time.MILLISECONDS); + maxBackoffIntervalMs = maxBackoff.as(Time.MILLISECONDS); + this.stopAtMax = stopAtMax; + } + + /** + * Same as main constructor, but this will always return true from shouldContinue(). + * + * @param initialBackoff the initial amount of time to backoff + * @param maxBackoff the maximum amount of time to backoff + */ + public TruncatedBinaryBackoff(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff) { + this(initialBackoff, maxBackoff, false); + } + + /** + * Returns the initial backoff when {@code lastBackoffMs} is zero, otherwise double the last + * backoff, capped at the configured maximum. + * + * @param lastBackoffMs the previous backoff in milliseconds, must not be negative + * @return the next backoff in milliseconds + */ + @Override + public long calculateBackoffMs(long lastBackoffMs) { + Preconditions.checkArgument(lastBackoffMs >= 0); 
+ if (lastBackoffMs == 0) { + return initialBackoffMs; + } + // Compare against half the cap instead of doubling first, so that a very large + // lastBackoffMs cannot overflow into a negative backoff. + return (lastBackoffMs > maxBackoffIntervalMs / 2) + ? maxBackoffIntervalMs + : lastBackoffMs * 2; + } + + /** + * Returns false only when this strategy stops at the max and {@code lastBackoffMs} has reached + * the configured maximum. + * + * @param lastBackoffMs the previous backoff in milliseconds, must not be negative + * @return whether backing off should continue + */ + @Override + public boolean shouldContinue(long lastBackoffMs) { + Preconditions.checkArgument(lastBackoffMs >= 0); + boolean stop = stopAtMax && (lastBackoffMs >= maxBackoffIntervalMs); + + return !stop; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java b/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java new file mode 100644 index 0000000..72605ed --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util.caching; + +/** + * Definition of basic caching functionality. Cache keys and values are expected to always be + * valid, non-null values. + * + * @author William Farner + */ +public interface Cache<K, V> { + + /** + * Fetches a value from the cache. + * + * @param key The key for the value to fetch, must not be {@code null}. + * @return The cached value corresponding with {@code key}, or {@code null} if no entry exists. + */ + public V get(K key); + + /** + * Stores a key-value pair in the cache. + * + * @param key The key to store, must not be {@code null}. 
+ * @param value The value to store, must not be {@code null}. + */ + public void put(K key, V value); + + /** + * Deletes an entry from the cache. + * + * @param key Key for the value to delete, must not be {@code null}. + */ + public void delete(K key); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/caching/CachingMethodProxy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/caching/CachingMethodProxy.java b/commons/src/main/java/org/apache/aurora/common/util/caching/CachingMethodProxy.java new file mode 100644 index 0000000..96a5377 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/caching/CachingMethodProxy.java @@ -0,0 +1,262 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.caching; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * A proxy class that handles caching of return values for method calls to a wrapped object. + * + * Example usage: + * + * Foo uncached = new Foo(); + * CachingMethodProxy<Foo> methodProxy = CachingMethodProxy.proxyFor(uncached, Foo.class); + * Foo foo = methodProxy.getCachingProxy(); + * methodProxy.cache(foo.doBar(), lruCache1) + * .cache(foo.doBaz(), lruCache2) + * .prepare(); + * + * @author William Farner + */ +public class CachingMethodProxy<T> { + + // Dummy return values to return when in recording state. 
+ private static final Map<Class<?>, Object> EMPTY_RETURN_VALUES = + ImmutableMap.<Class<?>, Object>builder() + .put(Boolean.TYPE, Boolean.FALSE) + .put(Byte.TYPE, Byte.valueOf((byte) 0)) + .put(Short.TYPE, Short.valueOf((short) 0)) + .put(Character.TYPE, Character.valueOf((char)0)) + .put(Integer.TYPE, Integer.valueOf(0)) + .put(Long.TYPE, Long.valueOf(0)) + .put(Float.TYPE, Float.valueOf(0)) + .put(Double.TYPE, Double.valueOf(0)) + .build(); + private static final Map<Class<?>, Class<?>> AUTO_BOXING_MAP = + ImmutableMap.<Class<?>, Class<?>>builder() + .put(Boolean.TYPE, Boolean.class) + .put(Byte.TYPE, Byte.class) + .put(Short.TYPE, Short.class) + .put(Character.TYPE, Character.class) + .put(Integer.TYPE, Integer.class) + .put(Long.TYPE, Long.class) + .put(Float.TYPE, Float.class) + .put(Double.TYPE, Double.class) + .build(); + + // The uncached resource, whose method calls are deemed to be expensive and cacheable. + private final T uncached; + + // The methods that are cached, and the caches themselves. + private final Map<Method, MethodCache> methodCaches = Maps.newHashMap(); + private final Class<T> type; + + private Method lastMethodCall = null; + private boolean recordMode = true; + + /** + * Creates a new caching method proxy that will wrap an object and cache for the provided methods. + * + * @param uncached The uncached object that will be reverted to when a cache entry is not present. + */ + private CachingMethodProxy(T uncached, Class<T> type) { + this.uncached = Preconditions.checkNotNull(uncached); + this.type = Preconditions.checkNotNull(type); + Preconditions.checkArgument(type.isInterface(), "The proxied type must be an interface."); + } + + private static Object invokeMethod(Object subject, Method method, Object[] args) + throws Throwable { + try { + return method.invoke(subject, args); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot access " + subject.getClass() + "." 
+ method, e); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } + + /** + * A cached method and its caching control structures. + * + * @param <K> Cache key type. + * @param <V> Cache value type, expected to match the return type of the method. + */ + private static class MethodCache<K, V> { + private final Method method; + private final Cache<K, V> cache; + private final Function<Object[], K> keyBuilder; + private final Predicate<V> entryFilter; + + MethodCache(Method method, Cache<K, V> cache, Function<Object[], K> keyBuilder, + Predicate<V> entryFilter) { + this.method = method; + this.cache = cache; + this.keyBuilder = keyBuilder; + this.entryFilter = entryFilter; + } + + V doInvoke(Object uncached, Object[] args) throws Throwable { + K key = keyBuilder.apply(args); + + V cachedValue = cache.get(key); + + if (cachedValue != null) return cachedValue; + + Object fetched = invokeMethod(uncached, method, args); + + if (fetched == null) return null; + + @SuppressWarnings("unchecked") + V typedValue = (V) fetched; + + if (entryFilter.apply(typedValue)) cache.put(key, typedValue); + + return typedValue; + } + } + + /** + * Creates a new builder for the given type. + * + * @param uncached The uncached object that should be insulated by caching. + * @param type The interface that a proxy should be created for. + * @param <T> Type parameter to the proxied class. + * @return A new builder. + */ + public static <T> CachingMethodProxy<T> proxyFor(T uncached, Class<T> type) { + return new CachingMethodProxy<T>(uncached, type); + } + + @SuppressWarnings("unchecked") + public T getCachingProxy() { + return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[] { type }, + new InvocationHandler() { + @Override public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return doInvoke(method, args); + } + }); + } + + private Object doInvoke(Method method, Object[] args) throws Throwable { + return recordMode ? 
recordCall(method) : cacheRequest(method, args); + } + + /** + * Remembers {@code method} as the call awaiting cache instructions while in record mode. + * + * @param method The proxied method that was invoked. + * @return The dummy value for a primitive return type, otherwise {@code null}. + */ + private Object recordCall(Method method) { + Preconditions.checkArgument(method.getReturnType() != Void.TYPE, + "Void return methods cannot be cached: " + method); + Preconditions.checkArgument(method.getParameterTypes().length > 0, + "Methods with zero arguments cannot be cached: " + method); + Preconditions.checkState(lastMethodCall == null, + "No cache instructions provided for call to: " + lastMethodCall); + + lastMethodCall = method; + + Class<?> returnType = method.getReturnType(); + return returnType.isPrimitive() ? EMPTY_RETURN_VALUES.get(returnType) : null; + } + + /** + * Serves a call through the cache registered for {@code method}, or invokes the uncached object + * directly when no cache was registered for it. + */ + private Object cacheRequest(Method method, Object[] args) throws Throwable { + MethodCache cache = methodCaches.get(method); + + // Check if we are caching for this method. + if (cache == null) return invokeMethod(uncached, method, args); + + return cache.doInvoke(uncached, args); + } + + /** + * Instructs the proxy that cache setup is complete, and the proxy instance should begin caching + * and delegating uncached calls. After this is called, any subsequent calls to any of the + * cache setup methods will result in an {@link IllegalStateException}. + * + * @throws IllegalStateException if no method has been cached, or if this was already invoked. + */ + public void prepare() { + Preconditions.checkState(!methodCaches.isEmpty(), "At least one method must be cached."); + Preconditions.checkState(recordMode, "prepare() may only be invoked once."); + + recordMode = false; + } + + /** + * Caches the most recently recorded method call, keyed by the list of call arguments and + * accepting every returned value. + * + * @param value The value returned by the recorded proxy call; only used to infer {@code V}. + * @param cache The cache to store results in. + * @return A reference to this proxy builder. + */ + public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache) { + return cache(value, cache, Predicates.<V>alwaysTrue()); + } + + /** + * Caches the most recently recorded method call, keyed by the list of call arguments, storing + * only values accepted by {@code valueFilter}. + * + * @param value The value returned by the recorded proxy call; only used to infer {@code V}. + * @param cache The cache to store results in. + * @param valueFilter Filter deciding which fetched values are stored in the cache. + * @return A reference to this proxy builder. + */ + public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache, + Predicate<V> valueFilter) { + return cache(value, cache, DEFAULT_KEY_BUILDER, valueFilter); + } + + /** + * Caches the most recently recorded method call using a custom key builder, accepting every + * returned value. + * + * @param value The value returned by the recorded proxy call; only used to infer {@code V}. + * @param cache The cache to store results in. + * @param keyBuilder Function that builds a cache key from the call arguments. + * @return A reference to this proxy builder. + */ + public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache, + Function<Object[], K> keyBuilder) { + // Delegate to the full overload with a filter that accepts every value. 
+ return cache(value, cache, keyBuilder, Predicates.<V>alwaysTrue()); + } + + public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache, + Function<Object[], K> keyBuilder, Predicate<V> valueFilter) { + Preconditions.checkNotNull(cache); + Preconditions.checkNotNull(keyBuilder); + Preconditions.checkNotNull(valueFilter); + + Preconditions.checkState(recordMode, "Cache setup is not allowed after prepare() is called."); + + // Get the last method call and declare it the cached method. + Preconditions.checkState(lastMethodCall != null, "No method call captured to be cached."); + + Class<?> returnType = lastMethodCall.getReturnType(); + + Preconditions.checkArgument(returnType != Void.TYPE, + "Cannot cache results from void method: " + lastMethodCall); + + if (returnType.isPrimitive()) { + // If a primitive type is returned, we need to make sure that the cache holds the boxed + // type for the primitive. + returnType = AUTO_BOXING_MAP.get(returnType); + } + + // TODO(William Farner): Figure out a simple way to make this possible. Right now, since the proxy + // objects return null, we get a null here and can't check the type. 
+ //Preconditions.checkArgument(value.getClass() == returnType, + // String.format("Cache value type '%s' does not match method return type '%s'", + // value.getClass(), lastMethodCall.getReturnType())); + + methodCaches.put(lastMethodCall, new MethodCache<K, V>(lastMethodCall, cache, keyBuilder, + valueFilter)); + + lastMethodCall = null; + + return this; + } + + private static final Function<Object[], List> DEFAULT_KEY_BUILDER = + new Function<Object[], List>() { + @Override public List apply(Object[] args) { + return Arrays.asList(args); + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/caching/LRUCache.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/caching/LRUCache.java b/commons/src/main/java/org/apache/aurora/common/util/caching/LRUCache.java new file mode 100644 index 0000000..65639e3 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/caching/LRUCache.java @@ -0,0 +1,170 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.caching; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.collections.Pair; +import org.apache.aurora.common.stats.Stats; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A cache with a fixed maximum size, evicting items that were used least-recently. + * WARNING: This is not thread-safe. If you wish to get a thread-safe version of a constructed + * LRUCache, you must wrap it with {@link Collections#synchronizedMap(java.util.Map)}. + * + * @author William Farner + */ +public class LRUCache<K, V> implements Cache<K, V> { + + private Map<K, V> map; + + private final AtomicLong accesses; + private final AtomicLong misses; + + /** + * Creates a new bounded cache with the given load factor. + * + * @param name Unique name for this cache. + * @param maxCapacity Maximum capacity for the cache, after which items will be evicted. + * @param loadFactor Load factor for the cache. + * @param makeSynchronized Whether the underlying map should be synchronized. + * @param evictionListener Listener to be notified when an element is evicted, or {@code null} if + * eviction notifications are not needed. + */ + private LRUCache(final String name, final int maxCapacity, float loadFactor, + boolean makeSynchronized, final Closure<Pair<K, V>> evictionListener) { + map = new LinkedHashMap<K, V>(maxCapacity, loadFactor, true /* Access order. 
*/) { + @Override public boolean removeEldestEntry(Map.Entry<K, V> entry) { + boolean evict = size() > maxCapacity; + if (evict && evictionListener != null) { + evictionListener.execute(Pair.of(entry.getKey(), entry.getValue())); + } + return evict; + } + }; + + if (makeSynchronized) { + map = Collections.synchronizedMap(map); + } + + accesses = Stats.exportLong(name + "_lru_cache_accesses"); + misses = Stats.exportLong(name + "_lru_cache_misses"); + } + + public static <K, V> Builder<K, V> builder() { + return new Builder<K, V>(); + } + + public static class Builder<K, V> { + private String name = null; + + private int maxSize = 1000; + + // Sadly, LinkedHashMap doesn't expose this, so the default is pulled from the javadoc. + private float loadFactor = 0.75F; + + private boolean makeSynchronized = true; + + private Closure<Pair<K, V>> evictionListener = null; + + public Builder<K, V> name(String name) { + this.name = MorePreconditions.checkNotBlank(name); + return this; + } + + public Builder<K, V> maxSize(int maxSize) { + Preconditions.checkArgument(maxSize > 0); + this.maxSize = maxSize; + return this; + } + + public Builder<K, V> loadFactor(float loadFactor) { + this.loadFactor = loadFactor; + return this; + } + + public Builder<K, V> makeSynchronized(boolean makeSynchronized) { + this.makeSynchronized = makeSynchronized; + return this; + } + + public Builder<K, V> evictionListener(Closure<Pair<K, V>> evictionListener) { + this.evictionListener = evictionListener; + return this; + } + + public LRUCache<K, V> build() { + return new LRUCache<K, V>(name, maxSize, loadFactor, makeSynchronized, evictionListener); + } + } + + @Override + public V get(K key) { + accesses.incrementAndGet(); + V value = map.get(key); + if (value == null) { + misses.incrementAndGet(); + } + return value; + } + + @Override + public void put(K key, V value) { + map.put(key, value); + } + + @Override + public void delete(K key) { + map.remove(key); + } + + public int size() { + return 
map.size(); + } + + @Override + public String toString() { + return String.format("size: %d, accesses: %s, misses: %s", + map.size(), + accesses, + misses); + } + + public Collection<V> copyValues() { + synchronized(map) { + return ImmutableList.copyOf(map.values()); + } + } + + public long getAccesses() { + return accesses.longValue(); + } + + public long getMisses() { + return misses.longValue(); + } + + public double getHitRate() { + double numAccesses = accesses.longValue(); + return numAccesses == 0 ? 0 : (numAccesses - misses.longValue()) / numAccesses; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java new file mode 100644 index 0000000..2f963cf --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java @@ -0,0 +1,56 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.concurrent; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.util.BackoffStrategy; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A {@link RetryingFutureTask} that will resubmit itself to a work queue with a backoff. + * + * @author William Farner + */ +public class BackingOffFutureTask extends RetryingFutureTask { + private final ScheduledExecutorService executor; + private final BackoffStrategy backoffStrategy; + private long backoffMs = 0; + + /** + * Creates a new retrying future task that will execute a unit of work until successfully + * completed, or the retry limit has been reached. + * + * @param executor The executor service to resubmit the task to upon failure. + * @param callable The unit of work. The work is considered successful when {@code true} is + * returned. It may return {@code false} or throw an exception when + * unsueccessful. + * @param maxRetries The maximum number of times to retry the task. + * @param backoffStrategy Strategy to use for determining backoff duration. 
+ */ + public BackingOffFutureTask(ScheduledExecutorService executor, Callable<Boolean> callable, + int maxRetries, BackoffStrategy backoffStrategy) { + super(executor, callable, maxRetries); + this.executor = executor; + this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy); + } + + @Override + protected void retry() { + backoffMs = backoffStrategy.calculateBackoffMs(backoffMs); + executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingExecutorService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingExecutorService.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingExecutorService.java new file mode 100644 index 0000000..df11d6d --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingExecutorService.java @@ -0,0 +1,91 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.concurrent; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +/** + * An executor service that delegates to another executor service, invoking an uncaught + * exception handler if any exceptions are thrown in submitted work. + * + * @see MoreExecutors + */ +class ExceptionHandlingExecutorService extends ForwardingExecutorService<ExecutorService> { + private final Supplier<Thread.UncaughtExceptionHandler> handler; + + ExceptionHandlingExecutorService( + ExecutorService delegate, + Supplier<Thread.UncaughtExceptionHandler> handler) { + + super(delegate); + this.handler = Preconditions.checkNotNull(handler); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return super.submit(TaskConverter.alertingCallable(task, handler)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return super.submit(TaskConverter.alertingRunnable(task, handler), result); + } + + @Override + public Future<?> submit(Runnable task) { + return super.submit(TaskConverter.alertingRunnable(task, handler)); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks) throws InterruptedException { + + return super.invokeAll(TaskConverter.alertingCallables(tasks, handler)); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, + long timeout, + TimeUnit unit) throws InterruptedException { + + return super.invokeAll(TaskConverter.alertingCallables(tasks, handler), timeout, unit); + } + + @Override + public <T> T invokeAny( + Collection<? 
extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + + return super.invokeAny(TaskConverter.alertingCallables(tasks, handler)); + } + + @Override + public <T> T invokeAny( + Collection<? extends Callable<T>> tasks, + long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + + return super.invokeAny(TaskConverter.alertingCallables(tasks, handler), timeout, unit); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java new file mode 100644 index 0000000..fa0bd7d --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExceptionHandlingScheduledExecutorService.java @@ -0,0 +1,118 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.util.concurrent; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Supplier; + +/** + * A scheduled executor service that delegates to another executor service, invoking an uncaught + * exception handler if any exceptions are thrown in submitted work. + * + * @see MoreExecutors + */ +class ExceptionHandlingScheduledExecutorService + extends ForwardingExecutorService<ScheduledExecutorService> + implements ScheduledExecutorService { + private final Supplier<Thread.UncaughtExceptionHandler> handler; + + /** + * Construct a {@link ScheduledExecutorService} with a supplier of + * {@link Thread.UncaughtExceptionHandler} that handles exceptions thrown from submitted work. 
+ */ + ExceptionHandlingScheduledExecutorService( + ScheduledExecutorService delegate, + Supplier<Thread.UncaughtExceptionHandler> handler) { + super(delegate); + this.handler = handler; + } + + @Override + public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) { + return delegate.schedule(TaskConverter.alertingRunnable(runnable, handler), delay, timeUnit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit timeUnit) { + return delegate.schedule(TaskConverter.alertingCallable(callable, handler), delay, timeUnit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate( + Runnable runnable, long initialDelay, long period, TimeUnit timeUnit) { + return delegate.scheduleAtFixedRate( + TaskConverter.alertingRunnable(runnable, handler), initialDelay, period, timeUnit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay( + Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) { + return delegate.scheduleWithFixedDelay( + TaskConverter.alertingRunnable(runnable, handler), initialDelay, delay, timeUnit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return delegate.submit(TaskConverter.alertingCallable(task, handler)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return delegate.submit(TaskConverter.alertingRunnable(task, handler), result); + } + + @Override + public Future<?> submit(Runnable task) { + return delegate.submit(TaskConverter.alertingRunnable(task, handler)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return delegate.invokeAll(TaskConverter.alertingCallables(tasks, handler)); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(TaskConverter.alertingCallables(tasks, handler), timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(TaskConverter.alertingCallables(tasks, handler)); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(TaskConverter.alertingCallables(tasks, handler), timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(TaskConverter.alertingRunnable(command, handler)); + } +}