http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Stats.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Stats.java b/commons/src/main/java/org/apache/aurora/common/stats/Stats.java new file mode 100644 index 0000000..2191f77 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/Stats.java @@ -0,0 +1,408 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.stats; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.AtomicDouble; + +import org.apache.aurora.common.base.MorePreconditions; + +/** + * Manages {@link Stat}s that should be exported for monitoring. + * + * Statistic names may only contain {@code [A-Za-z0-9_]}; + * any other character is replaced with an underscore on export and the rename is logged as a warning. + * + * @author John Sirois + */ +public class Stats { + + private static final Logger LOG = Logger.getLogger(Stats.class.getName()); + private static final Pattern NOT_NAME_CHAR = Pattern.compile("[^A-Za-z0-9_]"); + + // Every exported stat (numeric or not), keyed by its validated name. + private static final ConcurrentMap<String, Stat<?>> VAR_MAP = new MapMaker().makeMap(); + + // Store stats in the order they were registered, so that derived variables are + // sampled after their inputs. + private static final Collection<RecordingStat<? extends Number>> ORDERED_NUMERIC_STATS = + new ConcurrentLinkedQueue<RecordingStat<? extends Number>>(); + + // Numeric stats registered through export(Stat), keyed by validated name, so that a repeated + // export under the same name gets the previously-registered stat back. + private static final Cache<String, RecordingStat<? 
extends Number>> NUMERIC_STATS = + CacheBuilder.newBuilder().build(); + + /** + * Replaces every character outside {@code [A-Za-z0-9_]} in {@code name} with an underscore. + * + * @param name The stat name to normalize. + * @return The normalized name. + */ + public static String normalizeName(String name) { + return NOT_NAME_CHAR.matcher(name).replaceAll("_"); + } + + /** + * Normalizes {@code name} and logs a warning if normalization changed it. + * + * @param name The stat name to validate. + * @return The normalized name. + */ + static String validateName(String name) { + String normalized = normalizeName(name); + if (!name.equals(normalized)) { + LOG.warning("Invalid stat name " + name + " exported as " + normalized); + } + return normalized; + } + + /** + * A {@link StatsProvider} that exports gauge-style stats to the global {@link Stat}s repository + * for time series tracking. + */ + public static final StatsProvider STATS_PROVIDER = new StatsProvider() { + private final StatsProvider untracked = new StatsProvider() { + @Override public AtomicLong makeCounter(String name) { + final AtomicLong longVar = new AtomicLong(); + Stats.exportStatic(new StatImpl<Long>(name) { + @Override public Long read() { + return longVar.get(); + } + }); + return longVar; + } + + @Override public <T extends Number> Stat<T> makeGauge(String name, final Supplier<T> gauge) { + return Stats.exportStatic(new StatImpl<T>(name) { + @Override public T read() { + return gauge.get(); + } + }); + } + + @Override public StatsProvider untracked() { + return this; + } + + @Override public RequestTimer makeRequestTimer(String name) { + // TODO(William Farner): Add support for this once a caller shows interest in using it. 
+ throw new UnsupportedOperationException(); + } + }; + + @Override public <T extends Number> Stat<T> makeGauge(String name, final Supplier<T> gauge) { + return Stats.export(new StatImpl<T>(name) { + @Override public T read() { + return gauge.get(); + } + }); + } + + @Override public AtomicLong makeCounter(String name) { + return Stats.exportLong(name); + } + + @Override public StatsProvider untracked() { + return untracked; + } + + @Override public RequestTimer makeRequestTimer(String name) { + return new RequestStats(name); + } + }; + + /** + * A {@link StatRegistry} that provides stats registered with the global {@link Stat}s repository. + */ + public static final StatRegistry STAT_REGISTRY = new StatRegistry() { + @Override public Iterable<RecordingStat<? extends Number>> getStats() { + return Stats.getNumericVariables(); + } + }; + + /** + * Cache loader used by {@code export(Stat)}. When invoked it stores the stat in the variable map + * and appends it to the ordered numeric stat list. {@code called} records that the loader ran, + * which lets the caller tell a fresh registration from re-use of an already cached stat. + */ + private static class ExportStat implements Callable<RecordingStat<? extends Number>> { + private final AtomicBoolean called = new AtomicBoolean(false); + + private final RecordingStat<? extends Number> stat; + private final String name; + + private <T extends Number> ExportStat(String name, Stat<T> stat) { + this.name = name; + this.stat = (stat instanceof RecordingStat) + ? (RecordingStat<? extends Number>) stat + : new RecordingStatImpl<T>(stat); + } + + @Override + public RecordingStat<? extends Number> call() { + try { + exportStaticInternal(name, stat); + ORDERED_NUMERIC_STATS.add(stat); + return stat; + } finally { + called.set(true); + } + } + } + + /** + * Exports a stat for tracking. + * If the stat provided implements the internal {@link RecordingStat} interface, it will be + * registered for time series collection and returned; otherwise it is wrapped in a + * {@link RecordingStatImpl} and the wrapper is registered and returned. If a {@link RecordingStat} with the same + * name as the provided stat has already been exported, the previously-exported stat will be + * returned and no additional registration will be performed. + * + * @param var The variable to export. + * @param <T> The value exported by the variable. 
+ * @return A reference to the stat that was stored. The stat returned may not be equal to the + * stat provided. If a variable was already exported with the same + * name, the previously-exported stat is returned. + */ + public static <T extends Number> Stat<T> export(Stat<T> var) { + String validatedName = validateName(MorePreconditions.checkNotBlank(var.getName())); + ExportStat exportStat = new ExportStat(validatedName, var); + try { + @SuppressWarnings("unchecked") + Stat<T> exported = (Stat<T>) NUMERIC_STATS.get(validatedName, exportStat); + return exported; + } catch (ExecutionException e) { + throw new IllegalStateException( + "Unexpected error exporting stat " + validatedName, e.getCause()); + } finally { + // The loader never ran, so the cache already held a stat under this name. + if (!exportStat.called.get()) { + LOG.warning("Re-using already registered variable for key " + validatedName); + } + } + } + + /** + * Exports a string stat. + * String-based statistics will not be registered for time series collection. + * + * @param var Stat to export. + * @return A reference back to {@code var}. A stat already stored under the same name is + * replaced and a collision warning is logged. + */ + public static Stat<String> exportString(Stat<String> var) { + return exportStatic(var); + } + + /** + * Adds a collection of stats for export. + * + * @param vars The variables to add. + */ + public static void exportAll(Iterable<Stat<? extends Number>> vars) { + for (Stat<? extends Number> var : vars) { + export(var); + } + } + + /** + * Exports an {@link AtomicInteger}, which will be included in time series tracking. + * + * @param name The name to export the stat with. + * @param intVar The variable to export. + * @return A reference to the {@link AtomicInteger} provided. + */ + public static AtomicInteger export(final String name, final AtomicInteger intVar) { + export(new SampledStat<Integer>(name, 0) { + @Override public Integer doSample() { return intVar.get(); } + }); + + return intVar; + } + + /** + * Creates and exports an {@link AtomicInteger}. 
+ * + * @param name The name to export the stat with. + * @return A reference to the {@link AtomicInteger} created. + */ + public static AtomicInteger exportInt(String name) { + return exportInt(name, 0); + } + + /** + * Creates and exports an {@link AtomicInteger} with initial value. + * + * @param name The name to export the stat with. + * @param initialValue The initial stat value. + * @return A reference to the {@link AtomicInteger} created. + */ + public static AtomicInteger exportInt(String name, int initialValue) { + return export(name, new AtomicInteger(initialValue)); + } + + /** + * Exports an {@link AtomicLong}, which will be included in time series tracking. + * + * @param name The name to export the stat with. + * @param longVar The variable to export. + * @return A reference to the {@link AtomicLong} provided. + */ + public static AtomicLong export(String name, final AtomicLong longVar) { + export(new StatImpl<Long>(name) { + @Override public Long read() { return longVar.get(); } + }); + + return longVar; + } + + /** + * Creates and exports an {@link AtomicLong}. + * + * @param name The name to export the stat with. + * @return A reference to the {@link AtomicLong} created. + */ + public static AtomicLong exportLong(String name) { + return exportLong(name, 0L); + } + + /** + * Creates and exports an {@link AtomicLong} with initial value. + * + * @param name The name to export the stat with. + * @param initialValue The initial stat value. + * @return A reference to the {@link AtomicLong} created. + */ + public static AtomicLong exportLong(String name, long initialValue) { + return export(name, new AtomicLong(initialValue)); + } + + /** + * Exports an {@link AtomicDouble}, which will be included in time series tracking. + * + * @param name The name to export the stat with. + * @param doubleVar The variable to export. + * @return A reference to the {@link AtomicDouble} provided. 
+ */ + public static AtomicDouble export(String name, final AtomicDouble doubleVar) { + export(new StatImpl<Double>(name) { + @Override public Double read() { return doubleVar.doubleValue(); } + }); + + return doubleVar; + } + + /** + * Creates and exports an {@link AtomicDouble}. + * + * @param name The name to export the stat with. + * @return A reference to the {@link AtomicDouble} created. + */ + public static AtomicDouble exportDouble(String name) { + return exportDouble(name, 0.0); + } + + /** + * Creates and exports an {@link AtomicDouble} with initial value. + * + * @param name The name to export the stat with. + * @param initialValue The initial stat value. + * @return A reference to the {@link AtomicDouble} created. + */ + public static AtomicDouble exportDouble(String name, double initialValue) { + return export(name, new AtomicDouble(initialValue)); + } + + /** + * Exports a metric that tracks the size of a collection. + * + * @param name Name of the stat to export. + * @param collection Collection whose size should be tracked. + */ + public static void exportSize(String name, final Collection<?> collection) { + export(new StatImpl<Integer>(name) { + @Override public Integer read() { + return collection.size(); + } + }); + } + + /** + * Exports a metric that tracks the size of a map. + * + * @param name Name of the stat to export. + * @param map Map whose size should be tracked. + */ + public static void exportSize(String name, final Map<?, ?> map) { + export(new StatImpl<Integer>(name) { + @Override public Integer read() { + return map.size(); + } + }); + } + + /** + * Exports a metric that tracks the size of a cache. + * + * @param name Name of the stat to export. + * @param cache Cache whose size should be tracked. 
+ */ + public static void exportSize(String name, final Cache<?, ?> cache) { + export(new StatImpl<Long>(name) { + @Override public Long read() { + return cache.size(); + } + }); + } + + /** + * Exports a 'static' statistic, which will not be registered for time series tracking. + * + * @param var Variable to statically export. + * @return A reference back to the provided {@link Stat}. + */ + public static <T> Stat<T> exportStatic(Stat<T> var) { + String validatedName = validateName(MorePreconditions.checkNotBlank(var.getName())); + exportStaticInternal(validatedName, var); + return var; + } + + /** + * Stores {@code stat} under {@code name}, replacing any stat already stored under that name and + * logging a warning when such a replacement happens. + */ + private static void exportStaticInternal(String name, Stat<?> stat) { + if (VAR_MAP.put(name, stat) != null) { + LOG.warning("Warning - exported variable collision on " + name); + } + } + + /** + * Fetches all registered stats. + * + * @return An immutable snapshot of all registered stats. + */ + public static Iterable<Stat<?>> getVariables() { + return ImmutableList.copyOf(VAR_MAP.values()); + } + + /** + * Fetches the stats registered for time series collection. + * + * @return An immutable snapshot of the numeric stats, in registration order. + */ + static Iterable<RecordingStat<? extends Number>> getNumericVariables() { + return ImmutableList.copyOf(ORDERED_NUMERIC_STATS); + } + + /** + * Removes every registered stat from the variable map, the ordered numeric stat list and the + * numeric stat cache. + */ + @VisibleForTesting + public static void flush() { + VAR_MAP.clear(); + ORDERED_NUMERIC_STATS.clear(); + NUMERIC_STATS.invalidateAll(); + } + + /** + * Looks up an exported stat by name. The name is used as given; it is not normalized first. + * + * @param name The name of the stat to fetch, checked with {@code MorePreconditions.checkNotBlank}. + * @param <T> The value type the caller expects; the cast is unchecked. + * @return The stat stored under {@code name}, or {@code null} if there is none. + */ + public static <T> Stat<T> getVariable(String name) { + MorePreconditions.checkNotBlank(name); + @SuppressWarnings("unchecked") + Stat<T> stat = (Stat<T>) VAR_MAP.get(name); + return stat; + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java b/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java new file mode 100644 index 0000000..cb1c56b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java @@ -0,0 +1,88 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import com.google.common.base.Supplier; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A minimal interface to a Stats repository. + * + * @author John Sirois + */ +public interface StatsProvider { + + /** + * Creates and exports a counter for tracking. + * + * @param name The name to export the stat with. + * @return A reference to the counter that will be tracked for incrementing. + */ + AtomicLong makeCounter(String name); + + /** + * Exports a read-only value for tracking. + * + * @param name The name of the variable to export. + * @param gauge The supplier of the instantaneous values to export. + * @param <T> The type of number exported by the variable. + * @return A reference to the stat that was stored. 
+ */ + <T extends Number> Stat<T> makeGauge(String name, Supplier<T> gauge); + + /** + * Gets a stats provider that does not track stats in an internal time series repository. + * The stored variables will only be available as instantaneous values. + * + * @return A stats provider that creates untracked stats. + */ + StatsProvider untracked(); + + /** + * A stat for tracking service requests. + */ + interface RequestTimer { + + /** + * Accumulates a request and its latency. + * + * @param latencyMicros The elapsed time required to complete the request. + */ + void requestComplete(long latencyMicros); + + /** + * Accumulates the error counter and the request counter. + */ + void incErrors(); + + /** + * Accumulates the reconnect counter. + */ + void incReconnects(); + + /** + * Accumulates the timeout counter. + */ + void incTimeouts(); + } + + /** + * Creates and exports a set of stats that allows for typical RPC request tracking. + * + * @param name The name to export the stat with. + * @return A reference to the request timer that can be used to track RPCs. + */ + RequestTimer makeRequestTimer(String name); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java new file mode 100644 index 0000000..45f604c --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.util.Calendar; + +/** + * A time series of values. + * + * @author William Farner + */ +public interface TimeSeries { + + /** + * A name describing this time series. + * + * @return The name of this time series data. + */ + public String getName(); + + /** + * A series of numbers representing regular samples of a variable. + * + * @return The time series of sample values. + */ + public Iterable<Number> getSamples(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java new file mode 100644 index 0000000..6928e48 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java @@ -0,0 +1,57 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.util.Set; + +import org.apache.aurora.common.application.ShutdownRegistry; + +/** + * A repository for time series data. + * + * @author William Farner + */ +public interface TimeSeriesRepository { + + /** + * Starts the time series sampler. + * + * @param shutdownRegistry An action registry that the repository can use to register a shutdown + * for the sampler. + */ + public void start(ShutdownRegistry shutdownRegistry); + + /** + * Fetches the names of all available time series. + * + * @return Available time series, which can then be obtained by calling {@link #get(String)}. + */ + public Set<String> getAvailableSeries(); + + /** + * Fetches a time series by name. + * + * @param name The name of the time series to fetch. + * @return The time series registered with the given name, or {@code null} if no such time series + * has been registered. + */ + public TimeSeries get(String name); + + /** + * Gets an ordered iterable of the timestamps that all time series were sampled at. + * + * @return All current timestamps. + */ + public Iterable<Number> getTimestamps(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java new file mode 100644 index 0000000..387e379 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java @@ -0,0 +1,197 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Inject; +import com.google.inject.name.Named; + +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.collections.BoundedQueue; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A simple in-memory repository for exported variables. + * + * @author John Sirois + */ +public class TimeSeriesRepositoryImpl implements TimeSeriesRepository { + + private static final Logger LOG = Logger.getLogger(TimeSeriesRepositoryImpl.class.getName()); + + /** + * {@literal @Named} binding key for the sampling period. 
+ */ + public static final String SAMPLE_PERIOD = + "com.twitter.common.stats.TimeSeriesRepositoryImpl.SAMPLE_PERIOD"; + + /** + * {@literal @Named} binding key for the sample retention period. The number of retained + * samples is derived from it as retention period divided by sample period. + */ + public static final String SAMPLE_RETENTION_PERIOD = + "com.twitter.common.stats.TimeSeriesRepositoryImpl.SAMPLE_RETENTION_PERIOD"; + + private final SlidingStats scrapeDuration = new SlidingStats("variable_scrape", "micros"); + + // We store TimeSeriesImpl, which allows us to add samples. + private final LoadingCache<String, TimeSeriesImpl> timeSeries; + private final BoundedQueue<Number> timestamps; + + private final StatRegistry statRegistry; + private final Amount<Long, Time> samplePeriod; + private final int retainedSampleLimit; + + /** + * Creates a repository that samples the stats of {@code statRegistry}. + * + * @param statRegistry The registry whose stats are sampled. + * @param samplePeriod The interval between samples; must be positive. + * @param retentionPeriod How long samples are retained; must be positive and large enough that + * at least one sample is retained. + * @throws NullPointerException if any argument is {@code null}. + * @throws IllegalArgumentException if a period is not positive or no sample would be retained. + */ + @Inject + public TimeSeriesRepositoryImpl( + StatRegistry statRegistry, + @Named(SAMPLE_PERIOD) Amount<Long, Time> samplePeriod, + @Named(SAMPLE_RETENTION_PERIOD) final Amount<Long, Time> retentionPeriod) { + this.statRegistry = checkNotNull(statRegistry); + this.samplePeriod = checkNotNull(samplePeriod); + Preconditions.checkArgument(samplePeriod.getValue() > 0, "Sample period must be positive."); + checkNotNull(retentionPeriod); + Preconditions.checkArgument(retentionPeriod.getValue() > 0, + "Sample retention period must be positive."); + + // NOTE(review): both periods are converted to whole seconds before dividing, so a sample + // period below one second presumably yields a zero divisor here - confirm Amount.as() rounding. + retainedSampleLimit = (int) (retentionPeriod.as(Time.SECONDS) / samplePeriod.as(Time.SECONDS)); + Preconditions.checkArgument(retainedSampleLimit > 0, + "Sample retention period must be greater than sample period."); + + timeSeries = CacheBuilder.newBuilder().build( + new CacheLoader<String, TimeSeriesImpl>() { + @Override public TimeSeriesImpl load(final String name) { + TimeSeriesImpl timeSeries = new TimeSeriesImpl(name); + + // Backfill so we have data for pre-accumulated timestamps. + // The loop starts at 1 because runSampler() records the current timestamp before it + // looks the series up, and then adds the sample for that timestamp itself. 
+ int numTimestamps = timestamps.size(); + if (numTimestamps != 0) { + for (int i = 1; i < numTimestamps; i++) { + timeSeries.addSample(0L); + } + } + + return timeSeries; + } + }); + + timestamps = new BoundedQueue<Number>(retainedSampleLimit); + } + + /** + * Starts the variable sampler: a single daemon thread that calls {@code runSampler} once per + * sample period, after an initial delay of one period. Also registers a shutdown action that + * stops sampling and shuts the executor down. + * + * @param shutdownRegistry The registry the shutdown action is added to. + */ + @Override + public void start(ShutdownRegistry shutdownRegistry) { + checkNotNull(shutdownRegistry); + checkNotNull(samplePeriod); + Preconditions.checkArgument(samplePeriod.getValue() > 0); + + final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1 /* One thread. */, + new ThreadFactoryBuilder().setNameFormat("VariableSampler-%d").setDaemon(true).build()); + + final AtomicBoolean shouldSample = new AtomicBoolean(true); + final Runnable sampler = new Runnable() { + @Override public void run() { + if (shouldSample.get()) { + try { + runSampler(Clock.SYSTEM_CLOCK); + } catch (Exception e) { + // Log and continue: an exception escaping run() would suppress all later executions. + LOG.log(Level.SEVERE, "ignoring runSampler failure", e); + } + } + } + }; + + executor.scheduleAtFixedRate(sampler, samplePeriod.getValue(), samplePeriod.getValue(), + samplePeriod.getUnit().getTimeUnit()); + shutdownRegistry.addAction(new Command() { + @Override + public void execute() throws RuntimeException { + shouldSample.set(false); + executor.shutdown(); + LOG.info("Variable sampler shut down"); + } + }); + } + + /** + * Records the current timestamp, adds one sample per registered stat to that stat's time + * series, and accumulates the time the pass took, in microseconds, into {@code scrapeDuration}. + * + * @param clock The clock supplying the timestamp and the elapsed-time measurements. + */ + @VisibleForTesting + synchronized void runSampler(Clock clock) { + timestamps.add(clock.nowMillis()); + + long startNanos = clock.nowNanos(); + for (RecordingStat<? 
extends Number> var : statRegistry.getStats()) { + timeSeries.getUnchecked(var.getName()).addSample(var.sample()); + } + scrapeDuration.accumulate( + Amount.of(clock.nowNanos() - startNanos, Time.NANOSECONDS).as(Time.MICROSECONDS)); + } + + @Override + public synchronized Set<String> getAvailableSeries() { + return ImmutableSet.copyOf(timeSeries.asMap().keySet()); + } + + @Override + public synchronized TimeSeries get(String name) { + // Check membership first: getUnchecked() on an unknown name would load a new series. + if (!timeSeries.asMap().containsKey(name)) return null; + return timeSeries.getUnchecked(name); + } + + @Override + public synchronized Iterable<Number> getTimestamps() { + return Iterables.unmodifiableIterable(timestamps); + } + + /** + * A named time series whose samples live in a {@link BoundedQueue} constructed with + * {@code retainedSampleLimit}. Samples are exposed to callers through an unmodifiable view. + */ + private class TimeSeriesImpl implements TimeSeries { + private final String name; + private final BoundedQueue<Number> samples; + + TimeSeriesImpl(String name) { + this.name = name; + samples = new BoundedQueue<Number>(retainedSampleLimit); + } + + @Override public String getName() { + return name; + } + + void addSample(Number value) { + samples.add(value); + } + + @Override public Iterable<Number> getSamples() { + return Iterables.unmodifiableIterable(samples); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java b/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java new file mode 100644 index 0000000..12ab468 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java @@ -0,0 +1,136 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.lang.reflect.Array; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; + +/** + * Windowed is an abstraction that lets you span a class across a sliding window. + * It creates a ring buffer of T and reuses each buffer after clearing it, or replaces it with a new one (via + * the {@code clearer} function). + * + * <pre> + * tenured instances + * ++++++++++++++++++++++++++++++++++ + * [----A-----][-----B----][-----C----][-----D----] + * ++++++++++ + * current instance + * </pre> + * + * The schema above represents the valid instances over time + * (A,B,C) are the tenured ones + * D is the current instance + */ +public abstract class Windowed<T> { + private final Class<T> clazz; + protected final T[] buffers; + private final long sliceDuration; + private final Clock clock; + private long index = -1L; + private final Function<T, T> clearer; + + /** + * @param clazz the type of the underlying element T + * @param window the length of the window + * @param slices the number of slices (the window will be divided into {@code slices} slices) + * @param sliceProvider the supplier of elements; called {@code slices + 1} times + * @param clearer the function that clears (or re-creates) an element + * @param clock the clock used to select the current slice + */ + public Windowed(Class<T> clazz, Amount<Long, Time> window, int slices, + 
Supplier<T> sliceProvider, Function<T, T> clearer, Clock clock) { + Preconditions.checkNotNull(window); + // Ensure that we have at least 1ms per slice + Preconditions.checkArgument(window.as(Time.MILLISECONDS) > (slices + 1)); + Preconditions.checkArgument(0 < slices); + Preconditions.checkNotNull(sliceProvider); + Preconditions.checkNotNull(clock); + + this.clazz = clazz; + this.sliceDuration = window.as(Time.MILLISECONDS) / slices; + @SuppressWarnings("unchecked") // safe because we have the clazz proof of type T + T[] bufs = (T[]) Array.newInstance(clazz, slices + 1); + for (int i = 0; i < bufs.length; i++) { + bufs[i] = sliceProvider.get(); + } + this.buffers = bufs; + this.clearer = clearer; + this.clock = clock; + } + + /** + * Return the index of the latest element: the clock's current time in milliseconds divided by the slice duration. + * You have to modulo it with buffers.length before accessing the array with this number. + * NOTE(review): the long quotient is narrowed to int, which presumably overflows for short slice durations - confirm. + */ + protected int getCurrentIndex() { + long now = clock.nowMillis(); + return (int) (now / sliceDuration); + } + + /** + * Check for expired elements and return the current one. + */ + protected T getCurrent() { + sync(getCurrentIndex()); + return buffers[(int) (index % buffers.length)]; + } + + /** + * Check for expired elements and return all the tenured (old) ones. + * The returned array is a new array holding every buffer except the current one. + */ + protected T[] getTenured() { + long currentIndex = getCurrentIndex(); + sync(currentIndex); + @SuppressWarnings("unchecked") // safe because we have the clazz proof of type T + T[] tmp = (T[]) Array.newInstance(clazz, buffers.length - 1); + for (int i = 0; i < tmp.length; i++) { + int idx = (int) ((currentIndex + 1 + i) % buffers.length); + tmp[i] = buffers[idx]; + } + return tmp; + } + + /** + * Clear all the elements by applying the {@code clearer} to every slot of the ring buffer. + */ + public void clear() { + for (int i = 0; i < buffers.length; i++) { + buffers[i] = clearer.apply(buffers[i]); + } + } + + /** + * Synchronize elements with a point in time. + * i.e. 
Check for expired ones and clear them, and update the index variable. + * + * @param currentIndex the slice index to synchronize with, as returned by getCurrentIndex() + */ + protected void sync(long currentIndex) { + if (index < currentIndex) { + long from = Math.max(index + 1, currentIndex - buffers.length + 1); + for (long i = from; i <= currentIndex; i++) { + int idx = (int) (i % buffers.length); + buffers[idx] = clearer.apply(buffers[idx]); + } + index = currentIndex; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java new file mode 100644 index 0000000..6461a2e --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java @@ -0,0 +1,153 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Supplier; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; + +/** + * WindowedApproxHistogram is an implementation of WindowedHistogram with an + * ApproximateHistogram as the underlying storing histogram. + */ +public class WindowedApproxHistogram extends WindowedHistogram<ApproximateHistogram> { + @VisibleForTesting public static final int DEFAULT_SLICES = 3; + @VisibleForTesting public static final Amount<Long, Time> DEFAULT_WINDOW = + Amount.of(1L, Time.MINUTES); + @VisibleForTesting public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of( + (DEFAULT_SLICES + 1) * ApproximateHistogram.DEFAULT_MAX_MEMORY.as(Data.BYTES), Data.BYTES); + + /** + * Create a {@code WindowedApproxHistogram } with a window duration of {@code window} and + * decomposed in {@code slices} Histograms. Those Histograms will individually take less than + * {@code maxMemory / (slices + 1)}. The clock will be used to find the correct index in the + * ring buffer. 
+ * + * @param window duration of the window + * @param slices number of slices in the window + * @param maxMemory maximum memory used by the whole histogram + */ + public WindowedApproxHistogram(Amount<Long, Time> window, final int slices, + final Amount<Long, Data> maxMemory, Clock clock) { + super(ApproximateHistogram.class, window, slices, + new Supplier<ApproximateHistogram>() { + private Amount<Long, Data> perHistogramMemory = Amount.of( + maxMemory.as(Data.BYTES) / (slices + 1), Data.BYTES); + @Override + public ApproximateHistogram get() { + return new ApproximateHistogram(perHistogramMemory); + } + }, + new Function<ApproximateHistogram[], Histogram>() { + @Override + public Histogram apply(ApproximateHistogram[] histograms) { + return ApproximateHistogram.merge(histograms); + } + }, clock); + } + + /** + * Create a {@code WindowedApproxHistogram } with a window duration of {@code window} and + * decomposed in {@code slices} Histograms. Those Histograms will individually have a + * precision of {@code precision / (slices + 1)}. The ticker will be used to measure elapsed + * time in the WindowedHistogram. 
+ * + * @param window duration of the window + * @param slices number of slices in the window + * @param precision precision of the whole histogram + */ + public WindowedApproxHistogram(Amount<Long, Time> window, final int slices, + final Precision precision, Clock clock) { + super(ApproximateHistogram.class, window, slices, + new Supplier<ApproximateHistogram>() { + private Precision perHistogramPrecision = new Precision( + precision.getEpsilon(), precision.getN() / (slices + 1)); + @Override + public ApproximateHistogram get() { + return new ApproximateHistogram(perHistogramPrecision); + } + }, + new Function<ApproximateHistogram[], Histogram>() { + @Override + public Histogram apply(ApproximateHistogram[] histograms) { + return ApproximateHistogram.merge(histograms); + } + }, clock); + } + + /** + * Equivalent to calling + * {@link #WindowedApproxHistogram(Amount, int, Amount, Clock)} + * with the System clock. + */ + public WindowedApproxHistogram(Amount<Long, Time> window, int slices, + Amount<Long, Data> maxMemory) { + this(window, slices, maxMemory, Clock.SYSTEM_CLOCK); + } + + /** + * Equivalent to calling + * {@link #WindowedApproxHistogram(Amount, int, Amount)} + * with default window and slices. + */ + public WindowedApproxHistogram(Amount<Long, Data> maxMemory) { + this(DEFAULT_WINDOW, DEFAULT_SLICES, maxMemory); + } + + /** + * Equivalent to calling + * {@link #WindowedApproxHistogram(Amount, int, Precision, Clock)} + * with the System clock. + */ + public WindowedApproxHistogram(Amount<Long, Time> window, int slices, Precision precision) { + this(window, slices, precision, Clock.SYSTEM_CLOCK); + } + + /** + * Equivalent to calling + * {@link #WindowedApproxHistogram(Amount, int, Precision)} + * with default window and slices. 
+ */ + public WindowedApproxHistogram(Precision precision) { + this(DEFAULT_WINDOW, DEFAULT_SLICES, precision); + } + + /** + * Equivalent to calling + * {@link #WindowedApproxHistogram(Amount, int, Amount, Clock)} + * with the default maxMemory parameter and System clock. + */ + public WindowedApproxHistogram(Amount<Long, Time> window, int slices) { + this(window, slices, DEFAULT_MAX_MEMORY, Clock.SYSTEM_CLOCK); + } + + /** + * WindowedApproxHistogram constructor with default values. + */ + public WindowedApproxHistogram() { + this(DEFAULT_WINDOW, DEFAULT_SLICES, DEFAULT_MAX_MEMORY, Clock.SYSTEM_CLOCK); + } + + /** + * WindowedApproxHistogram constructor with custom Clock (for testing purposes only). + */ + @VisibleForTesting public WindowedApproxHistogram(Clock clock) { + this(DEFAULT_WINDOW, DEFAULT_SLICES, DEFAULT_MAX_MEMORY, clock); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java new file mode 100644 index 0000000..23e2f4f --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java @@ -0,0 +1,110 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.stats; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; + +/** + * Histogram windowed over time. + * <p> + * This histogram is composed of a series of ({@code slices} + 1) histograms representing a window + * of {@code range} duration. We only update the latest one, and we query the oldest ones (i.e. all + * histograms except the head). + * </p> + * <pre> + * range + * <-------------> + * [AAA][BBB][CCC][DDD] here slices = 3 + * ---------------------> + * t1 t2 + * + * For t in [t1,t2) we: + * insert elements in DDD + * query quantile over [AAA][BBB][CCC] + * </pre> + * <p> + * When {@code t} is in {@code [t1, t2)} we insert value into the latest histogram (DDD here), + * when we query the histogram, we 'merge' all other histograms (all except the latest) and query + * it. when {@code t > t2} the oldest histogram become the newest (like in a ring buffer) and + * so on ... + * </p> + * <p> + * Note: We use MergedHistogram class to represent a merged histogram without actually + * merging the underlying histograms. + * </p> + */ +public class WindowedHistogram<H extends Histogram> extends Windowed<H> implements Histogram { + + private long mergedHistIndex = -1L; + private Function<H[], Histogram> merger; + private Histogram mergedHistogram = null; + + /** + * Create a WindowedHistogram of {@code slices + 1} elements over a time {@code window}. + * This code is independent from the implementation of Histogram, you just need to provide + * a {@code Supplier<H>} to create the histograms and a {@code Function<H[], Histogram>} to + * merge them. 
+ * + * @param clazz the type of the underlying Histogram H + * @param window the length of the window + * @param slices the number of slices (the window will be divided into {@code slices} slices) + * @param sliceProvider the supplier of histogram + * @param merger the function that merge an array of histogram H[] into a single Histogram + * @param clock the clock used for to select the appropriate histogram + */ + public WindowedHistogram(Class<H> clazz, Amount<Long, Time> window, int slices, + Supplier<H> sliceProvider, Function<H[], Histogram> merger, Clock clock) { + super(clazz, window, slices, sliceProvider, new Function<H, H>() { + @Override + public H apply(H h) { h.clear(); return h; } + }, clock); + Preconditions.checkNotNull(merger); + + this.merger = merger; + } + + @Override + public synchronized void add(long x) { + getCurrent().add(x); + } + + @Override + public synchronized void clear() { + for (Histogram h: buffers) { + h.clear(); + } + } + + @Override + public synchronized long getQuantile(double quantile) { + long currentIndex = getCurrentIndex(); + if (mergedHistIndex < currentIndex) { + H[] tmp = getTenured(); + mergedHistogram = merger.apply(tmp); + mergedHistIndex = currentIndex; + } + return mergedHistogram.getQuantile(quantile); + } + + @Override + public synchronized long[] getQuantiles(double[] quantiles) { + return Histograms.extractQuantiles(this, quantiles); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java b/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java new file mode 100644 index 0000000..ded3faf --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java @@ -0,0 +1,173 @@ +/** + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import com.google.common.base.Supplier; +import com.google.common.base.Function; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; + +/** + * Keep track of statistics over a set of value in a sliding window. + * WARNING: The computation of the statistics needs to be explicitly requested with + * {@code refresh()} before reading any statistics. + * + * @see Windowed class for more details about how the window is parametrized. + */ +public class WindowedStatistics extends Windowed<Statistics> implements StatisticsInterface { + private int lastIndex = -1; + private double variance = 0.0; + private double mean = 0.0; + private long sum = 0L; + private long min = Long.MAX_VALUE; + private long max = Long.MIN_VALUE; + private long populationSize = 0L; + + public WindowedStatistics(Amount<Long, Time> window, int slices, Clock clock) { + super(Statistics.class, window, slices, + new Supplier<Statistics>() { + @Override public Statistics get() { return new Statistics(); } + }, + new Function<Statistics, Statistics>() { + @Override public Statistics apply(Statistics s) { s.clear(); return s; } + }, + clock); + } + + /** + * Construct a Statistics sliced over time in {@code slices + 1} windows. + * The {@code window} parameter represents the total window, that will be sliced into + * {@code slices + 1} parts. 
+ * + * Ex: WindowedStatistics(Amount.of(1L, Time.MINUTES), 3) will be sliced like this: + * <pre> + * 20s 20s 20s 20s + * [----A-----][-----B----][-----C----][-----D----] + * </pre> + * The current window is 'D' (the one you insert elements into) and the tenured windows + * are 'A', 'B', 'C' (the ones you read elements from). + */ + public WindowedStatistics(Amount<Long, Time> window, int slices) { + this(window, slices, Clock.SYSTEM_CLOCK); + } + + /** + * Equivalent to calling {@link #WindowedStatistics(Amount, int)} with a 1 minute window + * and 3 slices. + */ + public WindowedStatistics() { + this(Amount.of(1L, Time.MINUTES), 3, Clock.SYSTEM_CLOCK); + } + + public void accumulate(long value) { + getCurrent().accumulate(value); + } + + /** + * Compute all the statistics in one pass. + */ + public void refresh() { + int currentIndex = getCurrentIndex(); + if (lastIndex != currentIndex) { + lastIndex = currentIndex; + double x = 0.0; + variance = 0.0; + mean = 0.0; + sum = 0L; + populationSize = 0L; + min = Long.MAX_VALUE; + max = Long.MIN_VALUE; + for (Statistics s : getTenured()) { + if (s.populationSize() == 0) { + continue; + } + x += s.populationSize() * (s.variance() + s.mean() * s.mean()); + sum += s.sum(); + populationSize += s.populationSize(); + min = Math.min(min, s.min()); + max = Math.max(max, s.max()); + } + if (populationSize != 0) { + mean = ((double) sum) / populationSize; + variance = x / populationSize - mean * mean; + } + } + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the variance of the aggregated windows + */ + public double variance() { + return variance; + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the standard deviation of the aggregated windows + */ + public double standardDeviation() { + return Math.sqrt(variance()); + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the mean of the aggregated windows + */ + 
public double mean() { + return mean; + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the sum of the aggregated windows + */ + public long sum() { + return sum; + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the min of the aggregated windows + */ + public long min() { + return min; + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the max of the aggregated windows + */ + public long max() { + return max; + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the range of the aggregated windows + */ + public long range() { + return max - min; + } + + /** + * WARNING: You need to call refresh() to recompute the variance + * @return the population size of the aggregated windows + */ + public long populationSize() { + return populationSize; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java new file mode 100644 index 0000000..36b1174 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats.testing; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.aurora.common.stats.Histogram; +import org.apache.aurora.common.stats.Histograms; + +public class RealHistogram implements Histogram { + private final List<Long> buffer = new ArrayList<Long>(); + + @Override public void add(long x) { + buffer.add(x); + } + + @Override public void clear() { + buffer.clear(); + } + + @Override public long getQuantile(double quantile) { + Collections.sort(buffer); + return buffer.get((int) (quantile * buffer.size())); + } + + @Override public long[] getQuantiles(double[] quantiles) { + return Histograms.extractQuantiles(this, quantiles); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java b/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java new file mode 100644 index 0000000..02db075 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java @@ -0,0 +1,51 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.testing; + +import com.google.common.base.Preconditions; +import com.google.common.testing.TearDown; +import com.google.common.testing.TearDownAccepter; + +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.base.ExceptionalCommand; + +/** + * An action registry suitable for use as a shutdownRegistry in tests that extend + * {@link com.google.common.testing.junit4.TearDownTestCase}. + * + * @author John Sirois + */ +public class TearDownRegistry implements ShutdownRegistry { + private final TearDownAccepter tearDownAccepter; + + /** + * Creates a new tear down registry that delegates execution of shutdown actions to a + * {@code tearDownAccepter}. + * + * @param tearDownAccepter A tear down accepter that will be used to register shutdown actions + * with. + */ + public TearDownRegistry(TearDownAccepter tearDownAccepter) { + this.tearDownAccepter = Preconditions.checkNotNull(tearDownAccepter); + } + + @Override + public <E extends Exception, T extends ExceptionalCommand<E>> void addAction(final T action) { + tearDownAccepter.addTearDown(new TearDown() { + @Override public void tearDown() throws Exception { + action.execute(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java new file mode 100644 index 0000000..92ba0c3 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java @@ -0,0 +1,121 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.testing.easymock; + +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.WildcardType; + +import com.google.common.base.Preconditions; +import com.google.common.reflect.TypeToken; +import com.google.common.testing.TearDown; +import com.google.common.testing.junit4.TearDownTestCase; + +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Before; + +import static org.easymock.EasyMock.createControl; + +/** + * A baseclass for tests that use EasyMock. A new {@link IMocksControl control} is set up before + * each test and the mocks created and replayed with it are verified during tear down. + * + * @author John Sirois + */ +public abstract class EasyMockTest extends TearDownTestCase { + protected IMocksControl control; + + /** + * Creates an EasyMock {@link #control} for tests to use that will be automatically + * {@link IMocksControl#verify() verified} on tear down. + */ + @Before + public final void setupEasyMock() { + control = createControl(); + addTearDown(new TearDown() { + @Override public void tearDown() { + control.verify(); + } + }); + } + + /** + * Creates an EasyMock mock with this test's control. Will be + * {@link IMocksControl#verify() verified} in a tear down. 
+ */ + public <T> T createMock(Class<T> type) { + Preconditions.checkNotNull(type); + return control.createMock(type); + } + + /** + * A class meant to be sub-classed in order to capture a generic type literal value. To capture + * the type of a {@code List<String>} you would use: {@code new Clazz<List<String>>() {}} + */ + public abstract static class Clazz<T> extends TypeToken { + Class<T> rawType() { + @SuppressWarnings("unchecked") + Class<T> rawType = (Class<T>) findRawType(); + return rawType; + } + + private Class<?> findRawType() { + if (getType() instanceof Class<?>) { // Plain old + return (Class<?>) getType(); + + } else if (getType() instanceof ParameterizedType) { // Nested type parameter + ParameterizedType parametrizedType = (ParameterizedType) getType(); + Type rawType = parametrizedType.getRawType(); + return (Class<?>) rawType; + } else if (getType() instanceof GenericArrayType) { + throw new IllegalStateException("cannot mock arrays, rejecting type: " + getType()); + } else if (getType() instanceof WildcardType) { + throw new IllegalStateException( + "wildcarded instantiations are not allowed in java, rejecting type: " + getType()); + } else { + throw new IllegalArgumentException("Could not decode raw type for: " + getType()); + } + } + + public T createMock() { + return EasyMock.createMock(rawType()); + } + + public T createMock(IMocksControl control) { + return control.createMock(rawType()); + } + } + + /** + * Creates an EasyMock mock with this test's control. Will be + * {@link IMocksControl#verify() verified} in a tear down. + * + * Allows for mocking of parameterized types without all the unchecked conversion warnings in a + * safe way. + */ + public <T> T createMock(Clazz<T> type) { + Preconditions.checkNotNull(type); + return type.createMock(control); + } + + /** + * A type-inferring convenience method for creating new captures. 
+ */ + public static <T> Capture<T> createCapture() { + return new Capture<T>(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java new file mode 100644 index 0000000..bcd0a15 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java @@ -0,0 +1,90 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.testing.easymock; + +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multiset; + +import org.easymock.IArgumentMatcher; + +import static org.easymock.EasyMock.reportMatcher; + +/** + * This EasyMock argument matcher tests Iterables for equality irrespective of order. + * + * @param <T> type argument for the Iterables being matched. + */ +public class IterableEquals<T> implements IArgumentMatcher { + private final Multiset<T> elements = HashMultiset.create(); + + /** + * Constructs an IterableEquals object that tests for equality against the specified expected + * Iterable. 
+ * + * @param expected an Iterable containing the elements that are expected, in any order. + */ + public IterableEquals(Iterable<T> expected) { + Iterables.addAll(elements, expected); + } + + @Override + public boolean matches(Object observed) { + if (observed instanceof Iterable<?>) { + Multiset<Object> observedElements = HashMultiset.create((Iterable<?>) observed); + return elements.equals(observedElements); + } + return false; + } + + @Override + public void appendTo(StringBuffer buffer) { + buffer.append("eqIterable(").append(elements).append(")"); + } + + /** + * When used in EasyMock expectations, this matches an Iterable having the same elements in any + * order. + * + * @return null, to avoid a compile time error. + */ + public static <T> Iterable<T> eqIterable(Iterable<T> in) { + reportMatcher(new IterableEquals(in)); + return null; + } + + /** + * When used in EasyMock expectations, this matches a List having the same elements in any order. + * + * @return null, to avoid a compile time error. + */ + public static <T> List<T> eqList(Iterable<T> in) { + reportMatcher(new IterableEquals(in)); + return null; + } + + /** + * When used in EasyMock expectations, this matches a Collection having the same elements in any + * order. + * + * @return null, to avoid a compile time error. 
+ */ + public static <T> Collection<T> eqCollection(Iterable<T> in) { + reportMatcher(new IterableEquals(in)); + return null; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java b/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java new file mode 100644 index 0000000..3b97118 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java @@ -0,0 +1,158 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.testing.junit.rules; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + +import org.junit.rules.MethodRule; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.Statement; + +/** + * A test method annotation useful for smoking out flaky behavior in tests. + * + * @see Retry.Rule RetryRule needed to enable this annotation in a test class. 
+ */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Retry { + + /** + * The number of times to retry the test. + * + * When a {@link Retry.Rule} is installed and a test method is annotated with {@literal @Retry}, + * it will be retried 0 to N times. If times is negative, it is treated as 0 and no retries are + * performed. If times is >= 1 then a successful execution of the annotated test method is + * retried until the 1st error, failure or otherwise up to {@code times} times, so the test + * body runs at most {@code times + 1} times in total. + */ + int times() default 1; + + /** + * Enables {@link Retry @Retry}able tests. + */ + class Rule implements MethodRule { + /** + * Creates the throwable used to report a failed try. + */ + private interface ThrowableFactory { + Throwable create(String message, Throwable cause); + } + + /** + * Wraps {@code cause} in a throwable built by {@code throwableFactory}. The wrapper's message + * has the form "prefix on try N of M: original message", where M is {@code maxRetries + 1}, + * and the wrapper is given the stack trace of {@code cause}. + */ + private static Throwable annotate( + int tryNumber, + final int maxRetries, + Throwable cause, + String prefix, + ThrowableFactory throwableFactory) { + + Throwable annotated = + throwableFactory.create( + String.format("%s on try %d of %d: %s", prefix, tryNumber, maxRetries + 1, + Objects.firstNonNull(cause.getMessage(), "")), cause); + annotated.setStackTrace(cause.getStackTrace()); + return annotated; + } + + /** + * An {@link AssertionError} tagged with the try on which a retried test failed. + */ + static class RetriedAssertionError extends AssertionError { + private final int tryNumber; + private final int maxRetries; + + RetriedAssertionError(int tryNumber, int maxRetries, String message, Throwable cause) { + // We do a manual initCause here to be compatible with the Java 1.6 AssertionError + // constructors. 
+ super(message); + initCause(cause); + + this.tryNumber = tryNumber; + this.maxRetries = maxRetries; + } + + @VisibleForTesting + int getTryNumber() { + return tryNumber; + } + + @VisibleForTesting + int getMaxRetries() { + return maxRetries; + } + } + + private static Throwable annotate(final int tryNumber, final int maxRetries, AssertionError e) { + return annotate(tryNumber, maxRetries, e, "Failure", new ThrowableFactory() { + @Override public Throwable create(String message, Throwable cause) { + return new RetriedAssertionError(tryNumber, maxRetries, message, cause); + } + }); + } + + /** + * An {@link Exception} tagged with the try on which a retried test errored. + */ + static class RetriedException extends Exception { + private final int tryNumber; + private final int maxRetries; + + RetriedException(int tryNumber, int maxRetries, String message, Throwable cause) { + super(message, cause); + this.tryNumber = tryNumber; + this.maxRetries = maxRetries; + } + + @VisibleForTesting + int getTryNumber() { + return tryNumber; + } + + @VisibleForTesting + int getMaxRetries() { + return maxRetries; + } + } + + private static Throwable annotate(final int tryNumber, final int maxRetries, Exception e) { + return annotate(tryNumber, maxRetries, e, "Error", new ThrowableFactory() { + @Override public Throwable create(String message, Throwable cause) { + return new RetriedException(tryNumber, maxRetries, message, cause); + } + }); + } + + /** + * Wraps the statement of a method annotated with {@literal @Retry} so that it is evaluated up + * to {@code times + 1} times, stopping at the first {@link AssertionError} or + * {@link Exception}, which is rethrown annotated with its 1-based try number. Methods that are + * not annotated, or whose {@code times} is not positive, are returned unchanged. + */ + @Override + public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) { + Retry retry = method.getAnnotation(Retry.class); + if (retry == null || retry.times() <= 0) { + return statement; + } else { + final int times = retry.times(); + return new Statement() { + @Override public void evaluate() throws Throwable { + for (int i = 0; i <= times; i++) { + try { + statement.evaluate(); + } catch (AssertionError e) { + throw annotate(i + 1, times, e); + // We purposefully catch any non-assertion exceptions in order to tag the try count + // for erroring (as opposed to failing) tests. 
+ // SUPPRESS CHECKSTYLE RegexpSinglelineJava + } catch (Exception e) { + throw annotate(i + 1, times, e); + } + } + } + }; + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java b/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java new file mode 100644 index 0000000..cef57cc --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java @@ -0,0 +1,31 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.testing.mockito; + +import org.junit.Before; +import org.mockito.MockitoAnnotations; + +/** + * A base class for tests that use Mockito. Before each test, it initializes all the mocks + * declared in the class. + */ +public abstract class MockitoTest { + /** + * Initializes all fields annotated with {@link org.mockito.Mock}. 
+ */ + @Before + public final void initMockito() { + MockitoAnnotations.initMocks(this); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/Config.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/Config.java b/commons/src/main/java/org/apache/aurora/common/thrift/Config.java new file mode 100644 index 0000000..7ab122b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/Config.java @@ -0,0 +1,302 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.common.stats.StatsProvider; + +/** + * Represents the configuration for a thrift call. Use {@link #builder()} to create a new one + * or {@link #builder(Config)} to create a new config based on another config. + * + * <p>If a deadline is specified, it acts as a global timeout for each thrift call made. 
+ * Obtaining connections, performing the remote call and executing retries are all expected to + * complete within this deadline. When the specified deadline is not met, a + * {@link TTimeoutException} will be thrown. + * + * <p>If max retries is specified as zero (never retry), then the list of retryable exceptions is + * ignored. It is only when max retries is greater than zero that the list of retryable exceptions is + * used to determine if a particular failed call should be retried. + * + * @author John Sirois + */ +public class Config { + + /** + * Creates a builder for a new {@link Config}. Default values are as follows: + * <ul> + * <li>{@link #getRequestTimeout()} 0 + * <li>{@link #getConnectTimeout()} 5 seconds + * <li>{@link #getMaxRetries()} 0 + * <li>{@link #getRetryableExceptions()} [] + * <li>{@link #isDebug()} false + * <li>{@link #enableStats()} true + * </ul> + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a builder for a new {@link Config} whose initial values are copied from an existing + * config. + * + * @param config the builder configuration to use + */ + public static Builder builder(Config config) { + Preconditions.checkNotNull(config); + return new Builder(config); + } + + private static final Amount<Long,Time> DEADLINE_BLOCKING = Amount.of(0L, Time.MILLISECONDS); + + @VisibleForTesting + static final Amount<Long,Time> DEFAULT_CONNECT_TIMEOUT = Amount.of(5L, Time.SECONDS); + + private Amount<Long, Time> requestTimeout = DEADLINE_BLOCKING; + private Amount<Long, Time> connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int maxRetries; + private ImmutableSet<Class<? 
extends Exception>> retryableExceptions = ImmutableSet.of(); + private boolean debug = false; + private boolean enableStats = true; + private StatsProvider statsProvider = Stats.STATS_PROVIDER; + + private Config() { + // defaults + } + + /** + * Copy constructor backing {@link #builder(Config)}: every configurable field is carried over, + * so a derived config keeps the template's connect timeout and stats settings too. + */ + private Config(Config copyFrom) { + requestTimeout = copyFrom.requestTimeout; + connectTimeout = copyFrom.connectTimeout; + maxRetries = copyFrom.maxRetries; + retryableExceptions = copyFrom.retryableExceptions; + debug = copyFrom.debug; + enableStats = copyFrom.enableStats; + statsProvider = copyFrom.statsProvider; + } + + /** + * Returns the maximum time to wait for any thrift call to complete. A deadline of 0 means to + * wait forever + */ + public Amount<Long, Time> getRequestTimeout() { + return requestTimeout; + } + + /** + * Returns the maximum time to wait for a connection to be established. A deadline of 0 means to + * wait forever + */ + public Amount<Long, Time> getConnectTimeout() { + return connectTimeout; + } + + /** + * Returns the maximum number of retries to perform for each thrift call. A value of 0 means to + * never retry and in this case {@link #getRetryableExceptions()} will be an empty set. + */ + public int getMaxRetries() { + return maxRetries; + } + + /** + * Returns the set of exceptions to retry calls for. The returned set will only be empty if + * {@link #getMaxRetries()} is 0. + */ + public ImmutableSet<Class<? extends Exception>> getRetryableExceptions() { + return retryableExceptions; + } + + /** + * Returns {@code true} if the client should log extra debugging information. Currently this + * includes method call arguments when RPCs fail with exceptions. + */ + public boolean isDebug() { + return debug; + } + + /** + * Returns {@code true} if the client should track request statistics. + */ + public boolean enableStats() { + return enableStats; + } + + /** + * Returns the stats provider to use to record Thrift client stats. 
+ */ + public StatsProvider getStatsProvider() { + return statsProvider; + } + + /** + * Base for fluent {@link Config} builders. Each setter mutates the builder's single + * {@link Config} instance and returns {@link #getThis()}. + */ + // This was made public because it seems to be causing problems for scala users when it is not + // public. + public static abstract class AbstractBuilder<T extends AbstractBuilder> { + private final Config config; + + AbstractBuilder() { + this.config = new Config(); + } + + AbstractBuilder(Config template) { + Preconditions.checkNotNull(template); + this.config = new Config(template); + } + + /** + * Returns this builder as its concrete type {@code T}, so the fluent setters can be chained + * on the subclass. + */ + protected abstract T getThis(); + + // TODO(John Sirois): extra validation or design ... can currently do strange things like: + // builder.blocking().withRequestTimeout(Amount.of(1L, Time.MILLISECONDS)) + // builder.noRetries().retryOn(TException.class) + + /** + * Specifies that all calls be blocking calls with no inherent deadline. It may be the + * case that underlying transports will eventually deadline, but {@link Thrift} will not + * enforce a deadline. + */ + public final T blocking() { + config.requestTimeout = DEADLINE_BLOCKING; + return getThis(); + } + + /** + * Specifies that all calls be subject to a global timeout. This deadline includes all call + * activities, including obtaining a free connection and any automatic retries. + * + * @param timeout Request timeout; must not be null or negative. + * @return A reference to the builder. + */ + public final T withRequestTimeout(Amount<Long, Time> timeout) { + Preconditions.checkNotNull(timeout); + Preconditions.checkArgument(timeout.getValue() >= 0, + "A negative deadline is invalid: %s", timeout); + config.requestTimeout = timeout; + return getThis(); + } + + /** + * Assigns the timeout for all connections established with the blocking client. + * On an asynchronous client this timeout is only used for the connection pool lock + * acquisition on initial calls (not retries, @see withRetries). The actual network + * connection timeout for the asynchronous client is governed by socketTimeout. + * + * @param timeout Connection timeout. + * @return A reference to the builder. 
+ */ + public final T withConnectTimeout(Amount<Long, Time> timeout) { + Preconditions.checkNotNull(timeout); + Preconditions.checkArgument(timeout.getValue() >= 0, + "A negative deadline is invalid: %s", timeout); + config.connectTimeout = timeout; + return getThis(); + } + + /** + * Specifies that no calls be automatically retried. + */ + public final T noRetries() { + config.maxRetries = 0; + config.retryableExceptions = ImmutableSet.of(); + return getThis(); + } + + /** + * Specifies that failing calls meeting {@link #retryOn retry} criteria be retried up to a + * maximum of {@code retries} times before failing. On an asynchronous client, these retries + * will be forced to be non-blocking, failing fast if they cannot immediately acquire the + * connection pool locks, so they only provide a best-effort retry strategy there. + * + * @param retries Maximum number of retries; must not be negative. + * @return A reference to the builder. + * @throws IllegalArgumentException if {@code retries} is negative. + */ + public final T withRetries(int retries) { + // Preconditions only substitutes %s placeholders, so %s (not %d) is used for the count. + Preconditions.checkArgument(retries >= 0, "A negative retry count is invalid: %s", retries); + config.maxRetries = retries; + return getThis(); + } + + /** + * Specifies the set of exception classes that are to be considered retryable (if retries are + * enabled). Any exceptions thrown by the underlying thrift call will be considered retryable + * if they are an instance of any one of the specified exception classes. The set of exception + * classes must contain at least one exception class. To specify no retries either use + * {@link #noRetries()} or pass zero to {@link #withRetries(int)}. + */ + public final T retryOn(Iterable<? extends Class<? extends Exception>> retryableExceptions) { + Preconditions.checkNotNull(retryableExceptions); + ImmutableSet<Class<? 
extends Exception>> classes = + ImmutableSet.copyOf(Iterables.filter(retryableExceptions, Predicates.notNull())); + // Null entries were dropped above, so an iterable holding only nulls is rejected here too. + Preconditions.checkArgument(!classes.isEmpty(), + "Must provide at least one retryable exception class"); + config.retryableExceptions = classes; + return getThis(); + } + + /** + * Specifies a single exception class that is to be considered retryable (if retries are + * enabled), replacing any previously configured set. Any exceptions thrown by the underlying + * thrift call will be considered retryable if they are an instance of the specified exception + * class. To specify no retries either use + * {@link #noRetries()} or pass zero to {@link #withRetries(int)}. + */ + public final T retryOn(Class<? extends Exception> exception) { + Preconditions.checkNotNull(exception); + config.retryableExceptions = + ImmutableSet.<Class<? extends Exception>>builder().add(exception).build(); + return getThis(); + } + + /** + * When {@code debug == true}, specifies that extra debugging information should be logged. + */ + public final T withDebug(boolean debug) { + config.debug = debug; + return getThis(); + } + + /** + * Disables stats collection on the client (enabled by default). + */ + public T disableStats() { + config.enableStats = false; + return getThis(); + } + + /** + * Registers a custom stats provider to use to track various client stats. 
+ */ + public T withStatsProvider(StatsProvider statsProvider) { + config.statsProvider = Preconditions.checkNotNull(statsProvider); + return getThis(); + } + + protected final Config getConfig() { + return config; + } + } + + /** + * The concrete {@link Config} builder returned by {@link Config#builder()} and + * {@link Config#builder(Config)}. + */ + public static final class Builder extends AbstractBuilder<Builder> { + private Builder() { + super(); + } + + private Builder(Config template) { + super(template); + } + + @Override + protected Builder getThis() { + return this; + } + + /** + * Returns the {@link Config} held by this builder. This is the builder's own instance rather + * than a copy, so later calls on this builder are visible through the returned config. + */ + public Config create() { + return getConfig(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java b/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java new file mode 100644 index 0000000..54e2bd3 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift; + +import org.apache.thrift.TException; + +/** + * @author Adam Samet + * + * This exception is thrown when there are no available instances of a thrift backend + * service to serve requests. 
+ */ +public class TResourceExhaustedException extends TException { + + private static final long serialVersionUID = 1L; + + public TResourceExhaustedException(String message) { + super(message); + } + + public TResourceExhaustedException(Throwable cause) { + super(cause); + } + + public TResourceExhaustedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java b/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java new file mode 100644 index 0000000..068abea --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift; + +import org.apache.thrift.TException; + +/** + * @author Adam Samet + * + * This exception is thrown when accessing a thrift service resource times out. 
+ */ +public class TTimeoutException extends TException { + + private static final long serialVersionUID = 1L; + + public TTimeoutException(String message) { + super(message); + } + + public TTimeoutException(Throwable cause) { + super(cause); + } + + public TTimeoutException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java b/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java new file mode 100644 index 0000000..5e5df6d --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.net.pool.Connection; +import org.apache.aurora.common.net.pool.ConnectionPool; +import org.apache.thrift.transport.TTransport; + +import java.net.InetSocketAddress; + +/** + * A {@link ConnectionPool} compatible thrift connection that can work with any valid thrift + * transport. 
+ * + * @author John Sirois + */ +public class TTransportConnection implements Connection<TTransport, InetSocketAddress> { + + private final TTransport transport; + private final InetSocketAddress endpoint; + + public TTransportConnection(TTransport transport, InetSocketAddress endpoint) { + this.transport = Preconditions.checkNotNull(transport); + this.endpoint = Preconditions.checkNotNull(endpoint); + } + + /** + * Returns {@code true} if the underlying transport is still open. To invalidate a transport it + * should be closed. + * + * <p>TODO(John Sirois): it seems like an improper separation of concerns to have validity + * testing here and not also an invalidation method - correct or accept + */ + @Override + public boolean isValid() { + return transport.isOpen(); + } + + /** + * Returns the wrapped transport. + */ + @Override + public TTransport get() { + return transport; + } + + /** + * Closes the wrapped transport. + */ + @Override + public void close() { + transport.close(); + } + + @Override + public InetSocketAddress getEndpoint() { + return endpoint; + } + + /** + * Returns the string form of this connection's endpoint. + */ + @Override + public String toString() { + return endpoint.toString(); + } +}