http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java new file mode 100644 index 0000000..e87fe2c --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/TimeSeriesDataSource.java @@ -0,0 +1,134 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.http.handlers; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; + +import javax.annotation.Nullable; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Splitter; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.net.MediaType; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.inject.Inject; + +import org.apache.aurora.common.collections.Iterables2; +import org.apache.aurora.common.stats.TimeSeries; +import org.apache.aurora.common.stats.TimeSeriesRepository; + +/** + * A servlet that provides time series data in JSON format. + */ +public class TimeSeriesDataSource extends HttpServlet { + + @VisibleForTesting static final String TIME_METRIC = "time"; + + private static final String METRICS = "metrics"; + private static final String SINCE = "since"; + + private final TimeSeriesRepository timeSeriesRepo; + private final Gson gson = new Gson(); + + @Inject + public TimeSeriesDataSource(TimeSeriesRepository timeSeriesRepo) { + this.timeSeriesRepo = Preconditions.checkNotNull(timeSeriesRepo); + } + + @VisibleForTesting + String getResponse( + @Nullable String metricsQuery, + @Nullable String sinceQuery) throws MetricException { + + if (metricsQuery == null) { + // Return metric listing. + return gson.toJson(ImmutableList.copyOf(timeSeriesRepo.getAvailableSeries())); + } + + List<Iterable<Number>> tsData = Lists.newArrayList(); + tsData.add(timeSeriesRepo.getTimestamps()); + // Ignore requests for "time" since it is implicitly returned. + Iterable<String> names = Iterables.filter( + Splitter.on(",").split(metricsQuery), + Predicates.not(Predicates.equalTo(TIME_METRIC))); + for (String metric : names) { + TimeSeries series = timeSeriesRepo.get(metric); + if (series == null) { + JsonObject response = new JsonObject(); + response.addProperty("error", "Unknown metric " + metric); + throw new MetricException(gson.toJson(response)); + } + tsData.add(series.getSamples()); + } + + final long since = Long.parseLong(Optional.fromNullable(sinceQuery).or("0")); + Predicate<List<Number>> sinceFilter = new Predicate<List<Number>>() { + @Override public boolean apply(List<Number> next) { + return next.get(0).longValue() > since; + } + }; + + ResponseStruct response = new ResponseStruct( + ImmutableList.<String>builder().add(TIME_METRIC).addAll(names).build(), + FluentIterable.from(Iterables2.zip(tsData, 0)).filter(sinceFilter).toList()); + return gson.toJson(response); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + + resp.setContentType(MediaType.JSON_UTF_8.toString()); + PrintWriter out = resp.getWriter(); + try { + out.write(getResponse(req.getParameter(METRICS), req.getParameter(SINCE))); + } catch (MetricException e) { + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + out.write(e.getMessage()); + } + } + + @VisibleForTesting + static class ResponseStruct { + // Fields must be non-final for deserialization. + List<String> names; + List<List<Number>> data; + + ResponseStruct(List<String> names, List<List<Number>> data) { + this.names = names; + this.data = data; + } + } + + @VisibleForTesting + static class MetricException extends Exception { + MetricException(String message) { + super(message); + } + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java new file mode 100644 index 0000000..bf04525 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsHandler.java @@ -0,0 +1,61 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.http.handlers; + +import java.util.Collections; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.inject.Inject; + +import org.apache.aurora.common.stats.Stat; + +/** + * HTTP handler that prints all registered variables and their current values. + * + * @author William Farner + */ +public class VarsHandler extends TextResponseHandler { + + private static final Function<Stat, String> VAR_PRINTER = new Function<Stat, String>() { + @Override public String apply(Stat stat) { + return stat.getName() + " " + stat.read(); + } + }; + + private final Supplier<Iterable<Stat<?>>> statSupplier; + + /** + * Creates a new handler that will report stats from the provided supplier. + * + * @param statSupplier Stats supplier. + */ + @Inject + public VarsHandler(Supplier<Iterable<Stat<?>>> statSupplier) { + this.statSupplier = Preconditions.checkNotNull(statSupplier); + } + + @Override + public Iterable<String> getLines(HttpServletRequest request) { + List<String> lines = Lists.newArrayList(Iterables.transform(statSupplier.get(), VAR_PRINTER)); + Collections.sort(lines); + return lines; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java new file mode 100644 index 0000000..e97ec60 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/http/handlers/VarsJsonHandler.java @@ -0,0 +1,90 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.http.handlers; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Map; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.inject.Inject; + +import org.apache.aurora.common.stats.Stat; + +/** + * A servlet that returns the current value of all variables in JSON format. + * The format returns a JSON object with string fields and typed values: + * <pre> + * { + * "var_a": 1, + * "var_b": 126.0, + * "var_c": "a string value", + * } + * </pre> + * If the optional URL parameter 'pretty' is used, the output will be pretty-printed + * (similar to the above example). + * + * @author William Farner + */ +public class VarsJsonHandler extends HttpServlet { + + private final Supplier<Iterable<Stat<?>>> statSupplier; + + /** + * Creates a new handler that will report stats from the provided supplier. + * + * @param statSupplier Stats supplier. + */ + @Inject + public VarsJsonHandler(Supplier<Iterable<Stat<?>>> statSupplier) { + this.statSupplier = Preconditions.checkNotNull(statSupplier); + } + + @VisibleForTesting + String getBody(boolean pretty) { + Map<String, Object> vars = Maps.newLinkedHashMap(); + for (Stat<?> var : statSupplier.get()) { + vars.put(var.getName(), var.read()); + } + return getGson(pretty).toJson(vars); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + + resp.setContentType("application/json"); + resp.setStatus(HttpServletResponse.SC_OK); + PrintWriter responseBody = resp.getWriter(); + try { + responseBody.print(getBody(req.getParameter("pretty") != null)); + } finally { + responseBody.close(); + } + } + + private Gson getGson(boolean pretty) { + return pretty ? new GsonBuilder().setPrettyPrinting().create() : new Gson(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java new file mode 100644 index 0000000..e0beb25 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LeastConnectedStrategy.java @@ -0,0 +1,170 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.aurora.common.net.pool.ResourceExhaustedException; + +/** + * A load balancer that attempts to direct load towards a backend that has the fewest leased + * connections. + * + * @author William Farner + */ +public class LeastConnectedStrategy<S> extends StaticLoadBalancingStrategy<S> { + private static final Logger LOG = Logger.getLogger(LeastConnectedStrategy.class.getName()); + + // Maps from backends to the number of connections made to them. + private final Map<S, ConnectionStats> connections = Maps.newHashMap(); + + // Manages sorting of connection counts, with a reference back to the backend. + private final SortedSet<ConnectionStats> connectionStats = Sets.newTreeSet(); + + /** + * Encapsulates a set of connection stats that allow connections to be sorted as per the least + * connected strategy. + */ + private class ConnectionStats implements Comparable<ConnectionStats> { + final S connectionKey; + final int connectionId; + int activeCount = 0; // Stores the total number of active connections. + long useCount = 0; // Stores the total number times a connection has been used. + + ConnectionStats(S connectionKey, int connectionId) { + this.connectionKey = connectionKey; + this.connectionId = connectionId; + } + + @Override + public int compareTo(ConnectionStats other) { + // Sort by number of active connections first. + int difference = activeCount - other.activeCount; + if (difference != 0) { + return difference; + } + + // Sub-sort by total number of times a connection has been used (this will ensure that + // all backends are exercised). + long useDifference = useCount - other.useCount; + if (useDifference != 0) { + return Long.signum(useDifference); + } + + // If the above two are equal, break the tie using the connection id. + return connectionId - other.connectionId; + } + + @Override + public boolean equals(Object o) { + // We use ConnectionStats in a sorted container and so we need to have an equals + // implementation consistent with compareTo, ie: + // (x.compareTo(y) == 0) == x.equals(y) + // We accomplish this directly. + + @SuppressWarnings("unchecked") + ConnectionStats other = (ConnectionStats) o; + return compareTo(other) == 0; + } + + @Override + public String toString() { + return String.format("%d-%d", activeCount, useCount); + } + } + + @Override + protected Collection<S> onBackendsOffered(Set<S> backends) { + Map<S, ConnectionStats> newConnections = Maps.newHashMapWithExpectedSize(backends.size()); + Collection<ConnectionStats> newConnectionStats = + Lists.newArrayListWithCapacity(backends.size()); + + // Recreate all connection stats since their ordering may have changed and this is used for + // comparison tie breaks. + int backendId = 0; + for (S backend : backends) { + ConnectionStats stats = new ConnectionStats(backend, backendId++); + + // Retain the activeCount for existing backends to prevent dogpiling existing active servers + ConnectionStats existing = connections.get(backend); + if (existing != null) { + stats.activeCount = existing.activeCount; + } + + newConnections.put(backend, stats); + newConnectionStats.add(stats); + } + + connections.clear(); + connections.putAll(newConnections); + connectionStats.clear(); + connectionStats.addAll(newConnectionStats); + + return connections.keySet(); + } + + @Override + public S nextBackend() throws ResourceExhaustedException { + Preconditions.checkState(connections.size() == connectionStats.size()); + + if (connectionStats.isEmpty()) { + throw new ResourceExhaustedException("No backends."); + } + + return connectionStats.first().connectionKey; + } + + @Override + public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) { + Preconditions.checkNotNull(backendKey); + Preconditions.checkState(connections.size() == connectionStats.size()); + Preconditions.checkNotNull(result); + + ConnectionStats stats = connections.get(backendKey); + Preconditions.checkNotNull(stats); + + Preconditions.checkState(connectionStats.remove(stats)); + if (result == ConnectionResult.SUCCESS) { + stats.activeCount++; + } + stats.useCount++; + Preconditions.checkState(connectionStats.add(stats)); + } + + @Override + public void connectionReturned(S backendKey) { + Preconditions.checkNotNull(backendKey); + Preconditions.checkState(connections.size() == connectionStats.size()); + + ConnectionStats stats = connections.get(backendKey); + Preconditions.checkNotNull(stats); + + if (stats.activeCount > 0) { + Preconditions.checkState(connectionStats.remove(stats)); + stats.activeCount--; + Preconditions.checkState(connectionStats.add(stats)); + } else { + LOG.warning("connection stats dropped below zero, ignoring"); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java new file mode 100644 index 0000000..b15137d --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancer.java @@ -0,0 +1,76 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; +import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; + +import java.util.Collection; +import java.util.Set; + +/** + * A load balancer, which will be used to determine which of a set of backends should be connected + * to for service calls. It is expected that the backends themselves can be changed at any time, + * and the load balancer should immediately restrict itself to using only those backends. + * + * It is likely that the load balancer implementation will periodically receive information about + * backends that it technically should no longer know about. An example is calls to + * {@link #requestResult(Object, RequestResult, long)} and {@link #released(Object)} for + * in-flight requests after backends were changed by {@link #offerBackends(Set, Closure)}. + * + * @author William Farner + */ +public interface LoadBalancer<K> extends RequestTracker<K> { + + /** + * Offers a set of backends that the load balancer should choose from to distribute load amongst. + * + * @param offeredBackends Backends to choose from. + * @param onBackendsChosen A callback that should be notified when the offered backends have been + * (re)chosen from. + */ + void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen); + + /** + * Gets the next backend that a request should be sent to. + * + * @return Next backend to send a request. + * @throws ResourceExhaustedException If there are no available backends. + */ + K nextBackend() throws ResourceExhaustedException; + + /** + * Signals the load balancer that a connection was made. + * + * @param backend The backend that was connected to. + * @param connectTimeNanos The time spent waiting for the connection to be established. + */ + void connected(K backend, long connectTimeNanos); + + /** + * Signals the load balancer that a connection was attempted, but failed. + * + * @param backend The backend to which connection attempt was made. + * @param result The result of the connection attempt (only FAILED and TIMEOUT are permitted). + */ + void connectFailed(K backend, ConnectionResult result); + + /** + * Signals the load balancer that a connection was released, and is idle. + * + * @param connection Idle connection. + */ + void released(K connection); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java new file mode 100644 index 0000000..30e77c9 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancerImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import java.util.Collection; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; + +/** + * Implementation of a load balancer, that uses a pluggable {@link LoadBalancingStrategy} to define + * actual load balancing behavior. This class handles the responsibility of associating connections + * with backends. + * + * Calls to {@link #connected(Object, long)}, + * {@link #requestResult(Object, RequestResult, long)}, and {@link #released(Object)} will not + * be forwarded for unknown backends/connections. + * + * @author William Farner + */ +public class LoadBalancerImpl<K> implements LoadBalancer<K> { + + private final LoadBalancingStrategy<K> strategy; + + private Set<K> offeredBackends = ImmutableSet.of(); + + /** + * Creates a new load balancer that will use the given strategy. + * + * @param strategy Strategy to delegate load balancing work to. + */ + public LoadBalancerImpl(LoadBalancingStrategy<K> strategy) { + this.strategy = Preconditions.checkNotNull(strategy); + } + + @Override + public synchronized void offerBackends(Set<K> offeredBackends, + final Closure<Collection<K>> onBackendsChosen) { + this.offeredBackends = ImmutableSet.copyOf(offeredBackends); + strategy.offerBackends(offeredBackends, new Closure<Collection<K>>() { + @Override public void execute(Collection<K> chosenBackends) { + onBackendsChosen.execute(chosenBackends); + } + }); + } + + @Override + public synchronized K nextBackend() throws ResourceExhaustedException { + return strategy.nextBackend(); + } + + @Override + public synchronized void connected(K backend, long connectTimeNanos) { + Preconditions.checkNotNull(backend); + + if (!hasBackend(backend)) return; + + strategy.addConnectResult(backend, ConnectionResult.SUCCESS, connectTimeNanos); + } + + private boolean hasBackend(K backend) { + return offeredBackends.contains(backend); + } + + @Override + public synchronized void connectFailed(K backend, ConnectionResult result) { + Preconditions.checkNotNull(backend); + Preconditions.checkNotNull(result); + Preconditions.checkArgument(result != ConnectionResult.SUCCESS); + + if (!hasBackend(backend)) return; + + strategy.addConnectResult(backend, result, 0); + } + + @Override + public synchronized void released(K backend) { + Preconditions.checkNotNull(backend); + + if (!hasBackend(backend)) return; + + strategy.connectionReturned(backend); + } + + @Override + public synchronized void requestResult(K backend, RequestResult result, long requestTimeNanos) { + Preconditions.checkNotNull(backend); + Preconditions.checkNotNull(result); + + if (!hasBackend(backend)) return; + + strategy.addRequestResult(backend, result, requestTimeNanos); + } + + /** + * Convenience method to create a new load balancer. + * + * @param strategy Strategy to use. + * @param <K> Backend type. + * @return A new load balancer. + */ + public static <K> LoadBalancerImpl<K> + create(LoadBalancingStrategy<K> strategy) { + return new LoadBalancerImpl<K>(strategy); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java new file mode 100644 index 0000000..7f33416 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/LoadBalancingStrategy.java @@ -0,0 +1,79 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; + +import java.util.Collection; +import java.util.Set; + +/** + * A strategy for balancing request load among backends. + * + * Strategies should be externally synchronized, and therefore do not have to worry about reentrant + * access. + * + * @author William Farner + */ +public interface LoadBalancingStrategy<K> { + + /** + * Offers a set of backends that the load balancer should choose from to distribute load amongst. + * + * @param offeredBackends Backends to choose from. + * @param onBackendsChosen A callback that should be notified when the offered backends have been + * (re)chosen from. + */ + public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen); + + /** + * Gets the next backend that a request should be sent to. + * + * @return Next backend to send a request. + * @throws ResourceExhaustedException If there are no available backends. + */ + public K nextBackend() throws ResourceExhaustedException; + + /** + * Offers information about a connection result. + * + * @param key Backend key. + * @param result Connection result. + * @param connectTimeNanos Time spent waiting for connection to be established. + */ + public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos); + + /** + * Offers information about a connection that was returned. + * + * @param key Backend key. + */ + public void connectionReturned(K key); + + /** + * Offers information about a request result. + * + * @param key Backend key. + * @param result Request result. + * @param requestTimeNanos Time spent waiting for a connection to be established. + */ + public void addRequestResult(K key, RequestTracker.RequestResult result, long requestTimeNanos); + + enum ConnectionResult { + FAILED, + TIMEOUT, + SUCCESS + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java new file mode 100644 index 0000000..05a4056 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategy.java @@ -0,0 +1,220 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; +import org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult; +import org.apache.aurora.common.util.BackoffDecider; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; + +/** + * A load balancer that serves as a layer above another load balancer to mark hosts as dead, and + * prevent them from being visible to the wrapped load balancer. + * If all backends become marked as dead, they will all be unmarked. + * + * @author William Farner + */ +public class MarkDeadStrategy<S> implements LoadBalancingStrategy<S> { + private static final Logger LOG = Logger.getLogger(MarkDeadStrategy.class.getName()); + + private final LoadBalancingStrategy<S> wrappedStrategy; + private final Map<S, BackoffDecider> targets = Maps.newHashMap(); + private final Function<S, BackoffDecider> backoffFactory; + protected final Predicate<S> hostChecker; + + private Set<S> liveBackends = null; + private Closure<Collection<S>> onBackendsChosen = null; + + // Flipped when we are in "forced live" mode, where all backends are considered dead and we + // send them all traffic as a last-ditch effort. + private boolean forcedLive = false; + + /** + * Creates a mark dead strategy with a wrapped strategy, backoff decider factory + * and a predicate host checker. Use this constructor if you want to pass in the + * your own implementation of the host checker. + * + * @param wrappedStrategy one of the implementations of the load balancing strategy. + * @param backoffFactory backoff decider factory per host. + * @param hostChecker predicate that returns {@code true} if the host is alive, otherwise returns {@code false}. + */ + public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy, + Function<S, BackoffDecider> backoffFactory, Predicate<S> hostChecker) { + this.wrappedStrategy = Preconditions.checkNotNull(wrappedStrategy); + this.backoffFactory = Preconditions.checkNotNull(backoffFactory); + this.hostChecker = Preconditions.checkNotNull(hostChecker); + } + + /** + * Constructor that uses a default predicate host checker that always returns true. + * This is the default constructor that all consumers of MarkDeadStrategy currently use. + * + * @param wrappedStrategy one of the implementations of the load balancing strategy. + * @param backoffFactory backoff decider factory per host. + */ + public MarkDeadStrategy(LoadBalancingStrategy<S> wrappedStrategy, + Function<S, BackoffDecider> backoffFactory) { + this(wrappedStrategy, backoffFactory, Predicates.<S>alwaysTrue()); + } + + @Override + public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) { + this.onBackendsChosen = onBackendsChosen; + targets.keySet().retainAll(offeredBackends); + for (S backend : offeredBackends) { + if (!targets.containsKey(backend)) { + targets.put(backend, backoffFactory.apply(backend)); + } + } + + adjustBackends(); + } + + @Override + public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) { + Preconditions.checkNotNull(backendKey); + Preconditions.checkNotNull(result); + + BackoffDecider decider = targets.get(backendKey); + Preconditions.checkNotNull(decider); + + addResult(decider, result); + if (shouldNotifyFor(backendKey)) { + wrappedStrategy.addConnectResult(backendKey, result, connectTimeNanos); + } + } + + @Override + public void connectionReturned(S backendKey) { + Preconditions.checkNotNull(backendKey); + + if (shouldNotifyFor(backendKey)) { + wrappedStrategy.connectionReturned(backendKey); + } + } + + @Override + public void addRequestResult(S requestKey, RequestResult result, + long requestTimeNanos) { + Preconditions.checkNotNull(requestKey); + Preconditions.checkNotNull(result); + + BackoffDecider decider = targets.get(requestKey); + Preconditions.checkNotNull(decider); + + addResult(decider, result); + if (shouldNotifyFor(requestKey)) { + wrappedStrategy.addRequestResult(requestKey, result, requestTimeNanos); + } + } + + private void addResult(BackoffDecider decider, ConnectionResult result) { + switch (result) { + case FAILED: + case TIMEOUT: + addResult(decider, false); + break; + case SUCCESS: + addResult(decider, true); + break; + default: + throw new UnsupportedOperationException("Unhandled result type " + result); + } + } + + private void addResult(BackoffDecider decider, RequestTracker.RequestResult result) { + switch (result) { + case FAILED: + case TIMEOUT: + addResult(decider, false); + break; + case SUCCESS: + addResult(decider, true); + break; + default: + throw new UnsupportedOperationException("Unhandled result type " + result); + } + } + + private void addResult(BackoffDecider decider, boolean success) { + if (success) { + decider.addSuccess(); + } else { + decider.addFailure(); + } + + // Check if any of the backends have moved into or out of dead state. + for (Map.Entry<S, BackoffDecider> entry : targets.entrySet()) { + boolean dead = entry.getValue().shouldBackOff(); + boolean markedDead = !liveBackends.contains(entry.getKey()); + + // only check the servers that were marked dead before and see if we can + // connect to them, otherwise set dead to true. + if (markedDead && !dead) { + boolean alive = hostChecker.apply(entry.getKey()); + if (!alive) { + entry.getValue().transitionToBackOff(0, true); + } + dead = !alive; + } + + if (dead && !markedDead && forcedLive) { + // Do nothing here. Since we have forced all backends to be live, we don't want to + // continually advertise the backend list to the wrapped strategy. + } else if (dead != markedDead || !dead && forcedLive) { + adjustBackends(); + break; + } + } + } + + private boolean shouldNotifyFor(S backend) { + return liveBackends.contains(backend); + } + + private final Predicate<S> deadTargetFilter = new Predicate<S>() { + @Override public boolean apply(S backend) { + return !targets.get(backend).shouldBackOff(); + } + }; + + private void adjustBackends() { + liveBackends = Sets.newHashSet(Iterables.filter(targets.keySet(), deadTargetFilter)); + if (liveBackends.isEmpty()) { + liveBackends = targets.keySet(); + forcedLive = true; + } else { + forcedLive = false; + } + LOG.info("Observed backend state change, changing live backends to " + liveBackends); + wrappedStrategy.offerBackends(liveBackends, onBackendsChosen); + } + + @Override + public S nextBackend() throws ResourceExhaustedException { + return wrappedStrategy.nextBackend(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java new file mode 100644 index 0000000..8170167 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/MarkDeadStrategyWithHostCheck.java @@ -0,0 +1,98 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import java.util.Map; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; + +import org.apache.aurora.common.util.BackoffDecider; + +/** + * A load balancing strategy that extends the functionality of the mark dead strategy by + * integrating a hostChecker that allows hosts to transition out of a dead state + * if the most recent connection to the host was successful. + * + * @param <S> typically socket address of a backend host. + * @author Krishna Gade + */ +public class MarkDeadStrategyWithHostCheck<S> extends MarkDeadStrategy<S> { + + /** + * LiveHostChecker implements Filter to determine whether a host is alive based on the + * result of the most recent connection attempt to that host. It keeps a map of + * backend -> last connection result, which gets updated every time someone tries to + * add to connection result. + */ + protected static class LiveHostChecker<S> implements Predicate<S> { + private final Map<S, ConnectionResult> lastConnectionResult = Maps.newHashMap(); + + /** + * Adds the connection result of this backend to the last connection result map. + * + * @param backend typically the socket address of the backend. + * @param result result of what happened when the client tried to connect to this backend. + */ + public void addConnectResult(S backend, ConnectionResult result) { + lastConnectionResult.put(backend, result); + } + + /** + * Checks if the last connection result for this backend and returns {@code true} if it + * was {@link LoadBalancingStrategy.ConnectionResult#SUCCESS} otherwise returns {@code false}. + * + * @param backend typically the socket address of the backend. + */ + @Override public boolean apply(S backend) { + ConnectionResult result = lastConnectionResult.get(backend); + return result != null && result == ConnectionResult.SUCCESS; + } + } + + // Reference to the host checker we pass to the super class. + // We keep it here to avoid casting on every access to it. + protected final LiveHostChecker<S> liveHostChecker; + + /** + * Creates a mark dead strategy with the given wrapped strategy and backoff decider factory. + * It uses a hostChecker {@link Predicate} that allows hosts to transition out + * of a dead state if the most recent connection to the host was successful. + * + * @param wrappedStrategy one of the implementations of the load balancing strategy. + * @param backoffFactory backoff decider factory per host. + */ + public MarkDeadStrategyWithHostCheck(LoadBalancingStrategy<S> wrappedStrategy, + Function<S, BackoffDecider> backoffFactory) { + super(wrappedStrategy, backoffFactory, new LiveHostChecker<S>()); + // Casting to LiveHostChecker is safe here as that's the only predicate that we pass to super. + this.liveHostChecker = ((LiveHostChecker<S>) hostChecker); + } + + + /** + * Overrides the base class implementation by adding this connection result to the + * host checker. + * + * @param backendKey typically the socket address of the backend. + * @param result result of what happened when the client tried to connect to this backend. + * @param connectTimeNanos time took to connect to the backend in nano seconds. + */ + @Override + public void addConnectResult(S backendKey, ConnectionResult result, long connectTimeNanos) { + liveHostChecker.addConnectResult(backendKey, result); + super.addConnectResult(backendKey, result, connectTimeNanos); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java new file mode 100644 index 0000000..a8da980 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RandomStrategy.java @@ -0,0 +1,57 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; + +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.Set; + +/** + * A load balancer that selects a random backend each time a request is made.. + * + * @author William Farner + */ +public class RandomStrategy<S> extends StaticLoadBalancingStrategy<S> { + + private List<S> targets = Lists.newArrayList(); + private final Random random; + + public RandomStrategy() { + this(new Random()); + } + + @VisibleForTesting + RandomStrategy(Random random) { + this.random = Preconditions.checkNotNull(random); + } + + @Override + protected Collection<S> onBackendsOffered(Set<S> targets) { + this.targets = ImmutableList.copyOf(targets); + return this.targets; + } + + @Override + public S nextBackend() throws ResourceExhaustedException { + if (targets.isEmpty()) throw new ResourceExhaustedException("No backends."); + return targets.get(random.nextInt(targets.size())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java new file mode 100644 index 0000000..745e2f8 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RequestTracker.java @@ -0,0 +1,37 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +/** + * Tracks requests made to a backend service. + * + * @author William Farner + */ +public interface RequestTracker<T> { + + /** + * Informs the tracker of a completed request. + * + * @param key Key to identify the owner of the request. + * @param result Result of the request. + * @param requestTimeNanos Time duration spent waiting for the request to complete. + */ + void requestResult(T key, RequestResult result, long requestTimeNanos); + + enum RequestResult { + FAILED, + TIMEOUT, + SUCCESS + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java new file mode 100644 index 0000000..5678331 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/RoundRobinStrategy.java @@ -0,0 +1,49 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * A load balancer that distributes load by randomizing the list of available backends, and then + * rotating through them evenly. + * + * @author William Farner + */ +public class RoundRobinStrategy<S> extends StaticLoadBalancingStrategy<S> { + + private Iterator<S> iterator = Iterators.emptyIterator(); + + @Override + protected Collection<S> onBackendsOffered(Set<S> targets) { + List<S> newTargets = Lists.newArrayList(targets); + Collections.shuffle(newTargets); + iterator = Iterators.cycle(newTargets); + return newTargets; + } + + @Override + public S nextBackend() throws ResourceExhaustedException { + if (!iterator.hasNext()) throw new ResourceExhaustedException("No backends available!"); + return iterator.next(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java new file mode 100644 index 0000000..b333b44 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/StaticLoadBalancingStrategy.java @@ -0,0 +1,61 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult; + +import java.util.Collection; +import java.util.Set; + +/** + * A baseclass for LoadBalancingStrategies that use a static set of backends they are + * {@link #offerBackends(java.util.Set, Closure) offered}. Also acts as an + * adapter, providing no-op implementations of all other LoadBalancingStrategy methods that only + * need be overridden as required by subclass features. + * + * @author John Sirois + */ +abstract class StaticLoadBalancingStrategy<K> implements LoadBalancingStrategy<K> { + + @Override + public final void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) { + onBackendsChosen.execute(onBackendsOffered(offeredBackends)); + } + + /** + * Subclasses must override and return a collection of the backends actually chosen for use until + * the next offer round. + * + * @param offeredBackends The backends offered in a {@link + * #offerBackends(java.util.Set, Closure)} event. + * @return The collection of backends that will be used until the next offer event. + */ + protected abstract Collection<K> onBackendsOffered(Set<K> offeredBackends); + + @Override + public void addConnectResult(K backendKey, ConnectionResult result, long connectTimeNanos) { + // No-op. + } + + @Override + public void connectionReturned(K backendKey) { + // No-op. + } + + @Override + public void addRequestResult(K requestKey, RequestResult result, long requestTimeNanos) { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java new file mode 100644 index 0000000..0b852cf --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/SubsetStrategy.java @@ -0,0 +1,89 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; +import org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * A load balancer that maintains a fixed upper bound on the number of backends that will be made + * available for a wrapped load balancer. + * + * TODO(William Farner): May want to consider periodically swapping subsets. + * + * TODO(William Farner): May want to catch ResourceExhaustedExceptions from wrapped strategy and adjust + * subset if possible. + * + * @author William Farner + */ +public class SubsetStrategy<S> implements LoadBalancingStrategy<S> { + private final LoadBalancingStrategy<S> wrapped; + private final int maxBackends; + + private Set<S> backendSubset = Sets.newHashSet(); + + public SubsetStrategy(int maxBackends, LoadBalancingStrategy<S> wrapped) { + Preconditions.checkArgument(maxBackends > 0); + this.maxBackends = maxBackends; + this.wrapped = Preconditions.checkNotNull(wrapped); + } + + @Override + public void offerBackends(Set<S> offeredBackends, Closure<Collection<S>> onBackendsChosen) { + List<S> allTargets = Lists.newArrayList(offeredBackends); + Collections.shuffle(allTargets); + backendSubset = ImmutableSet.copyOf( + allTargets.subList(0, Math.min(maxBackends, allTargets.size()))); + wrapped.offerBackends(backendSubset, onBackendsChosen); + } + + @Override + public void addConnectResult(S backendKey, ConnectionResult result, + long connectTimeNanos) { + if (backendSubset.contains(backendKey)) { + wrapped.addConnectResult(backendKey, result, connectTimeNanos); + } + } + + @Override + public void connectionReturned(S backendKey) { + if (backendSubset.contains(backendKey)) { + wrapped.connectionReturned(backendKey); + } + } + + @Override + public void addRequestResult(S requestKey, RequestResult result, long requestTimeNanos) { + Preconditions.checkNotNull(requestKey); + + if (backendSubset.contains(requestKey)) { + wrapped.addRequestResult(requestKey, result, requestTimeNanos); + } + } + + @Override + public S nextBackend() throws ResourceExhaustedException { + return wrapped.nextBackend(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java new file mode 100644 index 0000000..e0c5c35 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/loadbalancing/TrafficMonitorAdapter.java @@ -0,0 +1,68 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.loadbalancing; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.monitoring.TrafficMonitor; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; + +import java.util.Collection; +import java.util.Set; + +/** + * @author William Farner + */ +public class TrafficMonitorAdapter<K> implements LoadBalancingStrategy<K> { + private final LoadBalancingStrategy<K> strategy; + private final TrafficMonitor<K> monitor; + + public TrafficMonitorAdapter(LoadBalancingStrategy<K> strategy, TrafficMonitor<K> monitor) { + this.strategy = Preconditions.checkNotNull(strategy); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public static <K> TrafficMonitorAdapter<K> create(LoadBalancingStrategy<K> strategy, + TrafficMonitor<K> monitor) { + return new TrafficMonitorAdapter<K>(strategy, monitor); + } + + @Override + public void offerBackends(Set<K> offeredBackends, Closure<Collection<K>> onBackendsChosen) { + strategy.offerBackends(offeredBackends, onBackendsChosen); + } + + @Override + public K nextBackend() throws ResourceExhaustedException { + return strategy.nextBackend(); + } + + @Override + public void addConnectResult(K key, ConnectionResult result, long connectTimeNanos) { + strategy.addConnectResult(key, result, connectTimeNanos); + if (result == ConnectionResult.SUCCESS) monitor.connected(key); + } + + @Override + public void connectionReturned(K key) { + strategy.connectionReturned(key); + monitor.released(key); + } + + @Override + public void addRequestResult(K key, RequestTracker.RequestResult result, long requestTimeNanos) { + strategy.addRequestResult(key, result, requestTimeNanos); + monitor.requestResult(key, result, requestTimeNanos); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java b/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java new file mode 100644 index 0000000..4d32a71 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/monitoring/ConnectionMonitor.java @@ -0,0 +1,36 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.monitoring; + +/** + * Monitors active connections between two hosts.. + * + * @author William Farner + */ +public interface ConnectionMonitor<K> { + + /** + * Instructs the monitor that a connection was established. + * + * @param connectionKey Key for the host that a connection was established with. + */ + public void connected(K connectionKey); + + /** + * Informs the monitor that a connection was released. + * + * @param connectionKey Key for the host that a connection was released for. + */ + public void released(K connectionKey); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java b/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java new file mode 100644 index 0000000..fba1e4b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/monitoring/TrafficMonitor.java @@ -0,0 +1,259 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.monitoring; + +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.net.loadbalancing.RequestTracker; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.common.util.concurrent.ExecutorServiceShutdown; + +/** + * Monitors activity on established connections between two hosts. This can be used for a server + * to track inbound clients, or for a client to track requests sent to different servers. + * + * The monitor will retain information for hosts that may no longer be active, but will expunge + * information for hosts that have been idle for more than five minutes. + * + * @author William Farner + */ +public class TrafficMonitor<K> implements ConnectionMonitor<K>, RequestTracker<K> { + + @VisibleForTesting + static final Amount<Long, Time> DEFAULT_GC_INTERVAL = Amount.of(5L, Time.MINUTES); + + @GuardedBy("this") + private final LoadingCache<K, TrafficInfo> trafficInfos; + + private final String serviceName; + private final Amount<Long, Time> gcInterval; + + private AtomicLong lifetimeRequests = new AtomicLong(); + private final Clock clock; + private final ScheduledExecutorService gcExecutor; + + /** + * Creates a new traffic monitor using the default cleanup interval. + * + * @param serviceName Name of the service to monitor, used for creating variable names. + */ + public TrafficMonitor(final String serviceName) { + this(serviceName, DEFAULT_GC_INTERVAL); + } + + /** + * Creates a new traffic monitor with a custom cleanup interval. + * + * @param serviceName Service name for the monitor. + * @param gcInterval Interval on which the remote host garbage collector should run. + */ + public TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval) { + this(serviceName, gcInterval, Clock.SYSTEM_CLOCK); + } + + /** + * Convenience method to create a typed traffic monitor. + * + * @param serviceName Service name for the monitor. + * @param <T> Monitor type. + * @return A new traffic monitor. + */ + public static <T> TrafficMonitor<T> create(String serviceName) { + return new TrafficMonitor<T>(serviceName); + } + + @VisibleForTesting + TrafficMonitor(final String serviceName, Clock clock) { + this(serviceName, DEFAULT_GC_INTERVAL, clock); + } + + private TrafficMonitor(final String serviceName, Amount<Long, Time> gcInterval, Clock clock) { + this.serviceName = MorePreconditions.checkNotBlank(serviceName); + this.clock = Preconditions.checkNotNull(clock); + Preconditions.checkNotNull(gcInterval); + Preconditions.checkArgument(gcInterval.getValue() > 0, "GC interval must be > zero."); + this.gcInterval = gcInterval; + + trafficInfos = CacheBuilder.newBuilder().build(new CacheLoader<K, TrafficInfo>() { + @Override public TrafficInfo load(K key) { + return new TrafficInfo(key); + } + }); + + Runnable gc = new Runnable() { + @Override public void run() { gc(); } + }; + + gcExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("TrafficMonitor-gc-%d").build()); + gcExecutor.scheduleAtFixedRate(gc, gcInterval.as(Time.SECONDS), gcInterval.as(Time.SECONDS), + TimeUnit.SECONDS); + } + + /** + * Gets the name of the service that this monitor is monitoring. + * + * @return Monitor's service name. + */ + public String getServiceName() { + return serviceName; + } + + /** + * Gets the total number of requests that this monitor has observed, for all remote hosts. + * + * @return Total number of requests observed. + */ + public long getLifetimeRequestCount() { + return lifetimeRequests.get(); + } + + /** + * Fetches all current traffic information. + * + * @return A map from the host key type to information about that host. + */ + public synchronized Map<K, TrafficInfo> getTrafficInfo() { + return ImmutableMap.copyOf(trafficInfos.asMap()); + } + + @Override + public synchronized void connected(K key) { + Preconditions.checkNotNull(key); + + trafficInfos.getUnchecked(key).incConnections(); + } + + @Override + public synchronized void released(K key) { + Preconditions.checkNotNull(key); + + TrafficInfo info = trafficInfos.getUnchecked(key); + + Preconditions.checkState(info.getConnectionCount() > 0, "Double release detected!"); + info.decConnections(); + } + + @Override + public void requestResult(K key, RequestResult result, long requestTimeNanos) { + Preconditions.checkNotNull(key); + + lifetimeRequests.incrementAndGet(); + trafficInfos.getUnchecked(key).addResult(result); + } + + @VisibleForTesting + synchronized void gc() { + Iterables.removeIf(trafficInfos.asMap().entrySet(), + new Predicate<Map.Entry<K, TrafficInfo>>() { + @Override public boolean apply(Map.Entry<K, TrafficInfo> clientInfo) { + if (clientInfo.getValue().connections.get() > 0) return false; + + long idlePeriod = clock.nowNanos() - clientInfo.getValue().getLastActiveTimestamp(); + + return idlePeriod > gcInterval.as(Time.NANOSECONDS); + } + }); + } + + /** + * Shuts down TrafficMonitor by stopping background gc task. + */ + public void shutdown() { + new ExecutorServiceShutdown(gcExecutor, Amount.of(0L, Time.SECONDS)).execute(); + } + + /** + * Information about traffic obsserved to/from a specific host. + */ + public class TrafficInfo { + private final K key; + private AtomicInteger requestSuccesses = new AtomicInteger(); + private AtomicInteger requestFailures = new AtomicInteger(); + private AtomicInteger connections = new AtomicInteger(); + private AtomicLong lastActive = new AtomicLong(); + + TrafficInfo(K key) { + this.key = key; + pulse(); + } + + void pulse() { + lastActive.set(clock.nowNanos()); + } + + public K getKey() { + return key; + } + + void addResult(RequestResult result) { + pulse(); + switch (result) { + case SUCCESS: + requestSuccesses.incrementAndGet(); + break; + case FAILED: + case TIMEOUT: + requestFailures.incrementAndGet(); + break; + } + } + + public int getRequestSuccessCount() { + return requestSuccesses.get(); + } + + public int getRequestFailureCount() { + return requestFailures.get(); + } + + int incConnections() { + pulse(); + return connections.incrementAndGet(); + } + + int decConnections() { + pulse(); + return connections.decrementAndGet(); + } + + public int getConnectionCount() { + return connections.get(); + } + + public long getLastActiveTimestamp() { + return lastActive.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java b/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java new file mode 100644 index 0000000..9309c11 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/Connection.java @@ -0,0 +1,48 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import com.google.common.base.Supplier; + +import java.io.Closeable; + +/** + * An interface to a connection resource that may become invalid. + * + * @author John Sirois + */ +public interface Connection<T, E> extends Supplier<T>, Closeable { + + /** + * This will always be the same underlying connection for the lifetime of this object. + * + * @return the connection + */ + @Override T get(); + + /** + * @return {@code true} if the supplied connection is valid for use. + */ + boolean isValid(); + + /** + * Closes this connection. + */ + void close(); + + /** + * @return the endpoint this connection is connected to. + */ + E getEndpoint(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java new file mode 100644 index 0000000..cdaaeab --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionFactory.java @@ -0,0 +1,56 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * A factory for connections that also dictates policy for the size of the connection population. + * + * <p>TODO(John Sirois): separate concerns - mixing in willCreate/null protocol is already tangling + * implementation code + * + * @author John Sirois + */ +public interface ConnectionFactory<S extends Connection<?, ?>> { + + /** + * Checks whether this factory might create a connection if requested. + * + * @return {@code} true if this factory might create a connection at this point in time; ie + * a call to {@link #create} might not have returned {@code null}. May return true to multiple + * threads if concurrently creating connections. + */ + boolean mightCreate(); + + /** + * Attempts to create a new connection within the given timeout and subject to this factory's + * connection population size policy. + * + * @param timeout the maximum amount of time to wait + * @return a new connection or null if there are too many connections already + * @throws Exception if there was a problem creating the connection or establishing the connection + * takes too long + */ + S create(Amount<Long, Time> timeout) throws Exception; + + /** + * Destroys a connection. It is an error to attempt to destroy a connection this factory did + * not {@link #create} + * + * @param connection The connection to destroy. + */ + void destroy(S connection); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java new file mode 100644 index 0000000..316bf2b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ConnectionPool.java @@ -0,0 +1,334 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.aurora.common.base.Supplier; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.common.stats.StatsProvider; + +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and + * connection choice to a supplied strategy. + * + * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in + * use. + * + * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command + * + * @author John Sirois + */ +public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> { + + private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName()); + + private final Set<S> leasedConnections = + Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap()); + private final Set<S> availableConnections = Sets.newHashSet(); + private final Lock poolLock; + private final Condition available; + + private final ConnectionFactory<S> connectionFactory; + private final Executor executor; + + private volatile boolean closed; + private final AtomicLong connectionsCreated; + private final AtomicLong connectionsDestroyed; + private final AtomicLong connectionsReturned; + + /** + * Creates a connection pool with a connection picker that selects the first item in the set of + * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}. + * + * @param connectionFactory Factory to create and destroy connections. + */ + public ConnectionPool(ConnectionFactory<S> connectionFactory) { + this(connectionFactory, Stats.STATS_PROVIDER); + } + + /** + * Creates a connection pool with a connection picker that selects the first item in the set of + * available connections and uses the supplied StatsProvider to register stats with. + * + * @param connectionFactory Factory to create and destroy connections. + * @param statsProvider Stats export provider. + */ + public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) { + this(Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("CP-" + connectionFactory + "[%d]") + .setDaemon(true) + .build()), + new ReentrantLock(true), connectionFactory, statsProvider); + } + + @VisibleForTesting + ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory, + StatsProvider statsProvider) { + Preconditions.checkNotNull(executor); + Preconditions.checkNotNull(poolLock); + Preconditions.checkNotNull(connectionFactory); + Preconditions.checkNotNull(statsProvider); + + this.executor = executor; + this.poolLock = poolLock; + available = poolLock.newCondition(); + this.connectionFactory = connectionFactory; + + String cfName = Stats.normalizeName(connectionFactory.toString()); + statsProvider.makeGauge("cp_leased_connections_" + cfName, + new Supplier<Integer>() { + @Override public Integer get() { + return leasedConnections.size(); + } + }); + statsProvider.makeGauge("cp_available_connections_" + cfName, + new Supplier<Integer>() { + @Override public Integer get() { + return availableConnections.size(); + } + }); + this.connectionsCreated = + statsProvider.makeCounter("cp_created_connections_" + cfName); + this.connectionsDestroyed = + statsProvider.makeCounter("cp_destroyed_connections_" + cfName); + this.connectionsReturned = + statsProvider.makeCounter("cp_returned_connections_" + cfName); + } + + @Override + public String toString() { + return "CP-" + connectionFactory; + } + + @Override + public S get() throws ResourceExhaustedException, TimeoutException { + checkNotClosed(); + poolLock.lock(); + try { + return leaseConnection(NO_TIMEOUT); + } finally { + poolLock.unlock(); + } + } + + @Override + public S get(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + + checkNotClosed(); + Preconditions.checkNotNull(timeout); + if (timeout.getValue() == 0) { + return get(); + } + + try { + long start = System.nanoTime(); + long timeBudgetNs = timeout.as(Time.NANOSECONDS); + if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) { + try { + timeBudgetNs -= (System.nanoTime() - start); + return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS)); + } finally { + poolLock.unlock(); + } + } else { + throw new TimeoutException("Timed out waiting for pool lock"); + } + } catch (InterruptedException e) { + throw new TimeoutException("Interrupted waiting for pool lock"); + } + } + + private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException, + TimeoutException { + S connection = getConnection(timeout); + if (connection == null) { + throw new ResourceExhaustedException("Connection pool resources exhausted"); + } + return leaseConnection(connection); + } + + @Override + public void release(S connection) { + release(connection, false); + } + + /** + * Equivalent to releasing a Connection with isValid() == false. + * @see ObjectPool#remove(Object) + */ + @Override + public void remove(S connection) { + release(connection, true); + } + + // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create + // connection - reason about this and potentially submit release to our executor + private void release(S connection, boolean remove) { + poolLock.lock(); + try { + if (!leasedConnections.remove(connection)) { + throw new IllegalArgumentException("Connection not controlled by this connection pool: " + + connection); + } + + if (!closed && !remove && connection.isValid()) { + addConnection(connection); + connectionsReturned.incrementAndGet(); + } else { + connectionFactory.destroy(connection); + connectionsDestroyed.incrementAndGet(); + } + } finally { + poolLock.unlock(); + } + } + + @Override + public void close() { + poolLock.lock(); + try { + for (S availableConnection : availableConnections) { + connectionFactory.destroy(availableConnection); + } + } finally { + closed = true; + poolLock.unlock(); + } + } + + private void checkNotClosed() { + Preconditions.checkState(!closed); + } + + private S leaseConnection(S connection) { + leasedConnections.add(connection); + return connection; + } + + // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be + // fixed but there may be no need - do gedankanalysis + private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException, + TimeoutException { + if (availableConnections.isEmpty()) { + if (leasedConnections.isEmpty()) { + // Completely empty pool + try { + return createConnection(timeout); + } catch (Exception e) { + throw new ResourceExhaustedException("failed to create a new connection", e); + } + } else { + // If the pool is allowed to grow - let the connection factory race a release + if (connectionFactory.mightCreate()) { + executor.execute(new Runnable() { + @Override public void run() { + try { + // The connection timeout is not needed here to honor the callers get requested + // timeout, but we don't want to have an infinite timeout which could exhaust a + // thread pool over many backgrounded create calls + S connection = createConnection(timeout); + if (connection != null) { + addConnection(connection); + } else { + LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " + + "due to maximum pool size or timeout"); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e); + } + } + }); + } + + try { + // We wait for a returned/new connection here in loops to guard against the + // "spurious wakeups" that are documented can occur with Condition.await() + if (timeout.getValue() == 0) { + while(availableConnections.isEmpty()) { + available.await(); + } + } else { + long timeRemainingNs = timeout.as(Time.NANOSECONDS); + while(availableConnections.isEmpty()) { + long start = System.nanoTime(); + if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) { + throw new TimeoutException( + "timeout waiting for a connection to be released to the pool"); + } else { + timeRemainingNs -= (System.nanoTime() - start); + } + } + if (availableConnections.isEmpty()) { + throw new TimeoutException( + "timeout waiting for a connection to be released to the pool"); + } + } + } catch (InterruptedException e) { + throw new TimeoutException("Interrupted while waiting for a connection."); + } + } + } + + return getAvailableConnection(); + } + + private S getAvailableConnection() { + S connection = (availableConnections.size() == 1) + ? Iterables.getOnlyElement(availableConnections) + : availableConnections.iterator().next(); + if (!availableConnections.remove(connection)) { + throw new IllegalArgumentException("Connection picked not in pool: " + connection); + } + return connection; + } + + private S createConnection(Amount<Long, Time> timeout) throws Exception { + S connection = connectionFactory.create(timeout); + if (connection != null) { + connectionsCreated.incrementAndGet(); + } + return connection; + } + + private void addConnection(S connection) { + poolLock.lock(); + try { + availableConnections.add(connection); + available.signal(); + } finally { + poolLock.unlock(); + } + } +}