http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Stats.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Stats.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Stats.java
new file mode 100644
index 0000000..2191f77
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Stats.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.MapMaker;
+import com.google.common.util.concurrent.AtomicDouble;
+
+import org.apache.aurora.common.base.MorePreconditions;
+
+/**
+ * Manages {@link Stat}s that should be exported for monitoring.
+ *
+ * Statistic names may only contain {@code [A-Za-z0-9_]},
+ * all other chars will be logged as a warning and replaced with underscore on 
export.
+ *
+ * @author John Sirois
+ */
+public class Stats {
+
+  private static final Logger LOG = Logger.getLogger(Stats.class.getName());
+  private static final Pattern NOT_NAME_CHAR = 
Pattern.compile("[^A-Za-z0-9_]");
+
+  private static final ConcurrentMap<String, Stat<?>> VAR_MAP = new 
MapMaker().makeMap();
+
+  // Store stats in the order they were registered, so that derived variables 
are
+  // sampled after their inputs.
+  private static final Collection<RecordingStat<? extends Number>> 
ORDERED_NUMERIC_STATS =
+      new ConcurrentLinkedQueue<RecordingStat<? extends Number>>();
+
+  private static final Cache<String, RecordingStat<? extends Number>> 
NUMERIC_STATS =
+      CacheBuilder.newBuilder().build();
+
+  public static String normalizeName(String name) {
+    return NOT_NAME_CHAR.matcher(name).replaceAll("_");
+  }
+
+  static String validateName(String name) {
+    String normalized = normalizeName(name);
+    if (!name.equals(normalized)) {
+      LOG.warning("Invalid stat name " + name + " exported as " + normalized);
+    }
+    return normalized;
+  }
+
+  /**
+   * A {@link StatsProvider} that exports gauge-style stats to the global 
{@link Stat}s repository
+   * for time series tracking.
+   */
+  public static final StatsProvider STATS_PROVIDER = new StatsProvider() {
+    private final StatsProvider untracked = new StatsProvider() {
+      @Override public AtomicLong makeCounter(String name) {
+        final AtomicLong longVar = new AtomicLong();
+        Stats.exportStatic(new StatImpl<Long>(name) {
+          @Override public Long read() {
+            return longVar.get();
+          }
+        });
+        return longVar;
+      }
+
+      @Override public <T extends Number> Stat<T> makeGauge(String name, final 
Supplier<T> gauge) {
+        return Stats.exportStatic(new StatImpl<T>(name) {
+          @Override public T read() {
+            return gauge.get();
+          }
+        });
+      }
+
+      @Override public StatsProvider untracked() {
+        return this;
+      }
+
+      @Override public RequestTimer makeRequestTimer(String name) {
+        // TODO(William Farner): Add support for this once a caller shows 
interest in using it.
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    @Override public <T extends Number> Stat<T> makeGauge(String name, final 
Supplier<T> gauge) {
+      return Stats.export(new StatImpl<T>(name) {
+        @Override public T read() {
+          return gauge.get();
+        }
+      });
+    }
+
+    @Override public AtomicLong makeCounter(String name) {
+      return Stats.exportLong(name);
+    }
+
+    @Override public StatsProvider untracked() {
+      return untracked;
+    }
+
+    @Override public RequestTimer makeRequestTimer(String name) {
+      return new RequestStats(name);
+    }
+  };
+
+  /**
+   * A {@link StatRegistry} that provides stats registered with the global 
{@link Stat}s repository.
+   */
+  public static final StatRegistry STAT_REGISTRY = new StatRegistry() {
+    @Override public Iterable<RecordingStat<? extends Number>> getStats() {
+      return Stats.getNumericVariables();
+    }
+  };
+
+  private static class ExportStat implements Callable<RecordingStat<? extends 
Number>> {
+    private final AtomicBoolean called = new AtomicBoolean(false);
+
+    private final RecordingStat<? extends Number> stat;
+    private final String name;
+
+    private <T extends Number> ExportStat(String name, Stat<T> stat) {
+      this.name = name;
+      this.stat = (stat instanceof RecordingStat)
+          ? (RecordingStat<? extends Number>) stat
+          : new RecordingStatImpl<T>(stat);
+    }
+
+    @Override
+    public RecordingStat<? extends Number> call() {
+      try {
+        exportStaticInternal(name, stat);
+        ORDERED_NUMERIC_STATS.add(stat);
+        return stat;
+      } finally {
+        called.set(true);
+      }
+    }
+  }
+
+  /**
+   * Exports a stat for tracking.
+   * if the stat provided implements the internal {@link RecordingStat} 
interface, it will be
+   * registered for time series collection and returned.  If a {@link 
RecordingStat} with the same
+   * name as the provided stat has already been exported, the 
previously-exported stat will be
+   * returned and no additional registration will be performed.
+   *
+   * @param var The variable to export.
+   * @param <T> The value exported by the variable.
+   * @return A reference to the stat that was stored.  The stat returned may 
not be equal to the
+   *    stat provided.  If a variable was already returned with the same
+   */
+  public static <T extends Number> Stat<T> export(Stat<T> var) {
+    String validatedName = 
validateName(MorePreconditions.checkNotBlank(var.getName()));
+    ExportStat exportStat = new ExportStat(validatedName, var);
+    try {
+      @SuppressWarnings("unchecked")
+      Stat<T> exported = (Stat<T>) NUMERIC_STATS.get(validatedName, 
exportStat);
+      return exported;
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "Unexpected error exporting stat " + validatedName, e.getCause());
+    } finally {
+      if (!exportStat.called.get()) {
+        LOG.warning("Re-using already registered variable for key " + 
validatedName);
+      }
+    }
+  }
+
+  /**
+   * Exports a string stat.
+   * String-based statistics will not be registered for time series collection.
+   *
+   * @param var Stat to export.
+   * @return A reference back to {@code var}, or the variable that was already 
registered under the
+   *    same name as {@code var}.
+   */
+  public static Stat<String> exportString(Stat<String> var) {
+    return exportStatic(var);
+  }
+
+  /**
+   * Adds a collection of stats for export.
+   *
+   * @param vars The variables to add.
+   */
+  public static void exportAll(Iterable<Stat<? extends Number>> vars) {
+    for (Stat<? extends Number> var : vars) {
+      export(var);
+    }
+  }
+
+  /**
+   * Exports an {@link AtomicInteger}, which will be included in time series 
tracking.
+   *
+   * @param name The name to export the stat with.
+   * @param intVar The variable to export.
+   * @return A reference to the {@link AtomicInteger} provided.
+   */
+  public static AtomicInteger export(final String name, final AtomicInteger 
intVar) {
+    export(new SampledStat<Integer>(name, 0) {
+      @Override public Integer doSample() { return intVar.get(); }
+    });
+
+    return intVar;
+  }
+
+  /**
+   * Creates and exports an {@link AtomicInteger}.
+   *
+   * @param name The name to export the stat with.
+   * @return A reference to the {@link AtomicInteger} created.
+   */
+  public static AtomicInteger exportInt(String name) {
+    return exportInt(name, 0);
+  }
+
+  /**
+   * Creates and exports an {@link AtomicInteger} with initial value.
+   *
+   * @param name The name to export the stat with.
+   * @param initialValue The initial stat value.
+   * @return A reference to the {@link AtomicInteger} created.
+   */
+  public static AtomicInteger exportInt(String name, int initialValue) {
+    return export(name, new AtomicInteger(initialValue));
+  }
+
+  /**
+   * Exports an {@link AtomicLong}, which will be included in time series 
tracking.
+   *
+   * @param name The name to export the stat with.
+   * @param longVar The variable to export.
+   * @return A reference to the {@link AtomicLong} provided.
+   */
+  public static AtomicLong export(String name, final AtomicLong longVar) {
+    export(new StatImpl<Long>(name) {
+      @Override public Long read() { return longVar.get(); }
+    });
+
+    return longVar;
+  }
+
+  /**
+   * Creates and exports an {@link AtomicLong}.
+   *
+   * @param name The name to export the stat with.
+   * @return A reference to the {@link AtomicLong} created.
+   */
+  public static AtomicLong exportLong(String name) {
+    return exportLong(name, 0L);
+  }
+
+  /**
+   * Creates and exports an {@link AtomicLong} with initial value.
+   *
+   * @param name The name to export the stat with.
+   * @param initialValue The initial stat value.
+   * @return A reference to the {@link AtomicLong} created.
+   */
+  public static AtomicLong exportLong(String name, long initialValue) {
+    return export(name, new AtomicLong(initialValue));
+  }
+
+  /**
+   * Exports an {@link AtomicDouble}, which will be included in time series 
tracking.
+   *
+   * @param name The name to export the stat with.
+   * @param doubleVar The variable to export.
+   * @return A reference to the {@link AtomicDouble} provided.
+   */
+  public static AtomicDouble export(String name, final AtomicDouble doubleVar) 
{
+    export(new StatImpl<Double>(name) {
+      @Override public Double read() { return doubleVar.doubleValue(); }
+    });
+
+    return doubleVar;
+  }
+
+  /**
+   * Creates and exports an {@link AtomicDouble}.
+   *
+   * @param name The name to export the stat with.
+   * @return A reference to the {@link AtomicDouble} created.
+   */
+  public static AtomicDouble exportDouble(String name) {
+    return exportDouble(name, 0.0);
+  }
+
+  /**
+   * Creates and exports an {@link AtomicDouble} with initial value.
+   *
+   * @param name The name to export the stat with.
+   * @param initialValue The initial stat value.
+   * @return A reference to the {@link AtomicDouble} created.
+   */
+  public static AtomicDouble exportDouble(String name, double initialValue) {
+    return export(name, new AtomicDouble(initialValue));
+  }
+
+  /**
+   * Exports a metric that tracks the size of a collection.
+   *
+   * @param name Name of the stat to export.
+   * @param collection Collection whose size should be tracked.
+   */
+  public static void exportSize(String name, final Collection<?> collection) {
+    export(new StatImpl<Integer>(name) {
+      @Override public Integer read() {
+        return collection.size();
+      }
+    });
+  }
+
+  /**
+   * Exports a metric that tracks the size of a map.
+   *
+   * @param name Name of the stat to export.
+   * @param map Map whose size should be tracked.
+   */
+  public static void exportSize(String name, final Map<?, ?> map) {
+    export(new StatImpl<Integer>(name) {
+      @Override public Integer read() {
+        return map.size();
+      }
+    });
+  }
+
+  /**
+   * Exports a metric that tracks the size of a cache.
+   *
+   * @param name Name of the stat to export.
+   * @param cache Cache whose size should be tracked.
+   */
+  public static void exportSize(String name, final Cache<?, ?> cache) {
+    export(new StatImpl<Long>(name) {
+      @Override public Long read() {
+        return cache.size();
+      }
+    });
+  }
+
+  /**
+   * Exports a 'static' statistic, which will not be registered for time 
series tracking.
+   *
+   * @param var Variable to statically export.
+   * @return A reference back to the provided {@link Stat}.
+   */
+  public static <T> Stat<T> exportStatic(Stat<T> var) {
+    String validatedName = 
validateName(MorePreconditions.checkNotBlank(var.getName()));
+    exportStaticInternal(validatedName, var);
+    return var;
+  }
+
+  private static void exportStaticInternal(String name, Stat<?> stat) {
+    if (VAR_MAP.put(name, stat) != null) {
+      LOG.warning("Warning - exported variable collision on " + name);
+    }
+  }
+
+  /**
+   * Fetches all registered stat.
+   *
+   * @return An iterable of all registered stats.
+   */
+  public static Iterable<Stat<?>> getVariables() {
+    return ImmutableList.copyOf(VAR_MAP.values());
+  }
+
+  static Iterable<RecordingStat<? extends Number>> getNumericVariables() {
+    return ImmutableList.copyOf(ORDERED_NUMERIC_STATS);
+  }
+
+  @VisibleForTesting
+  public static void flush() {
+    VAR_MAP.clear();
+    ORDERED_NUMERIC_STATS.clear();
+    NUMERIC_STATS.invalidateAll();
+  }
+
+  public static <T> Stat<T> getVariable(String name) {
+    MorePreconditions.checkNotBlank(name);
+    @SuppressWarnings("unchecked")
+    Stat<T> stat = (Stat<T>) VAR_MAP.get(name);
+    return stat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java 
b/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java
new file mode 100644
index 0000000..cb1c56b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/StatsProvider.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Supplier;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A minimal interface to a Stats repository.
+ *
+ * @author John Sirois
+ */
+public interface StatsProvider {
+
+  /**
+   * Creates and exports a counter for tracking.
+   *
+   * @param name The name to export the stat with.
+   * @return A reference to the counter that will be tracked for incrementing.
+   */
+  AtomicLong makeCounter(String name);
+
+  /**
+   * Exports a read-only value for tracking.
+   *
+   * @param name The name of the variable to export.
+   * @param gauge The supplier of the instantaneous values to export.
+   * @param <T> The type of number exported by the variable.
+   * @return A reference to the stat that was stored.
+   */
+  <T extends Number> Stat<T> makeGauge(String name, Supplier<T> gauge);
+
+  /**
+   * Gets a stats provider that does not track stats in an internal time 
series repository.
+   * The stored variables will only be available as instantaneous values.
+   *
+   * @return A stats provider that creates untracked stats.
+   */
+  StatsProvider untracked();
+
+  /**
+   * A stat for tracking service requests.
+   */
+  interface RequestTimer {
+
+    /**
+     * Accumulates a request and its latency.
+     *
+     * @param latencyMicros The elapsed time required to complete the request.
+     */
+    void requestComplete(long latencyMicros);
+
+    /**
+     * Accumulates the error counter and the request counter.
+     */
+    void incErrors();
+
+    /**
+     * Accumulates the reconnect counter.
+     */
+    void incReconnects();
+
+    /**
+     * Accumulates the timeout counter.
+     */
+    void incTimeouts();
+  }
+
+  /**
+   * Creates and exports a sets of stats that allows for typical rROC request 
tracking.
+   *
+   * @param name The name to export the stat with.
+   * @return A reference to the request timer that can be used to track RPCs.
+   */
+  RequestTimer makeRequestTimer(String name);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java 
b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java
new file mode 100644
index 0000000..45f604c
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Calendar;
+
+/**
+ * A time series of values.
+ *
+ * @author William Farner
+ */
+public interface TimeSeries {
+
+  /**
+   * A name describing this time series.
+   *
+   * @return The name of this time series data.
+   */
+  public String getName();
+
+  /**
+   * A series of numbers representing regular samples of a variable.
+   *
+   * @return The time series of sample values.
+   */
+  public Iterable<Number> getSamples();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java
 
b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java
new file mode 100644
index 0000000..6928e48
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepository.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Set;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+
+/**
+ * A repository for time series data.
+ *
+ * @author William Farner
+ */
+public interface TimeSeriesRepository {
+
+  /**
+   * Starts the time series sampler.
+   *
+   * @param shutdownRegistry An action registry that the repository can use to 
register a shutdown
+   *    for the sampler.
+   */
+  public void start(ShutdownRegistry shutdownRegistry);
+
+  /**
+   * Fetches the names of all available time series.
+   *
+   * @return Available time series, which can then be obtained by calling 
{@link #get(String)}.
+   */
+  public Set<String> getAvailableSeries();
+
+  /**
+   * Fetches a time series by name.
+   *
+   * @param name The name of the time series to fetch.
+   * @return The time series registered with the given name, or {@code null} 
if no such time series
+   *     has been registered.
+   */
+  public TimeSeries get(String name);
+
+  /**
+   * Gets an ordered iterable of the timestamps that all timeseries were 
sampled at.
+   *
+   * @return All current timestamps.
+   */
+  public Iterable<Number> getTimestamps();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java
 
b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java
new file mode 100644
index 0000000..387e379
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/TimeSeriesRepositoryImpl.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.collections.BoundedQueue;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A simple in-memory repository for exported variables.
+ *
+ * @author John Sirois
+ */
+public class TimeSeriesRepositoryImpl implements TimeSeriesRepository {
+
+  private static final Logger LOG = 
Logger.getLogger(TimeSeriesRepositoryImpl.class.getName());
+
+  /**
+   * {@literal @Named} binding key for the sampling period.
+   */
+  public static final String SAMPLE_PERIOD =
+      "com.twitter.common.stats.TimeSeriesRepositoryImpl.SAMPLE_PERIOD";
+
+  /**
+   * {@literal @Named} binding key for the maximum number of retained samples.
+   */
+  public static final String SAMPLE_RETENTION_PERIOD =
+      
"com.twitter.common.stats.TimeSeriesRepositoryImpl.SAMPLE_RETENTION_PERIOD";
+
+  private final SlidingStats scrapeDuration = new 
SlidingStats("variable_scrape", "micros");
+
+  // We store TimeSeriesImpl, which allows us to add samples.
+  private final LoadingCache<String, TimeSeriesImpl> timeSeries;
+  private final BoundedQueue<Number> timestamps;
+
+  private final StatRegistry statRegistry;
+  private final Amount<Long, Time> samplePeriod;
+  private final int retainedSampleLimit;
+
+  @Inject
+  public TimeSeriesRepositoryImpl(
+      StatRegistry statRegistry,
+      @Named(SAMPLE_PERIOD) Amount<Long, Time> samplePeriod,
+      @Named(SAMPLE_RETENTION_PERIOD) final Amount<Long, Time> 
retentionPeriod) {
+    this.statRegistry = checkNotNull(statRegistry);
+    this.samplePeriod = checkNotNull(samplePeriod);
+    Preconditions.checkArgument(samplePeriod.getValue() > 0, "Sample period 
must be positive.");
+    checkNotNull(retentionPeriod);
+    Preconditions.checkArgument(retentionPeriod.getValue() > 0,
+        "Sample retention period must be positive.");
+
+    retainedSampleLimit = (int) (retentionPeriod.as(Time.SECONDS) / 
samplePeriod.as(Time.SECONDS));
+    Preconditions.checkArgument(retainedSampleLimit > 0,
+        "Sample retention period must be greater than sample period.");
+
+    timeSeries = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, TimeSeriesImpl>() {
+          @Override public TimeSeriesImpl load(final String name) {
+            TimeSeriesImpl timeSeries = new TimeSeriesImpl(name);
+
+            // Backfill so we have data for pre-accumulated timestamps.
+            int numTimestamps = timestamps.size();
+            if (numTimestamps != 0) {
+              for (int i = 1; i < numTimestamps; i++) {
+                timeSeries.addSample(0L);
+              }
+            }
+
+            return timeSeries;
+          }
+        });
+
+    timestamps = new BoundedQueue<Number>(retainedSampleLimit);
+  }
+
+  /**
+   * Starts the variable sampler, which will fetch variables {@link Stats} on 
the given period.
+   *
+   */
+  @Override
+  public void start(ShutdownRegistry shutdownRegistry) {
+    checkNotNull(shutdownRegistry);
+    checkNotNull(samplePeriod);
+    Preconditions.checkArgument(samplePeriod.getValue() > 0);
+
+    final ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(1 /* One thread. */,
+        new 
ThreadFactoryBuilder().setNameFormat("VariableSampler-%d").setDaemon(true).build());
+
+    final AtomicBoolean shouldSample = new AtomicBoolean(true);
+    final Runnable sampler = new Runnable() {
+      @Override public void run() {
+        if (shouldSample.get()) {
+          try {
+            runSampler(Clock.SYSTEM_CLOCK);
+          } catch (Exception e) {
+            LOG.log(Level.SEVERE, "ignoring runSampler failure", e);
+          }
+        }
+      }
+    };
+
+    executor.scheduleAtFixedRate(sampler, samplePeriod.getValue(), 
samplePeriod.getValue(),
+        samplePeriod.getUnit().getTimeUnit());
+    shutdownRegistry.addAction(new Command() {
+      @Override
+      public void execute() throws RuntimeException {
+        shouldSample.set(false);
+        executor.shutdown();
+        LOG.info("Variable sampler shut down");
+      }
+    });
+  }
+
+  @VisibleForTesting
+  synchronized void runSampler(Clock clock) {
+    timestamps.add(clock.nowMillis());
+
+    long startNanos = clock.nowNanos();
+    for (RecordingStat<? extends Number> var : statRegistry.getStats()) {
+      timeSeries.getUnchecked(var.getName()).addSample(var.sample());
+    }
+    scrapeDuration.accumulate(
+        Amount.of(clock.nowNanos() - startNanos, 
Time.NANOSECONDS).as(Time.MICROSECONDS));
+  }
+
+  @Override
+  public synchronized Set<String> getAvailableSeries() {
+    return ImmutableSet.copyOf(timeSeries.asMap().keySet());
+  }
+
+  @Override
+  public synchronized TimeSeries get(String name) {
+    if (!timeSeries.asMap().containsKey(name)) return null;
+    return timeSeries.getUnchecked(name);
+  }
+
+  @Override
+  public synchronized Iterable<Number> getTimestamps() {
+    return Iterables.unmodifiableIterable(timestamps);
+  }
+
+  private class TimeSeriesImpl implements TimeSeries {
+    private final String name;
+    private final BoundedQueue<Number> samples;
+
+    TimeSeriesImpl(String name) {
+      this.name = name;
+      samples = new BoundedQueue<Number>(retainedSampleLimit);
+    }
+
+    @Override public String getName() {
+      return name;
+    }
+
+    void addSample(Number value) {
+      samples.add(value);
+    }
+
+    @Override public Iterable<Number> getSamples() {
+      return Iterables.unmodifiableIterable(samples);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java 
b/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java
new file mode 100644
index 0000000..12ab468
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/stats/Windowed.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import java.lang.reflect.Array;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+/**
+ * Windowed is an abstraction that let you span a class across a sliding 
window.
+ * It creates a ring buffer of T and reuse the buffer after clearing it or use 
a new one (via
+ * the {@code clearer} function).
+ *
+ * <pre>
+ *          tenured instances
+ *  ++++++++++++++++++++++++++++++++++
+ * [----A-----][-----B----][-----C----][-----D----]
+ *                                      ++++++++++
+ *                                    current instance
+ * </pre>
+ *
+ * The schema above represents the valid instances over time
+ * (A,B,C) are the tenured ones
+ * D is the current instance
+ */
+public abstract class Windowed<T> {
+  private Class<T> clazz;
+  protected final T[] buffers;
+  private final long sliceDuration;
+  private final Clock clock;
+  private long index = -1L;
+  private Function<T, T> clearer;
+
+  /**
+   * @param clazz the type of the underlying element T
+   * @param window the length of the window
+   * @param slices the number of slices (the window will be divided into 
{@code slices} slices)
+   * @param sliceProvider the supplier of element
+   * @param clearer the function that clear (or re-create) an element
+   * @param clock  the clock used for to select the appropriate histogram
+   */
+  public Windowed(Class<T> clazz, Amount<Long, Time> window, int slices,
+      Supplier<T> sliceProvider, Function<T, T> clearer, Clock clock) {
+    Preconditions.checkNotNull(window);
+    // Ensure that we have at least 1ms per slice
+    Preconditions.checkArgument(window.as(Time.MILLISECONDS) > (slices + 1));
+    Preconditions.checkArgument(window.as(Time.MILLISECONDS) > (slices + 1));
+    Preconditions.checkArgument(0 < slices);
+    Preconditions.checkNotNull(sliceProvider);
+    Preconditions.checkNotNull(clock);
+
+    this.clazz = clazz;
+    this.sliceDuration = window.as(Time.MILLISECONDS) / slices;
+    @SuppressWarnings("unchecked") // safe because we have the clazz proof of 
type H
+    T[] bufs = (T[]) Array.newInstance(clazz, slices + 1);
+    for (int i = 0; i < bufs.length; i++) {
+      bufs[i] = sliceProvider.get();
+    }
+    this.buffers = bufs;
+    this.clearer = clearer;
+    this.clock = clock;
+  }
+
+  /**
+   * Return the index of the latest Histogram.
+   * You have to modulo it with buffer.length before accessing the array with 
this number.
+   */
+  protected int getCurrentIndex() {
+    long now = clock.nowMillis();
+    return (int) (now / sliceDuration);
+  }
+
+  /**
+   * Check for expired elements and return the current one.
+   */
+  protected T getCurrent() {
+    sync(getCurrentIndex());
+    return buffers[(int) (index % buffers.length)];
+  }
+
+  /**
+   * Check for expired elements and return all the tenured (old) ones.
+   */
+  protected T[] getTenured() {
+    long currentIndex = getCurrentIndex();
+    sync(currentIndex);
+    @SuppressWarnings("unchecked") // safe because we have the clazz proof of 
type T
+    T[] tmp = (T[]) Array.newInstance(clazz, buffers.length - 1);
+    for (int i = 0; i < tmp.length; i++) {
+      int idx = (int) ((currentIndex + 1 + i) % buffers.length);
+      tmp[i] = buffers[idx];
+    }
+    return tmp;
+  }
+
+  /**
+   * Clear all the elements.
+   */
+  public void clear() {
+    for (int i = 0; i <= buffers.length; i++) {
+      buffers[i] = clearer.apply(buffers[i]);
+    }
+  }
+
+  /**
+   * Synchronize elements with a point in time.
+   * i.e. Check for expired ones and clear them, and update the index variable.
+   */
+  protected void sync(long currentIndex) {
+    if (index < currentIndex) {
+      long from = Math.max(index + 1, currentIndex - buffers.length + 1);
+      for (long i = from; i <= currentIndex; i++) {
+        int idx = (int) (i % buffers.length);
+        buffers[idx] = clearer.apply(buffers[idx]);
+      }
+      index = currentIndex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java
 
b/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java
new file mode 100644
index 0000000..6461a2e
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/WindowedApproxHistogram.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+/**
+ * WindowedApproxHistogram is an implementation of WindowedHistogram with an
+ * ApproximateHistogram as the underlying storing histogram.
+ */
+public class WindowedApproxHistogram extends 
WindowedHistogram<ApproximateHistogram> {
+  @VisibleForTesting public static final int DEFAULT_SLICES = 3;
+  @VisibleForTesting public static final Amount<Long, Time> DEFAULT_WINDOW =
+      Amount.of(1L, Time.MINUTES);
+  @VisibleForTesting public static final Amount<Long, Data> DEFAULT_MAX_MEMORY 
= Amount.of(
+      (DEFAULT_SLICES + 1) * 
ApproximateHistogram.DEFAULT_MAX_MEMORY.as(Data.BYTES), Data.BYTES);
+
+  /**
+   * Create a {@code WindowedApproxHistogram } with a window duration of 
{@code window} and
+   * decomposed in {@code slices} Histograms. Those Histograms will 
individually take less than
+   * {@code maxMemory / (slices + 1)}. The clock will be used to find the 
correct index in the
+   * ring buffer.
+   *
+   * @param window duration of the window
+   * @param slices number of slices in the window
+   * @param maxMemory maximum memory used by the whole histogram
+   */
+  public WindowedApproxHistogram(Amount<Long, Time> window, final int slices,
+      final Amount<Long, Data> maxMemory, Clock clock) {
+    super(ApproximateHistogram.class, window, slices,
+        new Supplier<ApproximateHistogram>() {
+          private Amount<Long, Data> perHistogramMemory = Amount.of(
+              maxMemory.as(Data.BYTES) / (slices + 1), Data.BYTES);
+          @Override
+          public ApproximateHistogram get() {
+            return new ApproximateHistogram(perHistogramMemory);
+          }
+        },
+        new Function<ApproximateHistogram[], Histogram>() {
+          @Override
+          public Histogram apply(ApproximateHistogram[] histograms) {
+            return ApproximateHistogram.merge(histograms);
+          }
+        }, clock);
+  }
+
+  /**
+   * Create a {@code WindowedApproxHistogram } with a window duration of 
{@code window} and
+   * decomposed in {@code slices} Histograms. Those Histograms will 
individually have a
+   * precision of {@code precision / (slices + 1)}. The ticker will be used to 
measure elapsed
+   * time in the WindowedHistogram.
+   *
+   * @param window duration of the window
+   * @param slices number of slices in the window
+   * @param precision precision of the whole histogram
+   */
+  public WindowedApproxHistogram(Amount<Long, Time> window, final int slices,
+      final Precision precision, Clock clock) {
+    super(ApproximateHistogram.class, window, slices,
+        new Supplier<ApproximateHistogram>() {
+          private Precision perHistogramPrecision = new Precision(
+              precision.getEpsilon(), precision.getN() / (slices + 1));
+          @Override
+          public ApproximateHistogram get() {
+            return new ApproximateHistogram(perHistogramPrecision);
+          }
+        },
+        new Function<ApproximateHistogram[], Histogram>() {
+          @Override
+          public Histogram apply(ApproximateHistogram[] histograms) {
+            return ApproximateHistogram.merge(histograms);
+          }
+        }, clock);
+  }
+
+  /**
+   * Equivalent to calling
+   * {@link #WindowedApproxHistogram(Amount, int, Amount, Clock)}
+   * with the System clock.
+   */
+  public WindowedApproxHistogram(Amount<Long, Time> window, int slices,
+      Amount<Long, Data> maxMemory) {
+    this(window, slices, maxMemory, Clock.SYSTEM_CLOCK);
+  }
+
+  /**
+   * Equivalent to calling
+   * {@link #WindowedApproxHistogram(Amount, int, Amount)}
+   * with default window and slices.
+   */
+  public WindowedApproxHistogram(Amount<Long, Data> maxMemory) {
+    this(DEFAULT_WINDOW, DEFAULT_SLICES, maxMemory);
+  }
+
+  /**
+   * Equivalent to calling
+   * {@link #WindowedApproxHistogram(Amount, int, Precision, Clock)}
+   * with the System clock.
+   */
+  public WindowedApproxHistogram(Amount<Long, Time> window, int slices, 
Precision precision) {
+    this(window, slices, precision, Clock.SYSTEM_CLOCK);
+  }
+
+  /**
+   * Equivalent to calling
+   * {@link #WindowedApproxHistogram(Amount, int, Precision)}
+   * with default window and slices.
+   */
+  public WindowedApproxHistogram(Precision precision) {
+    this(DEFAULT_WINDOW, DEFAULT_SLICES, precision);
+  }
+
+  /**
+   * Equivalent to calling
+   * {@link #WindowedApproxHistogram(Amount, int, Amount, Clock)}
+   * with the default maxMemory parameter and System clock.
+   */
+  public WindowedApproxHistogram(Amount<Long, Time> window, int slices) {
+    this(window, slices, DEFAULT_MAX_MEMORY, Clock.SYSTEM_CLOCK);
+  }
+
+  /**
+   * WindowedApproxHistogram constructor with default values.
+   */
+  public WindowedApproxHistogram() {
+    this(DEFAULT_WINDOW, DEFAULT_SLICES, DEFAULT_MAX_MEMORY, 
Clock.SYSTEM_CLOCK);
+  }
+
+  /**
+   * WindowedApproxHistogram constructor with custom Clock (for testing 
purposes only).
+   */
+  @VisibleForTesting public WindowedApproxHistogram(Clock clock) {
+    this(DEFAULT_WINDOW, DEFAULT_SLICES, DEFAULT_MAX_MEMORY, clock);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java 
b/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java
new file mode 100644
index 0000000..23e2f4f
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/WindowedHistogram.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+/**
+ * Histogram windowed over time.
+ * <p>
+ * This histogram is composed of a series of ({@code slices} + 1) histograms 
representing a window
+ * of {@code range} duration. We only update the latest one, and we query the 
oldest ones (i.e. all
+ * histograms except the head).
+ * </p>
+ * <pre>
+ *      range
+ * <------------->
+ * [AAA][BBB][CCC][DDD]   here slices = 3
+ * --------------------->
+ *                t1  t2
+ *
+ *  For t in [t1,t2) we:
+ *  insert elements in DDD
+ *  query quantile over [AAA][BBB][CCC]
+ * </pre>
+ * <p>
+ * When {@code t} is in {@code [t1, t2)} we insert value into the latest 
histogram (DDD here),
+ * when we query the histogram, we 'merge' all other histograms (all except 
the latest) and query
+ * it. when {@code t > t2} the oldest histogram become the newest (like in a 
ring buffer) and
+ * so on ...
+ * </p>
+ * <p>
+ * Note: We use MergedHistogram class to represent a merged histogram without 
actually
+ * merging the underlying histograms.
+ * </p>
+ */
+public class WindowedHistogram<H extends Histogram> extends Windowed<H> 
implements Histogram {
+
+  private long mergedHistIndex = -1L;
+  private Function<H[], Histogram> merger;
+  private Histogram mergedHistogram = null;
+
+  /**
+   * Create a WindowedHistogram of {@code slices + 1} elements over a time 
{@code window}.
+   * This code is independent from the implementation of Histogram, you just 
need to provide
+   * a {@code Supplier<H>} to create the histograms and a {@code Function<H[], 
Histogram>} to
+   * merge them.
+   *
+   * @param clazz the type of the underlying Histogram H
+   * @param window the length of the window
+   * @param slices the number of slices (the window will be divided into 
{@code slices} slices)
+   * @param sliceProvider the supplier of histogram
+   * @param merger the function that merge an array of histogram H[] into a 
single Histogram
+   * @param clock the clock used for to select the appropriate histogram
+   */
+  public WindowedHistogram(Class<H> clazz, Amount<Long, Time> window, int 
slices,
+        Supplier<H> sliceProvider, Function<H[], Histogram> merger, Clock 
clock) {
+    super(clazz, window, slices, sliceProvider, new Function<H, H>() {
+      @Override
+      public H apply(H h) { h.clear(); return h; }
+    }, clock);
+    Preconditions.checkNotNull(merger);
+
+    this.merger = merger;
+  }
+
+  @Override
+  public synchronized void add(long x) {
+    getCurrent().add(x);
+  }
+
+  @Override
+  public synchronized void clear() {
+    for (Histogram h: buffers) {
+      h.clear();
+    }
+  }
+
+  @Override
+  public synchronized long getQuantile(double quantile) {
+    long currentIndex = getCurrentIndex();
+    if (mergedHistIndex < currentIndex) {
+      H[] tmp = getTenured();
+      mergedHistogram = merger.apply(tmp);
+      mergedHistIndex = currentIndex;
+    }
+    return mergedHistogram.getQuantile(quantile);
+  }
+
+  @Override
+  public synchronized long[] getQuantiles(double[] quantiles) {
+    return Histograms.extractQuantiles(this, quantiles);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java 
b/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java
new file mode 100644
index 0000000..ded3faf
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/WindowedStatistics.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Function;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+/**
+ * Keep track of statistics over a set of value in a sliding window.
+ * WARNING: The computation of the statistics needs to be explicitly requested 
with
+ * {@code refresh()} before reading any statistics.
+ *
+ * @see Windowed class for more details about how the window is parametrized.
+ */
+public class WindowedStatistics extends Windowed<Statistics> implements 
StatisticsInterface {
+  private int lastIndex = -1;
+  private double variance = 0.0;
+  private double mean = 0.0;
+  private long sum = 0L;
+  private long min = Long.MAX_VALUE;
+  private long max = Long.MIN_VALUE;
+  private long populationSize = 0L;
+
+  public WindowedStatistics(Amount<Long, Time> window, int slices, Clock 
clock) {
+    super(Statistics.class, window, slices,
+        new Supplier<Statistics>() {
+          @Override public Statistics get() { return new Statistics(); }
+        },
+        new Function<Statistics, Statistics>() {
+          @Override public Statistics apply(Statistics s) { s.clear(); return 
s; }
+        },
+        clock);
+  }
+
+  /**
+   * Construct a Statistics sliced over time in {@code slices + 1} windows.
+   * The {@code window} parameter represents the total window, that will be 
sliced into
+   * {@code slices + 1} parts.
+   *
+   * Ex: WindowedStatistics(Amount.of(1L, Time.MINUTES), 3) will be sliced 
like this:
+   * <pre>
+   *        20s         20s         20s         20s
+   *   [----A-----][-----B----][-----C----][-----D----]
+   * </pre>
+   * The current window is 'D' (the one you insert elements into) and the 
tenured windows
+   * are 'A', 'B', 'C' (the ones you read elements from).
+   */
+  public WindowedStatistics(Amount<Long, Time> window, int slices) {
+    this(window, slices, Clock.SYSTEM_CLOCK);
+  }
+
+  /**
+   * Equivalent to calling {@link #WindowedStatistics(Amount, int)} with a 1 
minute window
+   * and 3 slices.
+   */
+  public WindowedStatistics() {
+    this(Amount.of(1L, Time.MINUTES), 3, Clock.SYSTEM_CLOCK);
+  }
+
+  public void accumulate(long value) {
+    getCurrent().accumulate(value);
+  }
+
+  /**
+   * Compute all the statistics in one pass.
+   */
+  public void refresh() {
+    int currentIndex = getCurrentIndex();
+    if (lastIndex != currentIndex) {
+      lastIndex = currentIndex;
+      double x = 0.0;
+      variance = 0.0;
+      mean = 0.0;
+      sum = 0L;
+      populationSize = 0L;
+      min = Long.MAX_VALUE;
+      max = Long.MIN_VALUE;
+      for (Statistics s : getTenured()) {
+        if (s.populationSize() == 0) {
+          continue;
+        }
+        x += s.populationSize() * (s.variance() + s.mean() * s.mean());
+        sum += s.sum();
+        populationSize += s.populationSize();
+        min = Math.min(min, s.min());
+        max = Math.max(max, s.max());
+      }
+      if (populationSize != 0) {
+        mean = ((double) sum) / populationSize;
+        variance = x / populationSize - mean * mean;
+      }
+    }
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the variance of the aggregated windows
+   */
+  public double variance() {
+    return variance;
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the standard deviation of the aggregated windows
+   */
+  public double standardDeviation() {
+    return Math.sqrt(variance());
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the mean of the aggregated windows
+   */
+  public double mean() {
+    return mean;
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the sum of the aggregated windows
+   */
+  public long sum() {
+    return sum;
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the min of the aggregated windows
+   */
+  public long min() {
+    return min;
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the max of the aggregated windows
+   */
+  public long max() {
+    return max;
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the range of the aggregated windows
+   */
+  public long range() {
+    return max - min;
+  }
+
+  /**
+   * WARNING: You need to call refresh() to recompute the variance
+   * @return the population size of the aggregated windows
+   */
+  public long populationSize() {
+    return populationSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java
 
b/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java
new file mode 100644
index 0000000..36b1174
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/stats/testing/RealHistogram.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.stats.testing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.aurora.common.stats.Histogram;
+import org.apache.aurora.common.stats.Histograms;
+
+public class RealHistogram implements Histogram {
+  private final List<Long> buffer = new ArrayList<Long>();
+
+  @Override public void add(long x) {
+    buffer.add(x);
+  }
+
+  @Override public void clear() {
+    buffer.clear();
+  }
+
+  @Override public long getQuantile(double quantile) {
+    Collections.sort(buffer);
+    return buffer.get((int) (quantile * buffer.size()));
+  }
+
+  @Override public long[] getQuantiles(double[] quantiles) {
+    return Histograms.extractQuantiles(this, quantiles);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java 
b/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java
new file mode 100644
index 0000000..02db075
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/testing/TearDownRegistry.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.testing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.TearDownAccepter;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.base.ExceptionalCommand;
+
+/**
+ * An action registry suitable for use as a shutdownRegistry in tests that 
extend
+ * {@link com.google.common.testing.junit4.TearDownTestCase}.
+ *
+ * @author John Sirois
+ */
+public class TearDownRegistry implements ShutdownRegistry {
+  private final TearDownAccepter tearDownAccepter;
+
+  /**
+   * Creates a new tear down registry that delegates execution of shutdown 
actions to a
+   * {@code tearDownAccepter}.
+   *
+   * @param tearDownAccepter A tear down accepter that will be used to 
register shutdown actions
+   *     with.
+   */
+  public TearDownRegistry(TearDownAccepter tearDownAccepter) {
+    this.tearDownAccepter = Preconditions.checkNotNull(tearDownAccepter);
+  }
+
+  @Override
+  public <E extends Exception, T extends ExceptionalCommand<E>> void 
addAction(final T action) {
+    tearDownAccepter.addTearDown(new TearDown() {
+      @Override public void tearDown() throws Exception {
+        action.execute();
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
 
b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
new file mode 100644
index 0000000..92ba0c3
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.testing.easymock;
+
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.WildcardType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.reflect.TypeToken;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+
+import static org.easymock.EasyMock.createControl;
+
+/**
+ * A baseclass for tests that use EasyMock.  A new {@link IMocksControl 
control} is set up before
+ * each test and the mocks created and replayed with it are verified during 
tear down.
+ *
+ * @author John Sirois
+ */
+public abstract class EasyMockTest extends TearDownTestCase {
+  protected IMocksControl control;
+
+  /**
+   * Creates an EasyMock {@link #control} for tests to use that will be 
automatically
+   * {@link IMocksControl#verify() verified} on tear down.
+   */
+  @Before
+  public final void setupEasyMock() {
+    control = createControl();
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        control.verify();
+      }
+    });
+  }
+
+  /**
+   * Creates an EasyMock mock with this test's control.  Will be
+   * {@link IMocksControl#verify() verified} in a tear down.
+   */
+  public <T> T createMock(Class<T> type) {
+    Preconditions.checkNotNull(type);
+    return control.createMock(type);
+  }
+
+  /**
+   * A class meant to be sub-classed in order to capture a generic type 
literal value.  To capture
+   * the type of a {@code List<String>} you would use: {@code new 
Clazz<List<String>>() {}}
+   */
+  public abstract static class Clazz<T> extends TypeToken {
+    Class<T> rawType() {
+      @SuppressWarnings("unchecked")
+      Class<T> rawType = (Class<T>) findRawType();
+      return rawType;
+    }
+
+    private Class<?> findRawType() {
+      if (getType() instanceof Class<?>) { // Plain old
+        return (Class<?>) getType();
+
+      } else if (getType() instanceof ParameterizedType) { // Nested type 
parameter
+        ParameterizedType parametrizedType = (ParameterizedType) getType();
+        Type rawType = parametrizedType.getRawType();
+        return (Class<?>) rawType;
+      } else if (getType() instanceof GenericArrayType) {
+        throw new IllegalStateException("cannot mock arrays, rejecting type: " 
+ getType());
+      } else if (getType() instanceof WildcardType) {
+        throw new IllegalStateException(
+            "wildcarded instantiations are not allowed in java, rejecting 
type: " + getType());
+      } else {
+        throw new IllegalArgumentException("Could not decode raw type for: " + 
getType());
+      }
+    }
+
+    public T createMock() {
+      return EasyMock.createMock(rawType());
+    }
+
+    public T createMock(IMocksControl control) {
+      return control.createMock(rawType());
+    }
+  }
+
+  /**
+   * Creates an EasyMock mock with this test's control.  Will be
+   * {@link IMocksControl#verify() verified} in a tear down.
+   *
+   * Allows for mocking of parameterized types without all the unchecked 
conversion warnings in a
+   * safe way.
+   */
+  public <T> T createMock(Clazz<T> type) {
+    Preconditions.checkNotNull(type);
+    return type.createMock(control);
+  }
+
+  /**
+   * A type-inferring convenience method for creating new captures.
+   */
+  public static <T> Capture<T> createCapture() {
+    return new Capture<T>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java
 
b/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java
new file mode 100644
index 0000000..bcd0a15
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/testing/easymock/IterableEquals.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.testing.easymock;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multiset;
+
+import org.easymock.IArgumentMatcher;
+
+import static org.easymock.EasyMock.reportMatcher;
+
+/**
+ * This EasyMock argument matcher tests Iterables for equality irrespective of 
order.
+ *
+ * @param <T> type argument for the Iterables being matched.
+ */
+public class IterableEquals<T> implements IArgumentMatcher {
+  private final Multiset<T> elements = HashMultiset.create();
+
+  /**
+   * Constructs an IterableEquals object that tests for equality against the 
specified expected
+   * Iterable.
+   *
+   * @param expected an Iterable containing the elements that are expected, in 
any order.
+   */
+  public IterableEquals(Iterable<T> expected) {
+    Iterables.addAll(elements, expected);
+  }
+
+  @Override
+  public boolean matches(Object observed) {
+    if (observed instanceof Iterable<?>) {
+      Multiset<Object> observedElements = HashMultiset.create((Iterable<?>) 
observed);
+      return elements.equals(observedElements);
+    }
+    return false;
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("eqIterable(").append(elements).append(")");
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches an Iterable having the 
same elements in any
+   * order.
+   *
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> Iterable<T> eqIterable(Iterable<T> in) {
+    reportMatcher(new IterableEquals(in));
+    return null;
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches a List having the same 
elements in any order.
+   *
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> List<T> eqList(Iterable<T> in) {
+    reportMatcher(new IterableEquals(in));
+    return null;
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches a Collection having the 
same elements in any
+   * order.
+   *
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> Collection<T> eqCollection(Iterable<T> in) {
+    reportMatcher(new IterableEquals(in));
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java 
b/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java
new file mode 100644
index 0000000..3b97118
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/testing/junit/rules/Retry.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.testing.junit.rules;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
+import org.junit.rules.MethodRule;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+
+/**
+ * A test method annotation useful for smoking out flaky behavior in tests.
+ *
+ * @see Retry.Rule RetryRule needed to enable this annotation in a test class.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Retry {
+
+  /**
+   * The number of times to retry the test.
+   *
+   * When a {@link Retry.Rule} is installed and a test method is annotated for 
{@literal @Retry},
+   * it will be retried 0 to N times.  If times is negative, it is treated as 
0 and no retries are
+   * performed.  If times is &gt;= 1 then a successful execution of the 
annotated test method is
+   * retried until the 1st error, failure or otherwise up to {@code times} 
times.
+   */
+  int times() default 1;
+
+  /**
+   * Enables {@link Retry @Retry}able tests.
+   */
+  class Rule implements MethodRule {
+    private interface ThrowableFactory {
+      Throwable create(String message, Throwable cause);
+    }
+
+    private static Throwable annotate(
+        int tryNumber,
+        final int maxRetries,
+        Throwable cause,
+        String prefix,
+        ThrowableFactory throwableFactory) {
+
+      Throwable annotated =
+          throwableFactory.create(
+              String.format("%s on try %d of %d: %s", prefix, tryNumber, 
maxRetries + 1,
+                  Objects.firstNonNull(cause.getMessage(), "")), cause);
+      annotated.setStackTrace(cause.getStackTrace());
+      return annotated;
+    }
+
+    static class RetriedAssertionError extends AssertionError {
+      private final int tryNumber;
+      private final int maxRetries;
+
+      RetriedAssertionError(int tryNumber, int maxRetries, String message, 
Throwable cause) {
+        // We do a manual initCause here to be compatible with the Java 1.6 
AssertionError
+        // constructors.
+        super(message);
+        initCause(cause);
+
+        this.tryNumber = tryNumber;
+        this.maxRetries = maxRetries;
+      }
+
+      @VisibleForTesting
+      int getTryNumber() {
+        return tryNumber;
+      }
+
+      @VisibleForTesting
+      int getMaxRetries() {
+        return maxRetries;
+      }
+    }
+
+    private static Throwable annotate(final int tryNumber, final int 
maxRetries, AssertionError e) {
+      return annotate(tryNumber, maxRetries, e, "Failure", new 
ThrowableFactory() {
+        @Override public Throwable create(String message, Throwable cause) {
+          return new RetriedAssertionError(tryNumber, maxRetries, message, 
cause);
+        }
+      });
+    }
+
+    static class RetriedException extends Exception {
+      private final int tryNumber;
+      private final int maxRetries;
+
+      RetriedException(int tryNumber, int maxRetries, String message, 
Throwable cause) {
+        super(message, cause);
+        this.tryNumber = tryNumber;
+        this.maxRetries = maxRetries;
+      }
+
+      @VisibleForTesting
+      int getTryNumber() {
+        return tryNumber;
+      }
+
+      @VisibleForTesting
+      int getMaxRetries() {
+        return maxRetries;
+      }
+    }
+
+    private static Throwable annotate(final int tryNumber, final int 
maxRetries, Exception e) {
+      return annotate(tryNumber, maxRetries, e, "Error", new 
ThrowableFactory() {
+        @Override public Throwable create(String message, Throwable cause) {
+          return new RetriedException(tryNumber, maxRetries, message, cause);
+        }
+      });
+    }
+
+    @Override
+    public Statement apply(final Statement statement, FrameworkMethod method, 
Object receiver) {
+      Retry retry = method.getAnnotation(Retry.class);
+      if (retry == null || retry.times() <= 0) {
+        return statement;
+      } else {
+        final int times = retry.times();
+        return new Statement() {
+          @Override public void evaluate() throws Throwable {
+            for (int i = 0; i <= times; i++) {
+              try {
+                statement.evaluate();
+              } catch (AssertionError e) {
+                throw annotate(i + 1, times, e);
+              // We purposefully catch any non-assertion exceptions in order 
to tag the try count
+              // for erroring (as opposed to failing) tests.
+              // SUPPRESS CHECKSTYLE RegexpSinglelineJava
+              } catch (Exception e) {
+                throw annotate(i + 1, times, e);
+              }
+            }
+          }
+        };
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java
 
b/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java
new file mode 100644
index 0000000..cef57cc
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/testing/mockito/MockitoTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.testing.mockito;
+
+import org.junit.Before;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * A base class for tests that use Mockito. Before each test, it initializes 
all the mocks
+ * declared in the class.
+ */
+public abstract class MockitoTest  {
+  /**
+   * Initializes all fields annotated with {@link org.mockito.Mock}.
+   */
+  @Before
+  public final void initMockito() {
+    MockitoAnnotations.initMocks(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/Config.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/Config.java 
b/commons/src/main/java/org/apache/aurora/common/thrift/Config.java
new file mode 100644
index 0000000..7ab122b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/thrift/Config.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.stats.StatsProvider;
+
+/**
+ * Represents the configuration for a thrift call.  Use {@link #builder()} to 
create a new one or
+ * or {@link #builder(Config)} to create a new config based on another config.
+ *
+ * <p>If a deadline is specified, it acts as a global timeout for each thrift 
call made.
+ * Obtaining connections, performing the remote call and executing retries are 
all expected to
+ * complete within this deadline.  When the specified deadline is not met, an
+ * {@link TTimeoutException} will be thrown.
+ *
+ * <p>If max retries is specified as zero (never retry), then the list of 
retryable exceptions are
+ * ignored.  It is only when max retries is greater than zero that list of 
retryable exceptions is
+ * used to determine if a particular failed call should be retried.
+ *
+ * @author John Sirois
+ */
+public class Config {
+
+  /**
+   * Created a builder for a new {@link Config}.  Default values are as 
follows:
+   * <ul>
+   * <li>{@link #getRequestTimeout()} 0
+   * <li>{@link #getMaxRetries()} 0
+   * <li>{@link #getRetryableExceptions()} []
+   * <li>{@link #isDebug()} ()} false
+   * </ul>
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   *
+   * @param config the builder configuration to use
+   */
+  public static Builder builder(Config config) {
+    Preconditions.checkNotNull(config);
+    return new Builder(config);
+  }
+
+  private static final Amount<Long,Time> DEADLINE_BLOCKING = Amount.of(0L, 
Time.MILLISECONDS);
+
+  @VisibleForTesting
+  static final Amount<Long,Time> DEFAULT_CONNECT_TIMEOUT = Amount.of(5L, 
Time.SECONDS);
+
+  private Amount<Long, Time> requestTimeout = DEADLINE_BLOCKING;
+  private Amount<Long, Time> connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+  private int maxRetries;
+  private ImmutableSet<Class<? extends Exception>> retryableExceptions = 
ImmutableSet.of();
+  private boolean debug = false;
+  private boolean enableStats = true;
+  private StatsProvider statsProvider = Stats.STATS_PROVIDER;
+
+  private Config() {
+    // defaults
+  }
+
+  private Config(Config copyFrom) {
+    requestTimeout = copyFrom.requestTimeout;
+    maxRetries = copyFrom.maxRetries;
+    retryableExceptions = copyFrom.retryableExceptions;
+    debug = copyFrom.debug;
+    statsProvider = copyFrom.statsProvider;
+  }
+
+  /**
+   * Returns the maximum time to wait for any thrift call to complete.  A 
deadline of 0 means to
+   * wait forever
+   */
+  public Amount<Long, Time> getRequestTimeout() {
+    return requestTimeout;
+  }
+
+  /**
+   * Returns the maximum time to wait for a connection to be established.  A 
deadline of 0 means to
+   * wait forever
+   */
+  public Amount<Long, Time> getConnectTimeout() {
+    return connectTimeout;
+  }
+
+  /**
+   * Returns the maximum number of retries to perform for each thrift call.  A 
value of 0 means to
+   * never retry and in this case {@link #getRetryableExceptions()} will be an 
empty set.
+   */
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  /**
+   * Returns the set of exceptions to retry calls for.  The returned set will 
only be empty if
+   * {@link #getMaxRetries()} is 0.
+   */
+  public ImmutableSet<Class<? extends Exception>> getRetryableExceptions() {
+    return retryableExceptions;
+  }
+
+  /**
+   * Returns {@code true} if the client should log extra debugging 
information.  Currently this
+   * includes method call arguments when RPCs fail with exceptions.
+   */
+  public boolean isDebug() {
+    return debug;
+  }
+
+  /**
+   * Returns {@code true} if the client should track request statistics.
+   */
+  public boolean enableStats() {
+    return enableStats;
+  }
+
+  /**
+   * Returns the stats provider to use to record Thrift client stats.
+   */
+  public StatsProvider getStatsProvider() {
+    return statsProvider;
+  }
+
+  // This was made public because it seems to be causing problems for scala 
users when it is not
+  // public.
+  public static abstract class AbstractBuilder<T extends AbstractBuilder> {
+    private final Config config;
+
+    AbstractBuilder() {
+      this.config = new Config();
+    }
+
+    AbstractBuilder(Config template) {
+      Preconditions.checkNotNull(template);
+      this.config = new Config(template);
+    }
+
+    protected abstract T getThis();
+
+    // TODO(John Sirois): extra validation or design ... can currently do 
strange things like:
+    // builder.blocking().withDeadline(1, TimeUnit.MILLISECONDS)
+    // builder.noRetries().retryOn(TException.class)
+
+    /**
+     * Specifies that all calls be blocking calls with no inherent deadline.  
It may be the
+     * case that underlying transports will eventually deadline, but {@link 
Thrift} will not
+     * enforce a deadline.
+     */
+    public final T blocking() {
+      config.requestTimeout = DEADLINE_BLOCKING;
+      return getThis();
+    }
+
+    /**
+     * Specifies that all calls be subject to a global timeout.  This deadline 
includes all call
+     * activities, including obtaining a free connection and any automatic 
retries.
+     */
+    public final T withRequestTimeout(Amount<Long, Time> timeout) {
+      Preconditions.checkNotNull(timeout);
+      Preconditions.checkArgument(timeout.getValue() >= 0,
+          "A negative deadline is invalid: %s", timeout);
+      config.requestTimeout = timeout;
+      return getThis();
+    }
+
+    /**
+     * Assigns the timeout for all connections established with the blocking 
client.
+     * On an asynchronous client this timeout is only used for the connection 
pool lock
+     * acquisition on initial calls (not retries, @see withRetries).  The 
actual network
+     * connection timeout for the asynchronous client is governed by 
socketTimeout.
+     *
+     * @param timeout Connection timeout.
+     * @return A reference to the builder.
+     */
+    public final T withConnectTimeout(Amount<Long, Time> timeout) {
+      Preconditions.checkNotNull(timeout);
+      Preconditions.checkArgument(timeout.getValue() >= 0,
+          "A negative deadline is invalid: %s", timeout);
+      config.connectTimeout = timeout;
+      return getThis();
+    }
+
+    /**
+     * Specifies that no calls be automatically retried.
+     */
+    public final T noRetries() {
+      config.maxRetries = 0;
+      config.retryableExceptions = ImmutableSet.of();
+      return getThis();
+    }
+
+    /**
+     * Specifies that failing calls meeting {@link #retryOn retry} criteria be 
retried up to a
+     * maximum of {@code retries} times before failing.  On an asynchronous 
client, these retries
+     * will be forced to be non-blocking, failing fast if they cannot 
immediately acquire the
+     * connection pool locks, so they only provide a best-effort retry 
strategy there.
+     */
+    public final T withRetries(int retries) {
+      Preconditions.checkArgument(retries >= 0, "A negative retry count is 
invalid: %d", retries);
+      config.maxRetries = retries;
+      return getThis();
+    }
+
+    /**
+     * Specifies the set of exception classes that are to be considered 
retryable (if retries are
+     * enabled).  Any exceptions thrown by the underlying thrift call will be 
considered retryable
+     * if they are an instance of any one of the specified exception classes.  
The set of exception
+     * classes must contain at least exception class.  To specify no retries 
either use
+     * {@link #noRetries()} or pass zero to {@link #withRetries(int)}.
+     */
+    public final T retryOn(Iterable<? extends Class<? extends Exception>> 
retryableExceptions) {
+      Preconditions.checkNotNull(retryableExceptions);
+      ImmutableSet<Class<? extends Exception>> classes =
+          ImmutableSet.copyOf(Iterables.filter(retryableExceptions, 
Predicates.notNull()));
+      Preconditions.checkArgument(!classes.isEmpty(),
+          "Must provide at least one retryable exception class");
+      config.retryableExceptions = classes;
+      return getThis();
+    }
+
+    /**
+     * Specifies the set of exception classes that are to be considered 
retryable (if retries are
+     * enabled).  Any exceptions thrown by the underlying thrift call will be 
considered retryable
+     * if they are an instance of any one of the specified exception classes.  
The set of exception
+     * classes must contain at least exception class.  To specify no retries 
either use
+     * {@link #noRetries()} or pass zero to {@link #withRetries(int)}.
+     */
+    public final T retryOn(Class<? extends Exception> exception) {
+      Preconditions.checkNotNull(exception);
+      config.retryableExceptions =
+          ImmutableSet.<Class<? extends 
Exception>>builder().add(exception).build();
+      return getThis();
+    }
+
+    /**
+     * When {@code debug == true}, specifies that extra debugging information 
should be logged.
+     */
+    public final T withDebug(boolean debug) {
+      config.debug = debug;
+      return getThis();
+    }
+
+    /**
+     * Disables stats collection on the client (enabled by default).
+     */
+    public T disableStats() {
+      config.enableStats = false;
+      return getThis();
+    }
+
+    /**
+     * Registers a custom stats provider to use to track various client stats.
+     */
+    public T withStatsProvider(StatsProvider statsProvider) {
+      config.statsProvider = Preconditions.checkNotNull(statsProvider);
+      return getThis();
+    }
+
+    protected final Config getConfig() {
+      return config;
+    }
+  }
+
+  public static final class Builder extends AbstractBuilder<Builder> {
+    private Builder() {
+      super();
+    }
+
+    private Builder(Config template) {
+      super(template);
+    }
+
+    @Override
+    protected Builder getThis() {
+      return this;
+    }
+
+    public Config create() {
+      return getConfig();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java
 
b/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java
new file mode 100644
index 0000000..54e2bd3
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/thrift/TResourceExhaustedException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.thrift;
+
+import org.apache.thrift.TException;
+
+/**
+ * @author Adam Samet
+ *
+ * This is exception is thrown when there are no available instances of a 
thrift backend
+ * service to serve requests.
+ */
+public class TResourceExhaustedException extends TException {
+
+  private static final long serialVersionUID = 1L;
+
+  public TResourceExhaustedException(String message) {
+    super(message);
+  }
+
+  public TResourceExhaustedException(Throwable cause) {
+    super(cause);
+  }
+
+  public TResourceExhaustedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java 
b/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java
new file mode 100644
index 0000000..068abea
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/thrift/TTimeoutException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.thrift;
+
+import org.apache.thrift.TException;
+
+/**
+ * @author Adam Samet
+ *
+ * This is exception is thrown when accessing a thrift service resource times 
out.
+ */
+public class TTimeoutException extends TException {
+
+  private static final long serialVersionUID = 1L;
+
+  public TTimeoutException(String message) {
+    super(message);
+  }
+
+  public TTimeoutException(Throwable cause) {
+    super(cause);
+  }
+
+  public TTimeoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
 
b/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
new file mode 100644
index 0000000..5e5df6d
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.aurora.common.net.pool.Connection;
+import org.apache.aurora.common.net.pool.ConnectionPool;
+import org.apache.thrift.transport.TTransport;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link ConnectionPool} compatible thrift connection that can work with 
any valid thrift
+ * transport.
+ *
+ * @author John Sirois
+ */
+public class TTransportConnection implements Connection<TTransport, 
InetSocketAddress> {
+
+  private final TTransport transport;
+  private final InetSocketAddress endpoint;
+
+  public TTransportConnection(TTransport transport, InetSocketAddress 
endpoint) {
+    this.transport = Preconditions.checkNotNull(transport);
+    this.endpoint = Preconditions.checkNotNull(endpoint);
+  }
+
+  /**
+   * Returns {@code true} if the underlying transport is still open.  To 
invalidate a transport it
+   * should be closed.
+   *
+   * <p>TODO(John Sirois): it seems like an improper soc to have validity 
testing here and not also an
+   * invalidation method - correct or accept
+   */
+  @Override
+  public boolean isValid() {
+    return transport.isOpen();
+  }
+
+  @Override
+  public TTransport get() {
+    return transport;
+  }
+
+  @Override
+  public void close() {
+    transport.close();
+  }
+
+  @Override
+  public InetSocketAddress getEndpoint() {
+    return endpoint;
+  }
+
+  @Override
+  public String toString() {
+    return endpoint.toString();
+  }
+}

Reply via email to