http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java
 
b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java
new file mode 100644
index 0000000..e87fe2c
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.http.handlers;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Splitter;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.net.MediaType;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.collections.Iterables2;
+import org.apache.aurora.common.stats.TimeSeries;
+import org.apache.aurora.common.stats.TimeSeriesRepository;
+
+/**
+ * A servlet that provides time series data in JSON format.
+ *
+ * <p>Without a {@code metrics} request parameter the response is the list of available series.
+ * With it, the parameter is read as a comma-separated list of series names and the response
+ * holds the samples of those series zipped with their timestamps.  The optional {@code since}
+ * parameter restricts the rows to those whose timestamp is strictly greater than its value.
+ */
+public class TimeSeriesDataSource extends HttpServlet {
+
+  @VisibleForTesting static final String TIME_METRIC = "time";
+
+  private static final String METRICS = "metrics";
+  private static final String SINCE = "since";
+
+  private final TimeSeriesRepository timeSeriesRepo;
+  private final Gson gson = new Gson();
+
+  /**
+   * Creates a servlet that serves data from the given repository.
+   *
+   * @param timeSeriesRepo Repository to read series and timestamps from, must not be null.
+   */
+  @Inject
+  public TimeSeriesDataSource(TimeSeriesRepository timeSeriesRepo) {
+    this.timeSeriesRepo = Preconditions.checkNotNull(timeSeriesRepo);
+  }
+
+  /**
+   * Builds the JSON response body for a query.
+   *
+   * @param metricsQuery Comma-separated names of the series to return, or {@code null} to list
+   *     the available series instead.
+   * @param sinceQuery Only rows with a timestamp strictly greater than this value are returned;
+   *     {@code null} is treated as {@code 0}.
+   * @return The JSON-encoded response.
+   * @throws MetricException If a requested series is unknown or {@code sinceQuery} is not a
+   *     valid long.  The exception message is a JSON object with an {@code error} field.
+   */
+  @VisibleForTesting
+  String getResponse(
+      @Nullable String metricsQuery,
+      @Nullable String sinceQuery) throws MetricException {
+
+    if (metricsQuery == null) {
+      // Return metric listing.
+      return gson.toJson(ImmutableList.copyOf(timeSeriesRepo.getAvailableSeries()));
+    }
+
+    // The first column is always the timestamps; requested series follow in request order.
+    List<Iterable<Number>> tsData = Lists.newArrayList();
+    tsData.add(timeSeriesRepo.getTimestamps());
+    // Ignore requests for "time" since it is implicitly returned.
+    Iterable<String> names = Iterables.filter(
+        Splitter.on(",").split(metricsQuery),
+        Predicates.not(Predicates.equalTo(TIME_METRIC)));
+    for (String metric : names) {
+      TimeSeries series = timeSeriesRepo.get(metric);
+      if (series == null) {
+        throw new MetricException(errorJson("Unknown metric " + metric));
+      }
+      tsData.add(series.getSamples());
+    }
+
+    final long since = parseSince(sinceQuery);
+    Predicate<List<Number>> sinceFilter = new Predicate<List<Number>>() {
+      @Override public boolean apply(List<Number> next) {
+        // Element 0 of every zipped row is the timestamp column added above.
+        return next.get(0).longValue() > since;
+      }
+    };
+
+    ResponseStruct response = new ResponseStruct(
+        ImmutableList.<String>builder().add(TIME_METRIC).addAll(names).build(),
+        FluentIterable.from(Iterables2.zip(tsData, 0)).filter(sinceFilter).toList());
+    return gson.toJson(response);
+  }
+
+  /**
+   * Parses the {@code since} parameter, treating a missing value as {@code 0}.
+   *
+   * <p>A malformed value is reported as a {@link MetricException} so that it is answered like
+   * any other bad request instead of escaping as an unchecked {@link NumberFormatException}.
+   */
+  private long parseSince(@Nullable String sinceQuery) throws MetricException {
+    try {
+      return Long.parseLong(Optional.fromNullable(sinceQuery).or("0"));
+    } catch (NumberFormatException e) {
+      throw new MetricException(errorJson("Invalid since value " + sinceQuery), e);
+    }
+  }
+
+  /** Encodes an error message as a JSON object with a single {@code error} field. */
+  private String errorJson(String message) {
+    JsonObject response = new JsonObject();
+    response.addProperty("error", message);
+    return gson.toJson(response);
+  }
+
+  /**
+   * Writes the JSON response for the request's {@code metrics} and {@code since} parameters.
+   * A {@link MetricException} is answered with status 400 and the exception message as body.
+   */
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+
+    resp.setContentType(MediaType.JSON_UTF_8.toString());
+    PrintWriter out = resp.getWriter();
+    try {
+      out.write(getResponse(req.getParameter(METRICS), req.getParameter(SINCE)));
+    } catch (MetricException e) {
+      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+      out.write(e.getMessage());
+    }
+  }
+
+  /** The shape of a successful data response: column names plus one list per row. */
+  @VisibleForTesting
+  static class ResponseStruct {
+    // Fields must be non-final for deserialization.
+    List<String> names;
+    List<List<Number>> data;
+
+    ResponseStruct(List<String> names, List<List<Number>> data) {
+      this.names = names;
+      this.data = data;
+    }
+  }
+
+  /** Signals a bad query; the message is the JSON body to send back to the client. */
+  @VisibleForTesting
+  static class MetricException extends Exception {
+    MetricException(String message) {
+      super(message);
+    }
+
+    MetricException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java
 
b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java
new file mode 100644
index 0000000..bf04525
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.http.handlers;
+
+import java.util.Collections;
+import java.util.List;
+
+import javax.servlet.http.HttpServletRequest;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.stats.Stat;
+
+/**
+ * HTTP handler that prints all registered variables and their current values.
+ *
+ * <p>Each stat is rendered as its name and current value separated by a single space, and the
+ * rendered lines are returned in sorted order.
+ *
+ * @author William Farner
+ */
+public class VarsHandler extends TextResponseHandler {
+
+  // Declared against Stat<?> rather than the raw Stat type so its use is fully type-checked.
+  private static final Function<Stat<?>, String> VAR_PRINTER = new Function<Stat<?>, String>() {
+    @Override public String apply(Stat<?> stat) {
+      return stat.getName() + " " + stat.read();
+    }
+  };
+
+  private final Supplier<Iterable<Stat<?>>> statSupplier;
+
+  /**
+   * Creates a new handler that will report stats from the provided supplier.
+   *
+   * @param statSupplier Stats supplier, must not be null.
+   */
+  @Inject
+  public VarsHandler(Supplier<Iterable<Stat<?>>> statSupplier) {
+    this.statSupplier = Preconditions.checkNotNull(statSupplier);
+  }
+
+  /**
+   * Renders every supplied stat as a "name value" line.
+   *
+   * @param request The incoming request; it is not consulted.
+   * @return The rendered lines, sorted by their natural string order.
+   */
+  @Override
+  public Iterable<String> getLines(HttpServletRequest request) {
+    // Copy into a mutable list first: the transformed view cannot be sorted in place.
+    List<String> lines = Lists.newArrayList(Iterables.transform(statSupplier.get(), VAR_PRINTER));
+    Collections.sort(lines);
+    return lines;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java
 
b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java
new file mode 100644
index 0000000..e97ec60
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.http.handlers;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.stats.Stat;
+
+/**
+ * A servlet that returns the current value of all variables in JSON format.
+ * The response is a single JSON object whose field names are the variable names and whose
+ * values keep their types:
+ * <pre>
+ *   {
+ *     "var_a": 1,
+ *     "var_b": 126.0,
+ *     "var_c": "a string value"
+ *   }
+ * </pre>
+ * If the optional URL parameter 'pretty' is used, the output will be pretty-printed
+ * (similar to the above example).
+ *
+ * @author William Farner
+ */
+public class VarsJsonHandler extends HttpServlet {
+
+  private final Supplier<Iterable<Stat<?>>> statSupplier;
+
+  /**
+   * Creates a new handler that will report stats from the provided supplier.
+   *
+   * @param statSupplier Stats supplier, must not be null.
+   */
+  @Inject
+  public VarsJsonHandler(Supplier<Iterable<Stat<?>>> statSupplier) {
+    this.statSupplier = Preconditions.checkNotNull(statSupplier);
+  }
+
+  /**
+   * Serializes the current value of every supplied stat into one JSON object.
+   *
+   * @param pretty Whether the JSON should be pretty-printed.
+   * @return The JSON document.
+   */
+  @VisibleForTesting
+  String getBody(boolean pretty) {
+    // A linked map keeps the fields in the order in which the supplier yields the stats.
+    Map<String, Object> valuesByName = Maps.newLinkedHashMap();
+    for (Stat<?> stat : statSupplier.get()) {
+      valuesByName.put(stat.getName(), stat.read());
+    }
+    Gson serializer = getGson(pretty);
+    return serializer.toJson(valuesByName);
+  }
+
+  /**
+   * Answers with status 200 and the JSON body, pretty-printed when the request carries a
+   * 'pretty' parameter (its value is irrelevant).  The response writer is always closed.
+   */
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+
+    resp.setContentType("application/json");
+    resp.setStatus(HttpServletResponse.SC_OK);
+    PrintWriter writer = resp.getWriter();
+    try {
+      boolean pretty = req.getParameter("pretty") != null;
+      writer.print(getBody(pretty));
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Returns a pretty-printing serializer when requested, a default one otherwise. */
+  private Gson getGson(boolean pretty) {
+    if (pretty) {
+      return new GsonBuilder().setPrettyPrinting().create();
+    }
+    return new Gson();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java
new file mode 100644
index 0000000..e0beb25
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+
+/**
+ * A load balancer that attempts to direct load towards a backend that has the fewest leased
+ * connections.
+ *
+ * <p>Backends are kept in a sorted set ordered by active connection count, then by total use
+ * count, then by an id assigned when the backends were last offered; {@link #nextBackend()}
+ * returns the first entry of that set.
+ *
+ * @author William Farner
+ */
+public class LeastConnectedStrategy<S> extends StaticLoadBalancingStrategy<S> {
+  private static final Logger LOG = Logger.getLogger(LeastConnectedStrategy.class.getName());
+
+  // Maps from backends to the number of connections made to them.
+  private final Map<S, ConnectionStats> connections = Maps.newHashMap();
+
+  // Manages sorting of connection counts, with a reference back to the backend.
+  private final SortedSet<ConnectionStats> connectionStats = Sets.newTreeSet();
+
+  /**
+   * Encapsulates a set of connection stats that allow connections to be sorted as per the least
+   * connected strategy.
+   *
+   * <p>The counters are mutable and take part in the ordering, so an instance is removed from
+   * the sorted set before its counters change and is added back afterwards.
+   */
+  private class ConnectionStats implements Comparable<ConnectionStats> {
+    final S connectionKey;
+    final int connectionId;
+    int activeCount = 0; // Stores the total number of active connections.
+    long useCount = 0;  // Stores the total number times a connection has been used.
+
+    ConnectionStats(S connectionKey, int connectionId) {
+      this.connectionKey = connectionKey;
+      this.connectionId = connectionId;
+    }
+
+    @Override
+    public int compareTo(ConnectionStats other) {
+      // Explicit comparisons are used instead of subtraction so that the sign of the result
+      // can never be flipped by arithmetic overflow.
+
+      // Sort by number of active connections first.
+      if (activeCount != other.activeCount) {
+        return activeCount < other.activeCount ? -1 : 1;
+      }
+
+      // Sub-sort by total number of times a connection has been used (this will ensure that
+      // all backends are exercised).
+      if (useCount != other.useCount) {
+        return useCount < other.useCount ? -1 : 1;
+      }
+
+      // If the above two are equal, break the tie using the connection id.
+      if (connectionId != other.connectionId) {
+        return connectionId < other.connectionId ? -1 : 1;
+      }
+      return 0;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      // We use ConnectionStats in a sorted container and so we need to have an equals
+      // implementation consistent with compareTo, ie:
+      // (x.compareTo(y) == 0) == x.equals(y)
+      // We accomplish this directly, after rejecting null and foreign types as the equals
+      // contract requires (a blind cast would throw instead of returning false).
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      @SuppressWarnings("unchecked")
+      ConnectionStats other = (ConnectionStats) o;
+      return compareTo(other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      // Built from exactly the fields compareTo (and therefore equals) looks at, so that equal
+      // instances always share a hash code.
+      int result = activeCount;
+      result = 31 * result + (int) (useCount ^ (useCount >>> 32));
+      result = 31 * result + connectionId;
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%d-%d", activeCount, useCount);
+    }
+  }
+
+  /**
+   * Rebuilds the stats for the offered backends and replaces the previous state with them.
+   *
+   * @param backends The backends to balance across from now on.
+   * @return The backends now being tracked.
+   */
+  @Override
+  protected Collection<S> onBackendsOffered(Set<S> backends) {
+    Map<S, ConnectionStats> newConnections = Maps.newHashMapWithExpectedSize(backends.size());
+    Collection<ConnectionStats> newConnectionStats =
+        Lists.newArrayListWithCapacity(backends.size());
+
+    // Recreate all connection stats since their ordering may have changed and this is used for
+    // comparison tie breaks.
+    int backendId = 0;
+    for (S backend : backends) {
+      ConnectionStats stats = new ConnectionStats(backend, backendId++);
+
+      // Retain the activeCount for existing backends to prevent dogpiling existing active servers
+      ConnectionStats existing = connections.get(backend);
+      if (existing != null) {
+        stats.activeCount = existing.activeCount;
+      }
+
+      newConnections.put(backend, stats);
+      newConnectionStats.add(stats);
+    }
+
+    connections.clear();
+    connections.putAll(newConnections);
+    connectionStats.clear();
+    connectionStats.addAll(newConnectionStats);
+
+    return connections.keySet();
+  }
+
+  /**
+   * Returns the backend that currently sorts first, i.e. the one with the fewest active
+   * connections.
+   *
+   * @return The backend to use next.
+   * @throws ResourceExhaustedException If no backends are being tracked.
+   */
+  @Override
+  public S nextBackend() throws ResourceExhaustedException {
+    Preconditions.checkState(connections.size() == connectionStats.size());
+
+    if (connectionStats.isEmpty()) {
+      throw new ResourceExhaustedException("No backends.");
+    }
+
+    return connectionStats.first().connectionKey;
+  }
+
+  /**
+   * Records a connection attempt: the use count always grows, the active count only on success.
+   *
+   * @param backendKey The backend the attempt was made against; must be a tracked backend.
+   * @param result The outcome of the attempt.
+   * @param connectTimeNanos Time spent establishing the connection; not used by this strategy.
+   */
+  @Override
+  public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
+    Preconditions.checkNotNull(backendKey);
+    Preconditions.checkState(connections.size() == connectionStats.size());
+    Preconditions.checkNotNull(result);
+
+    ConnectionStats stats = connections.get(backendKey);
+    Preconditions.checkNotNull(stats);
+
+    // Take the entry out of the sorted set while its sort keys change, then re-insert it.
+    Preconditions.checkState(connectionStats.remove(stats));
+    if (result == ConnectionResult.SUCCESS) {
+      stats.activeCount++;
+    }
+    stats.useCount++;
+    Preconditions.checkState(connectionStats.add(stats));
+  }
+
+  /**
+   * Records that a connection to the backend was handed back, lowering its active count.
+   * A return for a backend with no active connections is logged and otherwise ignored.
+   *
+   * @param backendKey The backend whose connection was returned; must be a tracked backend.
+   */
+  @Override
+  public void connectionReturned(S backendKey) {
+    Preconditions.checkNotNull(backendKey);
+    Preconditions.checkState(connections.size() == connectionStats.size());
+
+    ConnectionStats stats = connections.get(backendKey);
+    Preconditions.checkNotNull(stats);
+
+    if (stats.activeCount > 0) {
+      // Take the entry out of the sorted set while its sort key changes, then re-insert it.
+      Preconditions.checkState(connectionStats.remove(stats));
+      stats.activeCount--;
+      Preconditions.checkState(connectionStats.add(stats));
+    } else {
+      LOG.warning("connection stats dropped below zero, ignoring");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java
new file mode 100644
index 0000000..b15137d
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+import 
org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A load balancer, which will be used to determine which of a set of backends should be connected
+ * to for service calls.  It is expected that the backends themselves can be changed at any time,
+ * and the load balancer should immediately restrict itself to using only those backends.
+ *
+ * It is likely that the load balancer implementation will periodically receive information about
+ * backends that it technically should no longer know about.  An example is calls to
+ * {@link #requestResult(Object, RequestResult, long)} and {@link #released(Object)} for
+ * in-flight requests after backends were changed by {@link #offerBackends(Set, Closure)}.
+ *
+ * @param <K> The type used to identify a backend.
+ * @author William Farner
+ */
+public interface LoadBalancer<K> extends RequestTracker<K> {
+
+  /**
+   * Offers a set of backends that the load balancer should choose from to distribute load amongst.
+   *
+   * @param offeredBackends Backends to choose from.
+   * @param onBackendsChosen A callback that should be notified when the offered backends have been
+   *     (re)chosen from.
+   */
+  void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> 
onBackendsChosen);
+
+  /**
+   * Gets the next backend that a request should be sent to.
+   *
+   * @return Next backend to send a request.
+   * @throws ResourceExhaustedException If there are no available backends.
+   */
+  K nextBackend() throws ResourceExhaustedException;
+
+  /**
+   * Signals the load balancer that a connection was made.
+   *
+   * @param backend The backend that was connected to.
+   * @param connectTimeNanos The time spent waiting for the connection to be established.
+   */
+  void connected(K backend, long connectTimeNanos);
+
+  /**
+   * Signals the load balancer that a connection was attempted, but failed.
+   *
+   * @param backend The backend to which connection attempt was made.
+   * @param result The result of the connection attempt (only FAILED and TIMEOUT are permitted).
+   */
+  void connectFailed(K backend, ConnectionResult result);
+
+  /**
+   * Signals the load balancer that a connection was released, and is idle.
+   *
+   * @param connection Idle connection.
+   */
+  void released(K connection);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java
new file mode 100644
index 0000000..30e77c9
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import java.util.Collection;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Closure;
+import 
org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+
+/**
+ * Implementation of a load balancer, that uses a pluggable {@link LoadBalancingStrategy} to define
+ * actual load balancing behavior.  This class remembers the most recently offered backends and
+ * serializes every call into the strategy by synchronizing on this instance.
+ *
+ * Calls to {@link #connected(Object, long)},
+ * {@link #connectFailed(Object, LoadBalancingStrategy.ConnectionResult)},
+ * {@link #requestResult(Object, RequestResult, long)}, and {@link #released(Object)} will not
+ * be forwarded for backends that are not part of the most recent offer.
+ *
+ * @author William Farner
+ */
+public class LoadBalancerImpl<K> implements LoadBalancer<K> {
+
+  private final LoadBalancingStrategy<K> strategy;
+
+  // Snapshot of the backends from the latest offer; used to drop events for unknown backends.
+  private Set<K> offeredBackends = ImmutableSet.of();
+
+  /**
+   * Creates a new load balancer that will use the given strategy.
+   *
+   * @param strategy Strategy to delegate load balancing work to, must not be null.
+   */
+  public LoadBalancerImpl(LoadBalancingStrategy<K> strategy) {
+    this.strategy = Preconditions.checkNotNull(strategy);
+  }
+
+  /**
+   * Convenience method to create a new load balancer.
+   *
+   * @param strategy Strategy to use.
+   * @param <K> Backend type.
+   * @return A new load balancer.
+   */
+  public static <K> LoadBalancerImpl<K>
+      create(LoadBalancingStrategy<K> strategy) {
+    return new LoadBalancerImpl<K>(strategy);
+  }
+
+  @Override
+  public synchronized void offerBackends(Set<K> offeredBackends,
+      final Closure<Collection<K>> onBackendsChosen) {
+    this.offeredBackends = ImmutableSet.copyOf(offeredBackends);
+
+    // The strategy is handed a relay rather than the caller's closure itself.
+    Closure<Collection<K>> relay = new Closure<Collection<K>>() {
+      @Override public void execute(Collection<K> chosenBackends) {
+        onBackendsChosen.execute(chosenBackends);
+      }
+    };
+    strategy.offerBackends(offeredBackends, relay);
+  }
+
+  @Override
+  public synchronized K nextBackend() throws ResourceExhaustedException {
+    return strategy.nextBackend();
+  }
+
+  @Override
+  public synchronized void connected(K backend, long connectTimeNanos) {
+    Preconditions.checkNotNull(backend);
+
+    if (hasBackend(backend)) {
+      strategy.addConnectResult(backend, ConnectionResult.SUCCESS, connectTimeNanos);
+    }
+  }
+
+  @Override
+  public synchronized void connectFailed(K backend, ConnectionResult result) {
+    Preconditions.checkNotNull(backend);
+    Preconditions.checkNotNull(result);
+    Preconditions.checkArgument(result != ConnectionResult.SUCCESS);
+
+    if (hasBackend(backend)) {
+      // A failed attempt is reported to the strategy with a connect time of zero.
+      strategy.addConnectResult(backend, result, 0);
+    }
+  }
+
+  @Override
+  public synchronized void released(K backend) {
+    Preconditions.checkNotNull(backend);
+
+    if (hasBackend(backend)) {
+      strategy.connectionReturned(backend);
+    }
+  }
+
+  @Override
+  public synchronized void requestResult(K backend, RequestResult result, long requestTimeNanos) {
+    Preconditions.checkNotNull(backend);
+    Preconditions.checkNotNull(result);
+
+    if (hasBackend(backend)) {
+      strategy.addRequestResult(backend, result, requestTimeNanos);
+    }
+  }
+
+  /** Tells whether the backend was part of the most recent offer. */
+  private boolean hasBackend(K backend) {
+    return offeredBackends.contains(backend);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java
new file mode 100644
index 0000000..7f33416
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A strategy for balancing request load among backends.
+ *
+ * Strategies should be externally synchronized, and therefore do not have to worry about reentrant
+ * access.
+ *
+ * @param <K> The type used to identify a backend.
+ * @author William Farner
+ */
+public interface LoadBalancingStrategy<K> {
+
+  /**
+   * Offers a set of backends that the load balancer should choose from to distribute load amongst.
+   *
+   * @param offeredBackends Backends to choose from.
+   * @param onBackendsChosen A callback that should be notified when the offered backends have been
+   *     (re)chosen from.
+   */
+  public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> 
onBackendsChosen);
+
+  /**
+   * Gets the next backend that a request should be sent to.
+   *
+   * @return Next backend to send a request.
+   * @throws ResourceExhaustedException If there are no available backends.
+   */
+  public K nextBackend() throws ResourceExhaustedException;
+
+  /**
+   * Offers information about a connection result.
+   *
+   * @param key Backend key.
+   * @param result Connection result.
+   * @param connectTimeNanos Time spent waiting for connection to be established.
+   */
+  public void addConnectResult(K key, ConnectionResult result, long 
connectTimeNanos);
+
+  /**
+   * Offers information about a connection that was returned.
+   *
+   * @param key Backend key.
+   */
+  public void connectionReturned(K key);
+
+  /**
+   * Offers information about a request result.
+   *
+   * @param key Backend key.
+   * @param result Request result.
+   * @param requestTimeNanos Time spent waiting for a connection to be established.
+   *     NOTE(review): this wording matches connectTimeNanos above; presumably it is the time the
+   *     request itself took - confirm against callers.
+   */
+  public void addRequestResult(K key, RequestTracker.RequestResult result, 
long requestTimeNanos);
+
+  /**
+   * The possible outcomes of a connection attempt.
+   */
+  enum ConnectionResult {
+    FAILED,
+    TIMEOUT,
+    SUCCESS
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java
new file mode 100644
index 0000000..05a4056
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+import org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult;
+import org.apache.aurora.common.util.BackoffDecider;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * A load balancer that serves as a layer above another load balancer to mark hosts as dead, and
+ * prevent them from being visible to the wrapped load balancer.
+ * If all backends become marked as dead, they will all be unmarked ("forced live" mode) and
+ * offered to the wrapped load balancer again.
+ *
+ * @param <S> Type of the key that identifies a backend.
+ * @author William Farner
+ */
+public class MarkDeadStrategy<S> implements LoadBalancingStrategy<S> {
+  private static final Logger LOG = Logger.getLogger(MarkDeadStrategy.class.getName());
+
+  private final LoadBalancingStrategy<S> wrappedStrategy;
+
+  // One backoff decider per offered backend; the key set is the full set of offered backends.
+  private final Map<S, BackoffDecider> targets = Maps.newHashMap();
+  private final Function<S, BackoffDecider> backoffFactory;
+  protected final Predicate<S> hostChecker;
+
+  // Matches the targets whose decider is not currently asking us to back off.
+  private final Predicate<S> liveTargetFilter = new Predicate<S>() {
+      @Override public boolean apply(S backend) {
+        return !targets.get(backend).shouldBackOff();
+      }
+    };
+
+  // Backends most recently offered to the wrapped strategy.  Always a private copy (never a view
+  // of the targets map) and never null, so a connectionReturned call that arrives before the
+  // first offer is ignored instead of failing with a NullPointerException.
+  private Set<S> liveBackends = Sets.newHashSet();
+  private Closure<Collection<S>> onBackendsChosen = null;
+
+  // Flipped when we are in "forced live" mode, where all backends are considered dead and we
+  // send them all traffic as a last-ditch effort.
+  private boolean forcedLive = false;
+
+  /**
+   * Creates a mark dead strategy with a wrapped strategy, backoff decider factory
+   * and a predicate host checker. Use this constructor if you want to pass in
+   * your own implementation of the host checker.
+   *
+   * @param wrappedStrategy one of the implementations of the load balancing strategy.
+   * @param backoffFactory backoff decider factory per host.
+   * @param hostChecker predicate that returns {@code true} if the host is alive, otherwise
+   *     returns {@code false}.
+   */
+  public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy,
+      Function<S, BackoffDecider> backoffFactory, Predicate<S> hostChecker) {
+    this.wrappedStrategy = Preconditions.checkNotNull(wrappedStrategy);
+    this.backoffFactory = Preconditions.checkNotNull(backoffFactory);
+    this.hostChecker = Preconditions.checkNotNull(hostChecker);
+  }
+
+  /**
+   * Constructor that uses a default predicate host checker that always returns true.
+   * This is the default constructor that all consumers of MarkDeadStrategy currently use.
+   *
+   * @param wrappedStrategy one of the implementations of the load balancing strategy.
+   * @param backoffFactory backoff decider factory per host.
+   */
+  public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy,
+      Function<S, BackoffDecider> backoffFactory) {
+    this(wrappedStrategy, backoffFactory, Predicates.<S>alwaysTrue());
+  }
+
+  /**
+   * Replaces the set of tracked backends with {@code offeredBackends} and re-offers the live
+   * subset to the wrapped strategy.  Deciders of backends that are still offered are retained,
+   * deciders of backends that are no longer offered are dropped, and newly offered backends get
+   * a decider from the backoff factory.
+   *
+   * @param offeredBackends Backends to choose from.
+   * @param onBackendsChosen Callback handed on to the wrapped strategy with every offer.
+   */
+  @Override
+  public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) {
+    this.onBackendsChosen = onBackendsChosen;
+    targets.keySet().retainAll(offeredBackends);
+    for (S backend : offeredBackends) {
+      if (!targets.containsKey(backend)) {
+        targets.put(backend, backoffFactory.apply(backend));
+      }
+    }
+
+    adjustBackends();
+  }
+
+  /**
+   * Records the connection result with the backend's decider, re-evaluates the live set, and
+   * forwards the result to the wrapped strategy if the backend is live afterwards.
+   *
+   * @param backendKey Backend key.
+   * @param result Connection result.
+   * @param connectTimeNanos Connect time, passed through to the wrapped strategy unchanged.
+   * @throws NullPointerException If an argument is null or the backend is not currently tracked.
+   */
+  @Override
+  public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
+    Preconditions.checkNotNull(backendKey);
+    Preconditions.checkNotNull(result);
+
+    BackoffDecider decider = targets.get(backendKey);
+    Preconditions.checkNotNull(decider);
+
+    addResult(decider, result);
+    if (shouldNotifyFor(backendKey)) {
+      wrappedStrategy.addConnectResult(backendKey, result, connectTimeNanos);
+    }
+  }
+
+  /**
+   * Forwards the notification to the wrapped strategy if the backend is currently live.
+   *
+   * @param backendKey Backend key.
+   * @throws NullPointerException If {@code backendKey} is null.
+   */
+  @Override
+  public void connectionReturned(S backendKey) {
+    Preconditions.checkNotNull(backendKey);
+
+    if (shouldNotifyFor(backendKey)) {
+      wrappedStrategy.connectionReturned(backendKey);
+    }
+  }
+
+  /**
+   * Records the request result with the backend's decider, re-evaluates the live set, and
+   * forwards the result to the wrapped strategy if the backend is live afterwards.
+   *
+   * @param requestKey Backend key.
+   * @param result Request result.
+   * @param requestTimeNanos Request time, passed through to the wrapped strategy unchanged.
+   * @throws NullPointerException If an argument is null or the backend is not currently tracked.
+   */
+  @Override
+  public void addRequestResult(S requestKey, RequestResult result,
+      long requestTimeNanos) {
+    Preconditions.checkNotNull(requestKey);
+    Preconditions.checkNotNull(result);
+
+    BackoffDecider decider = targets.get(requestKey);
+    Preconditions.checkNotNull(decider);
+
+    addResult(decider, result);
+    if (shouldNotifyFor(requestKey)) {
+      wrappedStrategy.addRequestResult(requestKey, result, requestTimeNanos);
+    }
+  }
+
+  // Maps a connection result onto success/failure: only SUCCESS counts as a success.
+  private void addResult(BackoffDecider decider, ConnectionResult result) {
+    switch (result) {
+      case FAILED:
+      case TIMEOUT:
+        addResult(decider, false);
+        break;
+      case SUCCESS:
+        addResult(decider, true);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unhandled result type " + result);
+    }
+  }
+
+  // Maps a request result onto success/failure: only SUCCESS counts as a success.
+  private void addResult(BackoffDecider decider, RequestTracker.RequestResult result) {
+    switch (result) {
+      case FAILED:
+      case TIMEOUT:
+        addResult(decider, false);
+        break;
+      case SUCCESS:
+        addResult(decider, true);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unhandled result type " + result);
+    }
+  }
+
+  // Feeds the outcome to the decider, then re-offers the live set to the wrapped strategy if any
+  // tracked backend changed state.
+  private void addResult(BackoffDecider decider, boolean success) {
+    if (success) {
+      decider.addSuccess();
+    } else {
+      decider.addFailure();
+    }
+
+    // Check if any of the backends have moved into or out of dead state.
+    for (Map.Entry<S, BackoffDecider> entry : targets.entrySet()) {
+      boolean dead = entry.getValue().shouldBackOff();
+      boolean markedDead = !liveBackends.contains(entry.getKey());
+
+      // only check the servers that were marked dead before and see if we can
+      // connect to them, otherwise set dead to true.
+      if (markedDead && !dead) {
+        boolean alive = hostChecker.apply(entry.getKey());
+        if (!alive) {
+          entry.getValue().transitionToBackOff(0, true);
+        }
+        dead = !alive;
+      }
+
+      if (dead && !markedDead && forcedLive) {
+        // Do nothing here.  Since we have forced all backends to be live, we don't want to
+        // continually advertise the backend list to the wrapped strategy.
+      } else if (dead != markedDead || (!dead && forcedLive)) {
+        // One recomputation covers every backend, so stop at the first detected change.
+        adjustBackends();
+        break;
+      }
+    }
+  }
+
+  // Results are only forwarded for backends the wrapped strategy currently knows about.
+  private boolean shouldNotifyFor(S backend) {
+    return liveBackends.contains(backend);
+  }
+
+  // Recomputes the live set, falling back to every tracked backend ("forced live") when none is
+  // live, and offers the outcome to the wrapped strategy.
+  private void adjustBackends() {
+    liveBackends = Sets.newHashSet(Iterables.filter(targets.keySet(), liveTargetFilter));
+    if (liveBackends.isEmpty()) {
+      // Copy instead of aliasing the key set: the key set is a live view of the targets map, and
+      // the wrapped strategy must not receive (or be able to mutate) our internal state.
+      liveBackends = Sets.newHashSet(targets.keySet());
+      forcedLive = true;
+    } else {
+      forcedLive = false;
+    }
+    LOG.info("Observed backend state change, changing live backends to " + liveBackends);
+    wrappedStrategy.offerBackends(liveBackends, onBackendsChosen);
+  }
+
+  /**
+   * Delegates the choice to the wrapped strategy, which only knows about the live backends.
+   *
+   * @return Next backend as chosen by the wrapped strategy.
+   * @throws ResourceExhaustedException If the wrapped strategy throws it.
+   */
+  @Override
+  public S nextBackend() throws ResourceExhaustedException {
+    return wrappedStrategy.nextBackend();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
new file mode 100644
index 0000000..8170167
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.common.util.BackoffDecider;
+
+/**
+ * A mark dead strategy whose host checker lets a host leave the dead state only if the most
+ * recent connection attempt to that host was successful.
+ *
+ * @param <S> typically socket address of a backend host.
+ * @author Krishna Gade
+ */
+public class MarkDeadStrategyWithHostCheck<S> extends MarkDeadStrategy<S> {
+
+  /**
+   * A {@link Predicate} that considers a host alive when the most recent connection result
+   * recorded for it was a success.  It keeps a map of backend -> last connection result, which
+   * is updated through {@link #addConnectResult}.
+   */
+  protected static class LiveHostChecker<S> implements Predicate<S> {
+    private final Map<S, ConnectionResult> lastConnectionResult = Maps.newHashMap();
+
+    /**
+     * Remembers {@code result} as the latest connection result of {@code backend}, replacing
+     * any earlier one.
+     *
+     * @param backend typically the socket address of the backend.
+     * @param result result of what happened when the client tried to connect to this backend.
+     */
+    public void addConnectResult(S backend, ConnectionResult result) {
+      lastConnectionResult.put(backend, result);
+    }
+
+    /**
+     * Returns {@code true} if the last connection result recorded for this backend was
+     * {@link LoadBalancingStrategy.ConnectionResult#SUCCESS}; returns {@code false} otherwise,
+     * including when no result has been recorded yet.
+     *
+     * @param backend typically the socket address of the backend.
+     */
+    @Override public boolean apply(S backend) {
+      // A missing entry yields null, which is never equal to SUCCESS.
+      return lastConnectionResult.get(backend) == ConnectionResult.SUCCESS;
+    }
+  }
+
+  // The very checker instance that is handed to the super class, kept with its concrete type
+  // so that connection results can be recorded on it.
+  protected final LiveHostChecker<S> liveHostChecker;
+
+  /**
+   * Creates a mark dead strategy with the given wrapped strategy and backoff decider factory.
+   * It uses a hostChecker {@link Predicate} that allows hosts to transition out
+   * of a dead state if the most recent connection to the host was successful.
+   *
+   * @param wrappedStrategy one of the implementations of the load balancing strategy.
+   * @param backoffFactory backoff decider factory per host.
+   */
+  public MarkDeadStrategyWithHostCheck(LoadBalancingStrategy<S> wrappedStrategy,
+      Function<S, BackoffDecider> backoffFactory) {
+    this(wrappedStrategy, backoffFactory, new LiveHostChecker<S>());
+  }
+
+  // Shares one checker instance between the super class and this class.
+  private MarkDeadStrategyWithHostCheck(LoadBalancingStrategy<S> wrappedStrategy,
+      Function<S, BackoffDecider> backoffFactory, LiveHostChecker<S> checker) {
+    super(wrappedStrategy, backoffFactory, checker);
+    this.liveHostChecker = checker;
+  }
+
+  /**
+   * Records the connection result with the host checker first, then applies the base class
+   * handling.
+   *
+   * @param backendKey typically the socket address of the backend.
+   * @param result result of what happened when the client tried to connect to this backend.
+   * @param connectTimeNanos time took to connect to the backend in nano seconds.
+   */
+  @Override
+  public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
+    liveHostChecker.addConnectResult(backendKey, result);
+    super.addConnectResult(backendKey, result, connectTimeNanos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java
new file mode 100644
index 0000000..a8da980
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * A load balancer that selects a random backend each time a request is made.
+ *
+ * @param <S> Type of the key that identifies a backend.
+ * @author William Farner
+ */
+public class RandomStrategy<S> extends StaticLoadBalancingStrategy<S> {
+
+  private final Random random;
+
+  // Backends from the most recent offer; empty until the first offer arrives.
+  private List<S> targets = Lists.newArrayList();
+
+  /**
+   * Creates a strategy that draws from a newly created {@link Random}.
+   */
+  public RandomStrategy() {
+    this(new Random());
+  }
+
+  /**
+   * Creates a strategy that draws from the given source of randomness.
+   *
+   * @param random Source of randomness, must not be null.
+   */
+  @VisibleForTesting
+  RandomStrategy(Random random) {
+    this.random = Preconditions.checkNotNull(random);
+  }
+
+  /**
+   * Takes an immutable snapshot of the offered backends and chooses all of them.
+   *
+   * @param targets The offered backends.
+   * @return The snapshot that subsequent picks are drawn from.
+   */
+  @Override
+  protected Collection<S> onBackendsOffered(Set<S> targets) {
+    List<S> snapshot = ImmutableList.copyOf(targets);
+    this.targets = snapshot;
+    return snapshot;
+  }
+
+  /**
+   * Picks one of the offered backends at random.
+   *
+   * @return The randomly chosen backend.
+   * @throws ResourceExhaustedException If no backends are available.
+   */
+  @Override
+  public S nextBackend() throws ResourceExhaustedException {
+    if (targets.isEmpty()) {
+      throw new ResourceExhaustedException("No backends.");
+    }
+    int choice = random.nextInt(targets.size());
+    return targets.get(choice);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java
new file mode 100644
index 0000000..745e2f8
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+/**
+ * Tracks requests made to a backend service.
+ *
+ * @param <T> Type of the key that identifies the owner of a request.
+ * @author William Farner
+ */
+public interface RequestTracker<T> {
+
+  /**
+   * Informs the tracker of a completed request.
+   *
+   * @param key Key to identify the owner of the request.
+   * @param result Result of the request.
+   * @param requestTimeNanos Time duration spent waiting for the request to complete.
+   */
+  void requestResult(T key, RequestResult result, long requestTimeNanos);
+
+  /**
+   * Possible results of a request, as reported through {@code requestResult}.
+   */
+  enum RequestResult {
+    FAILED,
+    TIMEOUT,
+    SUCCESS
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java
new file mode 100644
index 0000000..5678331
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A load balancer that distributes load by randomizing the list of available backends, and then
+ * rotating through them evenly.
+ *
+ * @param <S> Type of the key that identifies a backend.
+ * @author William Farner
+ */
+public class RoundRobinStrategy<S> extends StaticLoadBalancingStrategy<S> {
+
+  // Endless rotation over the most recently offered backends; empty until the first offer.
+  private Iterator<S> iterator = Iterators.emptyIterator();
+
+  /**
+   * Shuffles the offered backends and restarts the rotation over the shuffled order.
+   *
+   * @param targets The offered backends.
+   * @return The offered backends in their rotation order.
+   */
+  @Override
+  protected Collection<S> onBackendsOffered(Set<S> targets) {
+    List<S> shuffled = Lists.newArrayList(targets);
+    Collections.shuffle(shuffled);
+    iterator = Iterators.cycle(shuffled);
+    return shuffled;
+  }
+
+  /**
+   * Advances the rotation by one backend.
+   *
+   * @return The next backend in the rotation.
+   * @throws ResourceExhaustedException If the rotation is empty.
+   */
+  @Override
+  public S nextBackend() throws ResourceExhaustedException {
+    if (iterator.hasNext()) {
+      return iterator.next();
+    }
+    throw new ResourceExhaustedException("No backends available!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java
new file mode 100644
index 0000000..b333b44
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A base class for LoadBalancingStrategies that work on the static set of backends they are
+ * {@link #offerBackends(java.util.Set, Closure) offered}.  It doubles as an adapter: every other
+ * LoadBalancingStrategy notification method has a no-op implementation, so subclasses only
+ * override the ones their features require.
+ *
+ * @param <K> Type of the key that identifies a backend.
+ * @author John Sirois
+ */
+abstract class StaticLoadBalancingStrategy<K> implements LoadBalancingStrategy<K> {
+
+  /**
+   * Lets the subclass choose from the offered backends and reports its choice to the callback.
+   *
+   * @param offeredBackends Backends to choose from.
+   * @param onBackendsChosen Callback executed with the chosen backends.
+   */
+  @Override
+  public final void offerBackends(Set<K> offeredBackends,
+      Closure<Collection<K>> onBackendsChosen) {
+    Collection<K> chosen = onBackendsOffered(offeredBackends);
+    onBackendsChosen.execute(chosen);
+  }
+
+  /**
+   * Subclasses must override and return a collection of the backends actually chosen for use
+   * until the next offer round.
+   *
+   * @param offeredBackends The backends offered in a {@link
+   *     #offerBackends(java.util.Set, Closure)} event.
+   * @return The collection of backends that will be used until the next offer event.
+   */
+  protected abstract Collection<K> onBackendsOffered(Set<K> offeredBackends);
+
+  @Override
+  public void addConnectResult(K backendKey, ConnectionResult result, long connectTimeNanos) {
+    // Intentionally empty: connection results are ignored by default.
+  }
+
+  @Override
+  public void connectionReturned(K backendKey) {
+    // Intentionally empty: returned connections are ignored by default.
+  }
+
+  @Override
+  public void addRequestResult(K requestKey, RequestResult result, long requestTimeNanos) {
+    // Intentionally empty: request results are ignored by default.
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java
new file mode 100644
index 0000000..0b852cf
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+import org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A load balancer that maintains a fixed upper bound on the number of backends that will be made
+ * available for a wrapped load balancer.
+ *
+ * TODO(William Farner): May want to consider periodically swapping subsets.
+ *
+ * TODO(William Farner): May want to catch ResourceExhaustedExceptions from wrapped strategy and
+ *    adjust subset if possible.
+ *
+ * @param <S> Type of the key that identifies a backend.
+ * @author William Farner
+ */
+public class SubsetStrategy<S> implements LoadBalancingStrategy<S> {
+  private final LoadBalancingStrategy<S> wrapped;
+  private final int maxBackends;
+
+  // The backends currently exposed to the wrapped strategy; empty until the first offer.
+  private Set<S> backendSubset = Sets.newHashSet();
+
+  /**
+   * Creates a strategy that exposes at most {@code maxBackends} backends to {@code wrapped}.
+   *
+   * @param maxBackends Upper bound on the subset size, must be positive.
+   * @param wrapped Strategy that balances over the subset, must not be null.
+   */
+  public SubsetStrategy(int maxBackends, LoadBalancingStrategy<S> wrapped) {
+    Preconditions.checkArgument(maxBackends > 0);
+    this.maxBackends = maxBackends;
+    this.wrapped = Preconditions.checkNotNull(wrapped);
+  }
+
+  /**
+   * Picks a random subset of at most {@code maxBackends} of the offered backends and offers
+   * only that subset to the wrapped strategy.
+   *
+   * @param offeredBackends Backends to choose the subset from.
+   * @param onBackendsChosen Callback handed on to the wrapped strategy.
+   */
+  @Override
+  public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) {
+    List<S> shuffled = Lists.newArrayList(offeredBackends);
+    Collections.shuffle(shuffled);
+    int subsetSize = Math.min(maxBackends, shuffled.size());
+    backendSubset = ImmutableSet.copyOf(shuffled.subList(0, subsetSize));
+    wrapped.offerBackends(backendSubset, onBackendsChosen);
+  }
+
+  @Override
+  public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) {
+    // Results for backends outside the subset are not forwarded.
+    if (!backendSubset.contains(backendKey)) {
+      return;
+    }
+    wrapped.addConnectResult(backendKey, result, connectTimeNanos);
+  }
+
+  @Override
+  public void connectionReturned(S backendKey) {
+    // Notifications for backends outside the subset are not forwarded.
+    if (!backendSubset.contains(backendKey)) {
+      return;
+    }
+    wrapped.connectionReturned(backendKey);
+  }
+
+  @Override
+  public void addRequestResult(S requestKey, RequestResult result, long requestTimeNanos) {
+    Preconditions.checkNotNull(requestKey);
+
+    // Results for backends outside the subset are not forwarded.
+    if (!backendSubset.contains(requestKey)) {
+      return;
+    }
+    wrapped.addRequestResult(requestKey, result, requestTimeNanos);
+  }
+
+  @Override
+  public S nextBackend() throws ResourceExhaustedException {
+    return wrapped.nextBackend();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java
 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java
new file mode 100644
index 0000000..e0c5c35
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.loadbalancing;
+
+import com.google.common.base.Preconditions;
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.net.monitoring.TrafficMonitor;
+import org.apache.aurora.common.net.pool.ResourceExhaustedException;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A load balancing strategy that delegates every call to another strategy and additionally
+ * reports successful connections, returned connections and request results to a
+ * {@link TrafficMonitor}.
+ *
+ * @param <K> Type of the key that identifies a backend.
+ * @author William Farner
+ */
+public class TrafficMonitorAdapter<K> implements LoadBalancingStrategy<K> {
+  private final LoadBalancingStrategy<K> strategy;
+  private final TrafficMonitor<K> monitor;
+
+  /**
+   * Creates an adapter around the given strategy.
+   *
+   * @param strategy Strategy that every call is delegated to, must not be null.
+   * @param monitor Monitor that is notified of traffic events, must not be null.
+   */
+  public TrafficMonitorAdapter(LoadBalancingStrategy<K> strategy, TrafficMonitor<K> monitor) {
+    this.strategy = Preconditions.checkNotNull(strategy);
+    this.monitor = Preconditions.checkNotNull(monitor);
+  }
+
+  /**
+   * Factory equivalent of the constructor.
+   *
+   * @param strategy Strategy that every call is delegated to, must not be null.
+   * @param monitor Monitor that is notified of traffic events, must not be null.
+   * @return A new adapter around {@code strategy}.
+   */
+  public static <K> TrafficMonitorAdapter<K> create(LoadBalancingStrategy<K> strategy,
+      TrafficMonitor<K> monitor) {
+    return new TrafficMonitorAdapter<K>(strategy, monitor);
+  }
+
+  @Override
+  public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) {
+    strategy.offerBackends(offeredBackends, onBackendsChosen);
+  }
+
+  @Override
+  public K nextBackend() throws ResourceExhaustedException {
+    return strategy.nextBackend();
+  }
+
+  @Override
+  public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos) {
+    strategy.addConnectResult(key, result, connectTimeNanos);
+    // The monitor only hears about connections that were actually established.
+    if (ConnectionResult.SUCCESS == result) {
+      monitor.connected(key);
+    }
+  }
+
+  @Override
+  public void connectionReturned(K key) {
+    strategy.connectionReturned(key);
+    monitor.released(key);
+  }
+
+  @Override
+  public void addRequestResult(K key, RequestTracker.RequestResult result,
+      long requestTimeNanos) {
+    strategy.addRequestResult(key, result, requestTimeNanos);
+    monitor.requestResult(key, result, requestTimeNanos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java
 
b/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java
new file mode 100644
index 0000000..4d32a71
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.monitoring;
+
+/**
+ * Monitors active connections between two hosts.
+ *
+ * @param <K> Type of the key that identifies the host on the other end of a connection.
+ * @author William Farner
+ */
+public interface ConnectionMonitor<K> {
+
+  /**
+   * Tells the monitor that a connection has been established.
+   *
+   * @param connectionKey Key for the host that a connection was established with.
+   */
+  void connected(K connectionKey);
+
+  /**
+   * Tells the monitor that a connection has been released.
+   *
+   * @param connectionKey Key for the host that a connection was released for.
+   */
+  void released(K connectionKey);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
 
b/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
new file mode 100644
index 0000000..fba1e4b
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.monitoring;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.net.loadbalancing.RequestTracker;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.common.util.concurrent.ExecutorServiceShutdown;
+
+/**
+ * Monitors activity on established connections between two hosts.  This can 
be used for a server
+ * to track inbound clients, or for a client to track requests sent to 
different servers.
+ *
+ * The monitor will retain information for hosts that may no longer be active, 
but will expunge
+ * information for hosts that have been idle for more than five minutes.
+ *
+ * @author William Farner
+ */
+public class TrafficMonitor<K> implements ConnectionMonitor<K>, 
RequestTracker<K> {
+
+  @VisibleForTesting
+  static final Amount<Long, Time> DEFAULT_GC_INTERVAL = Amount.of(5L, 
Time.MINUTES);
+
+  @GuardedBy("this")
+  private final LoadingCache<K, TrafficInfo> trafficInfos;
+
+  private final String serviceName;
+  private final Amount<Long, Time> gcInterval;
+
+  private AtomicLong lifetimeRequests = new AtomicLong();
+  private final Clock clock;
+  private final ScheduledExecutorService gcExecutor;
+
+  /**
+   * Creates a new traffic monitor using the default cleanup interval.
+   *
+   * @param serviceName Name of the service to monitor, used for creating 
variable names.
+   */
+  public TrafficMonitor(final String serviceName) {
+    this(serviceName, DEFAULT_GC_INTERVAL);
+  }
+
+  /**
+   * Creates a new traffic monitor with a custom cleanup interval.
+   *
+   * @param serviceName Service name for the monitor.
+   * @param gcInterval Interval on which the remote host garbage collector 
should run.
+   */
+  public TrafficMonitor(final String serviceName, Amount<Long, Time> 
gcInterval) {
+    this(serviceName, gcInterval, Clock.SYSTEM_CLOCK);
+  }
+
+  /**
+   * Convenience method to create a typed traffic monitor.
+   *
+   * @param serviceName Service name for the monitor.
+   * @param <T> Monitor type.
+   * @return A new traffic monitor.
+   */
+  public static <T> TrafficMonitor<T> create(String serviceName) {
+    return new TrafficMonitor<T>(serviceName);
+  }
+
+  @VisibleForTesting
+  TrafficMonitor(final String serviceName, Clock clock) {
+    this(serviceName, DEFAULT_GC_INTERVAL, clock);
+  }
+
+  /**
+   * Creates a traffic monitor and schedules its periodic idle-host cleanup on
+   * a single daemon thread.
+   *
+   * @param serviceName Service name for the monitor, must not be blank.
+   * @param gcInterval Interval on which {@link #gc()} runs; its value must be
+   *     greater than zero.
+   * @param clock Clock used to timestamp host activity.
+   */
+  private TrafficMonitor(final String serviceName,
+      Amount<Long, Time> gcInterval, Clock clock) {
+    this.serviceName = MorePreconditions.checkNotBlank(serviceName);
+    this.clock = Preconditions.checkNotNull(clock);
+    Preconditions.checkNotNull(gcInterval);
+    Preconditions.checkArgument(gcInterval.getValue() > 0,
+        "GC interval must be > zero.");
+    this.gcInterval = gcInterval;
+
+    trafficInfos = CacheBuilder.newBuilder().build(
+        new CacheLoader<K, TrafficInfo>() {
+          @Override public TrafficInfo load(K key) {
+            return new TrafficInfo(key);
+          }
+        });
+
+    Runnable gc = new Runnable() {
+      @Override public void run() {
+        gc();
+      }
+    };
+
+    gcExecutor = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("TrafficMonitor-gc-%d")
+            .build());
+
+    // Schedule in nanoseconds, the same unit gc() uses for its idle check.
+    // Converting to whole seconds would presumably turn a positive
+    // sub-second interval into a period of 0, which scheduleAtFixedRate
+    // rejects with an IllegalArgumentException.
+    long gcIntervalNanos = gcInterval.as(Time.NANOSECONDS);
+    gcExecutor.scheduleAtFixedRate(gc, gcIntervalNanos, gcIntervalNanos,
+        TimeUnit.NANOSECONDS);
+  }
+
+  /**
+   * Gets the name of the service that this monitor is monitoring.
+   *
+   * @return Monitor's service name.
+   */
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  /**
+   * Gets the total number of requests that this monitor has observed, for all 
remote hosts.
+   *
+   * @return Total number of requests observed.
+   */
+  public long getLifetimeRequestCount() {
+    return lifetimeRequests.get();
+  }
+
+  /**
+   * Fetches all current traffic information.
+   *
+   * @return A map from the host key type to information about that host.
+   */
+  public synchronized Map<K, TrafficInfo> getTrafficInfo() {
+    return ImmutableMap.copyOf(trafficInfos.asMap());
+  }
+
+  @Override
+  public synchronized void connected(K key) {
+    Preconditions.checkNotNull(key);
+
+    trafficInfos.getUnchecked(key).incConnections();
+  }
+
+  @Override
+  public synchronized void released(K key) {
+    Preconditions.checkNotNull(key);
+
+    TrafficInfo info = trafficInfos.getUnchecked(key);
+
+    Preconditions.checkState(info.getConnectionCount() > 0, "Double release 
detected!");
+    info.decConnections();
+  }
+
+  @Override
+  public void requestResult(K key, RequestResult result, long 
requestTimeNanos) {
+    Preconditions.checkNotNull(key);
+
+    lifetimeRequests.incrementAndGet();
+    trafficInfos.getUnchecked(key).addResult(result);
+  }
+
+  @VisibleForTesting
+  synchronized void gc() {
+    Iterables.removeIf(trafficInfos.asMap().entrySet(),
+        new Predicate<Map.Entry<K, TrafficInfo>>() {
+          @Override public boolean apply(Map.Entry<K, TrafficInfo> clientInfo) 
{
+            if (clientInfo.getValue().connections.get() > 0) return false;
+
+            long idlePeriod = clock.nowNanos() - 
clientInfo.getValue().getLastActiveTimestamp();
+
+            return idlePeriod > gcInterval.as(Time.NANOSECONDS);
+          }
+        });
+  }
+
+  /**
+   * Shuts down TrafficMonitor by stopping background gc task.
+   */
+  public void shutdown() {
+    new ExecutorServiceShutdown(gcExecutor, Amount.of(0L, 
Time.SECONDS)).execute();
+  }
+
+  /**
+   * Information about traffic observed to/from a specific host.
+   */
+  public class TrafficInfo {
+    private final K key;
+    private AtomicInteger requestSuccesses = new AtomicInteger();
+    private AtomicInteger requestFailures = new AtomicInteger();
+    private AtomicInteger connections = new AtomicInteger();
+    private AtomicLong lastActive = new AtomicLong();
+
+    TrafficInfo(K key) {
+      this.key = key;
+      pulse();
+    }
+
+    void pulse() {
+      lastActive.set(clock.nowNanos());
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    void addResult(RequestResult result) {
+      pulse();
+      switch (result) {
+        case SUCCESS:
+          requestSuccesses.incrementAndGet();
+          break;
+        case FAILED:
+        case TIMEOUT:
+          requestFailures.incrementAndGet();
+          break;
+      }
+    }
+
+    public int getRequestSuccessCount() {
+      return requestSuccesses.get();
+    }
+
+    public int getRequestFailureCount() {
+      return requestFailures.get();
+    }
+
+    int incConnections() {
+      pulse();
+      return connections.incrementAndGet();
+    }
+
+    int decConnections() {
+      pulse();
+      return connections.decrementAndGet();
+    }
+
+    public int getConnectionCount() {
+      return connections.get();
+    }
+
+    public long getLastActiveTimestamp() {
+      return lastActive.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java 
b/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java
new file mode 100644
index 0000000..9309c11
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.base.Supplier;
+
+import java.io.Closeable;
+
+/**
+ * An interface to a connection resource that may become invalid.
+ *
+ * @param <T> the type of the underlying connection returned by {@link #get()}
+ * @param <E> the type of the endpoint returned by {@link #getEndpoint()}
+ *
+ * @author John Sirois
+ */
+public interface Connection<T, E> extends Supplier<T>, Closeable {
+
+  /**
+   * This will always be the same underlying connection for the lifetime of
+   * this object.
+   *
+   * @return the connection
+   */
+  @Override T get();
+
+  /**
+   * @return {@code true} if the supplied connection is valid for use.
+   */
+  boolean isValid();
+
+  /**
+   * Closes this connection.  Unlike {@link Closeable#close()}, this
+   * declaration throws no checked exception.
+   */
+  void close();
+
+  /**
+   * @return the endpoint this connection is connected to.
+   */
+  E getEndpoint();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
 
b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
new file mode 100644
index 0000000..cdaaeab
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A factory for connections that also dictates policy for the size of the
+ * connection population.
+ *
+ * <p>TODO(John Sirois): separate concerns - mixing in willCreate/null
+ * protocol is already tangling implementation code
+ *
+ * @param <S> the type of connection this factory creates and destroys
+ *
+ * @author John Sirois
+ */
+public interface ConnectionFactory<S extends Connection<?, ?>> {
+
+  /**
+   * Checks whether this factory might create a connection if requested.
+   *
+   * @return {@code true} if this factory might create a connection at this
+   *     point in time; ie a call to {@link #create} might not have returned
+   *     {@code null}.  May return true to multiple threads if concurrently
+   *     creating connections.
+   */
+  boolean mightCreate();
+
+  /**
+   * Attempts to create a new connection within the given timeout and subject
+   * to this factory's connection population size policy.
+   *
+   * @param timeout the maximum amount of time to wait
+   * @return a new connection or null if there are too many connections
+   *     already
+   * @throws Exception if there was a problem creating the connection or
+   *     establishing the connection takes too long
+   */
+  S create(Amount<Long, Time> timeout) throws Exception;
+
+  /**
+   * Destroys a connection.  It is an error to attempt to destroy a connection
+   * this factory did not {@link #create}
+   *
+   * @param connection The connection to destroy.
+   */
+  void destroy(S connection);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java 
b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java
new file mode 100644
index 0000000..316bf2b
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.net.pool;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.aurora.common.base.Supplier;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.stats.StatsProvider;
+
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A generic connection pool that delegates growth policy to a {@link 
ConnectionFactory} and
+ * connection choice to a supplied strategy.
+ *
+ * <p>TODO(John Sirois): implement a reaper to clean up connections that may 
become invalid when not in
+ * use.
+ *
+ * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
+ *
+ * @author John Sirois
+ */
+public final class ConnectionPool<S extends Connection<?, ?>> implements 
ObjectPool<S> {
+
+  private static final Logger LOG = 
Logger.getLogger(ConnectionPool.class.getName());
+
+  private final Set<S> leasedConnections =
+      Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap());
+  private final Set<S> availableConnections = Sets.newHashSet();
+  private final Lock poolLock;
+  private final Condition available;
+
+  private final ConnectionFactory<S> connectionFactory;
+  private final Executor executor;
+
+  private volatile boolean closed;
+  private final AtomicLong connectionsCreated;
+  private final AtomicLong connectionsDestroyed;
+  private final AtomicLong connectionsReturned;
+
+  /**
+   * Creates a connection pool with a connection picker that selects the first 
item in the set of
+   * available connections, exporting statistics to stats provider {@link 
Stats#STATS_PROVIDER}.
+   *
+   * @param connectionFactory Factory to create and destroy connections.
+   */
+  public ConnectionPool(ConnectionFactory<S> connectionFactory) {
+    this(connectionFactory, Stats.STATS_PROVIDER);
+  }
+
+  /**
+   * Creates a connection pool with a connection picker that selects the first 
item in the set of
+   * available connections and uses the supplied StatsProvider to register 
stats with.
+   *
+   * @param connectionFactory Factory to create and destroy connections.
+   * @param statsProvider Stats export provider.
+   */
+  public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider 
statsProvider) {
+    this(Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+            .setNameFormat("CP-" + connectionFactory + "[%d]")
+            .setDaemon(true)
+            .build()),
+        new ReentrantLock(true), connectionFactory, statsProvider);
+  }
+
+  @VisibleForTesting
+  ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> 
connectionFactory,
+      StatsProvider statsProvider) {
+    Preconditions.checkNotNull(executor);
+    Preconditions.checkNotNull(poolLock);
+    Preconditions.checkNotNull(connectionFactory);
+    Preconditions.checkNotNull(statsProvider);
+
+    this.executor = executor;
+    this.poolLock = poolLock;
+    available = poolLock.newCondition();
+    this.connectionFactory = connectionFactory;
+
+    String cfName = Stats.normalizeName(connectionFactory.toString());
+    statsProvider.makeGauge("cp_leased_connections_" + cfName,
+        new Supplier<Integer>() {
+      @Override public Integer get() {
+        return leasedConnections.size();
+      }
+    });
+    statsProvider.makeGauge("cp_available_connections_" + cfName,
+        new Supplier<Integer>() {
+          @Override public Integer get() {
+            return availableConnections.size();
+          }
+        });
+    this.connectionsCreated =
+        statsProvider.makeCounter("cp_created_connections_" + cfName);
+    this.connectionsDestroyed =
+        statsProvider.makeCounter("cp_destroyed_connections_" + cfName);
+    this.connectionsReturned =
+        statsProvider.makeCounter("cp_returned_connections_" + cfName);
+  }
+
+  @Override
+  public String toString() {
+    return "CP-" + connectionFactory;
+  }
+
+  @Override
+  public S get() throws ResourceExhaustedException, TimeoutException {
+    checkNotClosed();
+    poolLock.lock();
+    try {
+      return leaseConnection(NO_TIMEOUT);
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  /**
+   * Leases a connection, waiting at most {@code timeout} for the pool lock
+   * and for a connection to become available.  A zero-valued timeout
+   * delegates to {@link #get()}.
+   *
+   * @param timeout the maximum amount of time to wait
+   * @return a leased connection
+   * @throws ResourceExhaustedException if no connection could be obtained
+   * @throws TimeoutException if the timeout elapses, or if the calling thread
+   *     is interrupted while waiting; in the latter case the thread's
+   *     interrupt status is restored before throwing
+   */
+  @Override
+  public S get(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+
+    checkNotClosed();
+    Preconditions.checkNotNull(timeout);
+    if (timeout.getValue() == 0) {
+      return get();
+    }
+
+    try {
+      long start = System.nanoTime();
+      long timeBudgetNs = timeout.as(Time.NANOSECONDS);
+      if (!poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) {
+        throw new TimeoutException("Timed out waiting for pool lock");
+      }
+      try {
+        // Keep the remaining budget strictly positive: getConnection treats
+        // a zero-valued timeout as "wait indefinitely", so a budget that was
+        // exactly used up acquiring the lock must not become an unbounded
+        // wait.
+        long remainingNs =
+            Math.max(1L, timeBudgetNs - (System.nanoTime() - start));
+        return leaseConnection(Amount.of(remainingNs, Time.NANOSECONDS));
+      } finally {
+        poolLock.unlock();
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new TimeoutException("Interrupted waiting for pool lock");
+    }
+  }
+
+  private S leaseConnection(Amount<Long, Time> timeout) throws 
ResourceExhaustedException,
+      TimeoutException {
+    S connection = getConnection(timeout);
+    if (connection == null) {
+      throw new ResourceExhaustedException("Connection pool resources 
exhausted");
+    }
+    return leaseConnection(connection);
+  }
+
+  @Override
+  public void release(S connection) {
+    release(connection, false);
+  }
+
+  /**
+   * Equivalent to releasing a Connection with isValid() == false.
+   * @see ObjectPool#remove(Object)
+   */
+  @Override
+  public void remove(S connection) {
+    release(connection, true);
+  }
+
+  // TODO(John Sirois): release could block indefinitely if someone is blocked 
in get() on a create
+  // connection - reason about this and potentially submit release to our 
executor
+  private void release(S connection, boolean remove) {
+    poolLock.lock();
+    try {
+      if (!leasedConnections.remove(connection)) {
+        throw new IllegalArgumentException("Connection not controlled by this 
connection pool: "
+                                           + connection);
+      }
+
+      if (!closed && !remove && connection.isValid()) {
+        addConnection(connection);
+        connectionsReturned.incrementAndGet();
+      } else {
+        connectionFactory.destroy(connection);
+        connectionsDestroyed.incrementAndGet();
+      }
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  /**
+   * Closes the pool, destroying every connection that is currently
+   * available.  Leased connections are not touched here; the private release
+   * path destroys them when they are handed back to a closed pool.
+   *
+   * <p>The pool is marked closed even if destroying a connection fails.
+   */
+  @Override
+  public void close() {
+    poolLock.lock();
+    try {
+      for (S availableConnection : availableConnections) {
+        connectionFactory.destroy(availableConnection);
+        connectionsDestroyed.incrementAndGet();
+      }
+      // Forget the destroyed connections so that a repeated close() does not
+      // destroy them twice and a get() racing with close() cannot be handed
+      // a connection that was already destroyed.
+      availableConnections.clear();
+    } finally {
+      closed = true;
+      poolLock.unlock();
+    }
+  }
+
+  private void checkNotClosed() {
+    Preconditions.checkState(!closed);
+  }
+
+  private S leaseConnection(S connection) {
+    leasedConnections.add(connection);
+    return connection;
+  }
+
+  // TODO(John Sirois): pool growth is serialized by poolLock currently - it
+  // seems like this could be fixed but there may be no need - do
+  // gedankanalysis
+  /**
+   * Obtains a connection for leasing: an available one if present, a freshly
+   * created one if the pool is completely empty, or otherwise whichever
+   * connection is released or created in the background first.
+   *
+   * @param timeout maximum time to wait; a zero value waits indefinitely
+   * @return a connection removed from the available set, or the result of
+   *     creating one for an empty pool (which may be {@code null})
+   * @throws ResourceExhaustedException if creating a connection for an empty
+   *     pool fails
+   * @throws TimeoutException if the timeout elapses, or if the calling thread
+   *     is interrupted while waiting; in the latter case the thread's
+   *     interrupt status is restored before throwing
+   */
+  private S getConnection(final Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+    if (availableConnections.isEmpty()) {
+      if (leasedConnections.isEmpty()) {
+        // Completely empty pool
+        try {
+          return createConnection(timeout);
+        } catch (Exception e) {
+          throw new ResourceExhaustedException(
+              "failed to create a new connection", e);
+        }
+      } else {
+        // If the pool is allowed to grow - let the connection factory race a
+        // release
+        if (connectionFactory.mightCreate()) {
+          executor.execute(new Runnable() {
+            @Override public void run() {
+              try {
+                // The connection timeout is not needed here to honor the
+                // callers get requested timeout, but we don't want to have
+                // an infinite timeout which could exhaust a thread pool over
+                // many backgrounded create calls
+                S connection = createConnection(timeout);
+                if (connection != null) {
+                  addConnection(connection);
+                } else {
+                  LOG.log(Level.WARNING,
+                      "Failed to create a new connection for a waiting client "
+                      + "due to maximum pool size or timeout");
+                }
+              } catch (Exception e) {
+                LOG.log(Level.WARNING,
+                    "Failed to create a new connection for a waiting client",
+                    e);
+              }
+            }
+          });
+        }
+
+        try {
+          // We wait for a returned/new connection here in loops to guard
+          // against the "spurious wakeups" that are documented can occur
+          // with Condition.await()
+          if (timeout.getValue() == 0) {
+            while (availableConnections.isEmpty()) {
+              available.await();
+            }
+          } else {
+            long timeRemainingNs = timeout.as(Time.NANOSECONDS);
+            // The loop only exits normally once a connection is available,
+            // so no further emptiness check is needed after it.
+            while (availableConnections.isEmpty()) {
+              long start = System.nanoTime();
+              if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) {
+                throw new TimeoutException(
+                    "timeout waiting for a connection to be released to the "
+                    + "pool");
+              }
+              timeRemainingNs -= (System.nanoTime() - start);
+            }
+          }
+        } catch (InterruptedException e) {
+          // Preserve the interrupt for callers further up the stack.
+          Thread.currentThread().interrupt();
+          throw new TimeoutException(
+              "Interrupted while waiting for a connection.");
+        }
+      }
+    }
+
+    return getAvailableConnection();
+  }
+
+  /**
+   * Removes and returns the first connection yielded by iterating the
+   * available set.
+   *
+   * @return the connection taken out of the available set
+   */
+  private S getAvailableConnection() {
+    // The first iterated element is the only element of a singleton set too.
+    S picked = availableConnections.iterator().next();
+    boolean wasPooled = availableConnections.remove(picked);
+    if (!wasPooled) {
+      throw new IllegalArgumentException(
+          "Connection picked not in pool: " + picked);
+    }
+    return picked;
+  }
+
+  private S createConnection(Amount<Long, Time> timeout) throws Exception {
+    S connection = connectionFactory.create(timeout);
+    if (connection != null) {
+      connectionsCreated.incrementAndGet();
+    }
+    return connection;
+  }
+
+  private void addConnection(S connection) {
+    poolLock.lock();
+    try {
+      availableConnections.add(connection);
+      available.signal();
+    } finally {
+      poolLock.unlock();
+    }
+  }
+}

Reply via email to