http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java new file mode 100644 index 0000000..93f6610 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java @@ -0,0 +1,76 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; + +/** + * A host set that can be monitored for changes. + * + * @param <T> The type that is used to identify members of the host set. + */ +public interface DynamicHostSet<T> { + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process + * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. + * The monitor will be notified if the membership set or parameters of existing members have + * changed. + * + * @param monitor the server set monitor to call back when the host set changes + * @throws MonitorException if there is a problem monitoring the host set + * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)} + */ + @Deprecated + public void monitor(final HostChangeMonitor<T> monitor) throws MonitorException; + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process + * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. + * The monitor will be notified if the membership set or parameters of existing members have + * changed. + * + * @param monitor the server set monitor to call back when the host set changes + * @return A command which, when executed, will stop monitoring the host set. + * @throws MonitorException if there is a problem monitoring the host set + */ + public Command watch(final HostChangeMonitor<T> monitor) throws MonitorException; + + /** + * An interface to an object that is interested in receiving notification whenever the host set + * changes. + */ + public static interface HostChangeMonitor<T> { + + /** + * Called when either the available set of services changes (when a service dies or a new + * instance comes on-line) or when an existing service advertises a status or health change. + * + * @param hostSet the current set of available ServiceInstances + */ + void onChange(ImmutableSet<T> hostSet); + } + + public static class MonitorException extends Exception { + public MonitorException(String msg) { + super(msg); + } + + public MonitorException(String msg, Throwable cause) { + super(msg, cause); + } + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java new file mode 100644 index 0000000..4f75893 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSetUtil.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; + +/** + * Utility methods for dealing with dynamic sets of hosts. + */ +public final class DynamicHostSetUtil { + + /** + * Gets a snapshot of a set of dynamic hosts (e.g. a ServerSet) and returns a readable copy of + * the underlying actual endpoints. + * + * @param hostSet The hostSet to snapshot. + * @throws DynamicHostSet.MonitorException if there was a problem obtaining the snapshot. + */ + public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet) throws DynamicHostSet.MonitorException { + final ImmutableSet.Builder<T> snapshot = ImmutableSet.builder(); + Command unwatch = hostSet.watch(new DynamicHostSet.HostChangeMonitor<T>() { + @Override public void onChange(ImmutableSet<T> hostSet) { + snapshot.addAll(hostSet); + } + }); + unwatch.execute(); + return snapshot.build(); + } + + private DynamicHostSetUtil() { + // utility + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java new file mode 100644 index 0000000..2fd6046 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicPool.java @@ -0,0 +1,170 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.net.loadbalancing.LoadBalancer; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.ServerSet; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +/** + * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a + * {@link ServerSet}. + * + * @param <H> The type that contains metadata information about hosts, such as liveness and address. + * @param <T> The raw connection type that is being pooled. + * @param <E> The type that identifies the endpoint of the pool, such as an address. + * @author John Sirois + */ +public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> { + + private final MetaPool<T, E> pool; + + /** + * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools + * for the given {@code serverSet}. + * + * @param hostSet the dynamic set of available servers to pool connections for + * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint + * @param loadBalancer Load balancer to manage request flow. + * @param onBackendsChosen A callback to notify of chosen backends. + * @param restoreInterval the interval after connection errors start occurring for a target to + * begin checking to see if it has come back to a healthy state + * @param endpointExtractor Function that transforms a service instance into an endpoint instance. + * @param livenessChecker Filter that will determine whether a host indicates itself as available. + * @throws DynamicHostSet.MonitorException if there is a problem monitoring the host set + */ + public DynamicPool(DynamicHostSet<H> hostSet, + Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory, + LoadBalancer<E> loadBalancer, + Closure<Collection<E>> onBackendsChosen, + Amount<Long, Time> restoreInterval, + Function<H, E> endpointExtractor, + Predicate<H> livenessChecker) + throws DynamicHostSet.MonitorException { + Preconditions.checkNotNull(hostSet); + Preconditions.checkNotNull(endpointPoolFactory); + + pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval); + + // TODO(John Sirois): consider an explicit start/stop + hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor, + livenessChecker) { + @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools, + Map<E, ObjectPool<Connection<T, E>>> livePools) { + poolRebuilt(deadPools, livePools); + } + }); + } + + @VisibleForTesting + void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools, + Map<E, ObjectPool<Connection<T, E>>> livePools) { + + pool.setBackends(livePools); + + for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) { + deadTargetPool.close(); + } + } + + @Override + public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException { + return pool.get(); + } + + @Override + public Connection<T, E> get(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + return pool.get(timeout); + } + + @Override + public void release(Connection<T, E> connection) { + pool.release(connection); + } + + @Override + public void remove(Connection<T, E> connection) { + pool.remove(connection); + } + + @Override + public void close() { + pool.close(); + } + + private abstract class PoolMonitor<H, S extends Connection<?, ?>> + implements DynamicHostSet.HostChangeMonitor<H> { + + private final Function<E, ObjectPool<S>> endpointPoolFactory; + private final Function<H, E> endpointExtractor; + private final Predicate<H> livenessTest; + + public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory, + Function<H, E> endpointExtractor, + Predicate<H> livenessTest) { + this.endpointPoolFactory = endpointPoolFactory; + this.endpointExtractor = endpointExtractor; + this.livenessTest = livenessTest; + } + + private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap(); + + @Override + public synchronized void onChange(ImmutableSet<H> serverSet) { + // TODO(John Sirois): change onChange to pass the delta data since its already computed by + // ServerSet + + Map<E, H> newEndpoints = + Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor); + + Set<E> deadEndpoints = ImmutableSet.copyOf( + Sets.difference(endpointPools.keySet(), newEndpoints.keySet())); + Set<ObjectPool<S>> deadPools = Sets.newHashSet(); + for (E endpoint : deadEndpoints) { + ObjectPool<S> deadPool = endpointPools.remove(endpoint); + deadPools.add(deadPool); + } + + Set<E> addedEndpoints = ImmutableSet.copyOf( + Sets.difference(newEndpoints.keySet(), endpointPools.keySet())); + for (E endpoint : addedEndpoints) { + ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint); + endpointPools.put(endpoint, endpointPool); + } + + onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools)); + } + + protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools, + Map<E, ObjectPool<S>> livePools); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java new file mode 100644 index 0000000..df1bd96 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/MetaPool.java @@ -0,0 +1,339 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.aurora.common.base.Closure; +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.loadbalancing.LoadBalancer; +import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * A connection pool that picks connections from a set of backend pools. Backend pools are selected + * from randomly initially but then as they are used they are ranked according to how many + * connections they have available and whether or not the last used connection had an error or not. + * In this way, backends that are responsive should get selected in preference to those that are + * not. + * + * <p>Non-responsive backends are monitored after a configurable period in a background thread and + * if a connection can be obtained they start to float back up in the rankings. In this way, + * backends that are initially non-responsive but later become responsive should end up getting + * selected. + * + * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command + * + * @author John Sirois + */ +public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> { + + private final Command stopBackendRestorer; + + private Map<E, ObjectPool<Connection<T, E>>> backends = null; + + // Locks to guard mutation of the backends set. + private final Lock backendsReadLock; + private final Lock backendsWriteLock; + + private final Closure<Collection<E>> onBackendsChosen; + + private final LoadBalancer<E> loadBalancer; + + /** + * Creates a connection pool with no backends. Backends may be added post-creation by calling + * {@link #setBackends(java.util.Map)} + * + * @param loadBalancer the load balancer to distribute requests among backends. + * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new + * set of backends to restrict its call distribution to + * @param restoreInterval the interval after a backend goes dead to begin checking the backend to + * see if it has come back to a healthy state + */ + public MetaPool(LoadBalancer<E> loadBalancer, + Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) { + this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer, + onBackendsChosen, restoreInterval); + } + + /** + * Creates a connection pool that balances connections across multiple backend pools. + * + * @param backends the connection pools for the backends + * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new + * set of backends to restrict its call distribution to + * @param loadBalancer the load balancer to distribute requests among backends. + * @param restoreInterval the interval after a backend goes dead to begin checking the backend to + * see if it has come back to a healthy state + */ + public MetaPool( + ImmutableMap<E, ObjectPool<Connection<T, E>>> backends, + LoadBalancer<E> loadBalancer, + Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) { + + this.loadBalancer = Preconditions.checkNotNull(loadBalancer); + this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen); + + ReadWriteLock backendsLock = new ReentrantReadWriteLock(true); + backendsReadLock = backendsLock.readLock(); + backendsWriteLock = backendsLock.writeLock(); + + setBackends(backends); + + Preconditions.checkNotNull(restoreInterval); + Preconditions.checkArgument(restoreInterval.getValue() > 0); + stopBackendRestorer = startDeadBackendRestorer(restoreInterval); + } + + /** + * Assigns the backend pools that this pool should draw from. + * + * @param pools New pools to use. + */ + public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) { + backendsWriteLock.lock(); + try { + backends = Preconditions.checkNotNull(pools); + loadBalancer.offerBackends(pools.keySet(), onBackendsChosen); + } finally { + backendsWriteLock.unlock(); + } + } + + private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) { + + final AtomicBoolean shouldRestore = new AtomicBoolean(true); + Runnable restoreDeadBackends = new Runnable() { + @Override public void run() { + if (shouldRestore.get()) { + restoreDeadBackends(restoreInterval); + } + } + }; + final ScheduledExecutorService scheduledExecutorService = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("MTCP-DeadBackendRestorer[%s]") + .build()); + long restoreDelay = restoreInterval.getValue(); + scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay, + restoreDelay, restoreInterval.getUnit().getTimeUnit()); + + return new Command() { + @Override public void execute() { + shouldRestore.set(false); + scheduledExecutorService.shutdownNow(); + LOG.info("Backend restorer shut down"); + } + }; + } + + private static final Logger LOG = Logger.getLogger(MetaPool.class.getName()); + + private void restoreDeadBackends(Amount<Long, Time> restoreInterval) { + for (E backend : snapshotBackends()) { + ObjectPool<Connection<T, E>> pool; + backendsReadLock.lock(); + try { + pool = backends.get(backend); + } finally { + backendsReadLock.unlock(); + } + + // We can lose a race if the backends change - and that's fine, we'll restore the new set of + // backends in the next scheduled restoration run. + if (pool != null) { + try { + release(get(backend, pool, restoreInterval)); + } catch (TimeoutException e) { + LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); + } catch (ResourceExhaustedException e) { + LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); + } + } + } + } + + private Iterable<E> snapshotBackends() { + backendsReadLock.lock(); + try { + return ImmutableList.copyOf(backends.keySet()); + } finally { + backendsReadLock.unlock(); + } + } + + @Override + public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException { + return get(ObjectPool.NO_TIMEOUT); + } + + @Override + public Connection<T, E> get(Amount<Long, Time> timeout) + throws ResourceExhaustedException, TimeoutException { + + E backend; + ObjectPool<Connection<T, E>> pool; + + backendsReadLock.lock(); + try { + backend = loadBalancer.nextBackend(); + Preconditions.checkNotNull(backend, "Load balancer gave a null backend."); + + pool = backends.get(backend); + Preconditions.checkNotNull(backend, + "Given backend %s not found in tracked backends: %s", backend, backends); + } finally { + backendsReadLock.unlock(); + } + + return get(backend, pool, timeout); + } + + private static class ManagedConnection<T, E> implements Connection<T, E> { + private final Connection<T, E> connection; + private final ObjectPool<Connection<T, E>> pool; + + private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) { + this.connection = connection; + this.pool = pool; + } + + @Override + public void close() { + connection.close(); + } + + @Override + public T get() { + return connection.get(); + } + + @Override + public boolean isValid() { + return connection.isValid(); + } + + @Override + public E getEndpoint() { + return connection.getEndpoint(); + } + + @Override public String toString() { + return "ManagedConnection[" + connection.toString() + "]"; + } + + void release(boolean remove) { + if (remove) { + pool.remove(connection); + } else { + pool.release(connection); + } + } + } + + private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool, + Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException { + + long startNanos = System.nanoTime(); + try { + Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout); + + // BEWARE: We have leased a connection from the underlying pool here and must return it to the + // caller so they can later release it. If we fail to do so, the connection will leak. + // Catching intermediate exceptions ourselves and pro-actively returning the connection to the + // pool before re-throwing is not a viable option since the return would have to succeed, + // forcing us to ignore the timeout passed in. + + // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire + // a (backend, pool) pair atomically that has since been removed, we can safely let the lb + // know about backend events and it will just ignore us. + + try { + loadBalancer.connected(backend, System.nanoTime() - startNanos); + } catch (RuntimeException e) { + LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after " + + "leasing a connection - continuing", e); + } + return new ManagedConnection<T, E>(connection, pool); + } catch (TimeoutException e) { + loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT); + throw e; + } catch (ResourceExhaustedException e) { + loadBalancer.connectFailed(backend, ConnectionResult.FAILED); + throw e; + } + } + + @Override + public void release(Connection<T, E> connection) { + release(connection, false); + } + + /** + * Equivalent to releasing a Connection with isValid() == false. + * @see ObjectPool#remove(Object) + */ + @Override + public void remove(Connection<T, E> connection) { + release(connection, true); + } + + private void release(Connection<T, E> connection, boolean remove) { + backendsWriteLock.lock(); + try { + + if (!(connection instanceof ManagedConnection)) { + throw new IllegalArgumentException("Connection not controlled by this connection pool: " + + connection); + } + ((ManagedConnection) connection).release(remove); + + loadBalancer.released(connection.getEndpoint()); + } finally { + backendsWriteLock.unlock(); + } + } + + @Override + public void close() { + stopBackendRestorer.execute(); + + backendsWriteLock.lock(); + try { + for (ObjectPool<Connection<T, E>> backend : backends.values()) { + backend.close(); + } + } finally { + backendsWriteLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java new file mode 100644 index 0000000..a665903 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ObjectPool.java @@ -0,0 +1,82 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +import java.util.concurrent.TimeoutException; + +/** + * A generic object pool that provides object of a given type for exclusive use by the caller. + * Object pools generally pool expensive resources and so offer a {@link #close} method that should + * be used to free these resources when the pool is no longer needed. + * + * @author John Sirois + */ +public interface ObjectPool<T> { + + /** + * Gets a resource potentially blocking for as long as it takes to either create a new one or wait + * for one to be {@link #release(Object) released}. Callers must {@link #release(Object) release} + * the connection when they are done with it. + * + * @return a resource for exclusive use by the caller + * @throws ResourceExhaustedException if no resource could be obtained because this pool was + * exhausted + * @throws TimeoutException if we timed out while trying to fetch a resource + */ + T get() throws ResourceExhaustedException, TimeoutException; + + /** + * A convenience constant representing a no timeout. + */ + Amount<Long,Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS); + + /** + * Gets a resource; timing out if there are none available and it takes longer than specified to + * create a new one or wait for one to be {@link #release(Object) released}. Callers must + * {@link #release (Object) release} the connection when they are done with it. + * + * @param timeout the maximum amount of time to wait + * @return a resource for exclusive use by the caller + * @throws TimeoutException if the specified timeout was reached before a resource became + * available + * @throws ResourceExhaustedException if no resource could be obtained because this pool was + * exhausted + */ + T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException; + + /** + * Releases a resource obtained from this pool back into the pool of available resources. It is an + * error to release a resource not obtained from this pool. + * + * @param resource Resource to release. + */ + void release(T resource); + + /** + * Removes a resource obtained from this pool from its available resources. It is an error to + * remove a resource not obtained from this pool. + * + * @param resource Resource to remove. + */ + void remove(T resource); + + /** + * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects + * when they are released. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java new file mode 100644 index 0000000..fd48ddb --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/ResourceExhaustedException.java @@ -0,0 +1,27 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.net.pool; + +/** + * @author John Sirois + */ +public class ResourceExhaustedException extends Exception { + public ResourceExhaustedException(String message) { + super(message); + } + + public ResourceExhaustedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java new file mode 100644 index 0000000..95c8868 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/objectsize/ObjectSizeCalculator.java @@ -0,0 +1,427 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.objectsize; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Sets; + +/** + * Contains utility methods for calculating the memory usage of objects. It + * only works on the HotSpot JVM, and infers the actual memory layout (32 bit + * vs. 64 bit word size, compressed object pointers vs. uncompressed) from + * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM. + * It can only make an educated guess at whether compressed OOPs are used, + * though; specifically, it knows what the JVM's default choice of OOP + * compression would be based on HotSpot version and maximum heap sizes, but if + * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line + * switch, it can not detect + * this fact and will report incorrect sizes, as it will presume the default JVM + * behavior. + * + * @author Attila Szegedi + */ +public class ObjectSizeCalculator { + + /** + * Describes constant memory overheads for various constructs in a JVM implementation. + */ + public interface MemoryLayoutSpecification { + + /** + * Returns the fixed overhead of an array of any type or length in this JVM. + * + * @return the fixed overhead of an array. + */ + int getArrayHeaderSize(); + + /** + * Returns the fixed overhead of for any {@link Object} subclass in this JVM. + * + * @return the fixed overhead of any object. + */ + int getObjectHeaderSize(); + + /** + * Returns the quantum field size for a field owned by an object in this JVM. + * + * @return the quantum field size for an object. + */ + int getObjectPadding(); + + /** + * Returns the fixed size of an object reference in this JVM. + * + * @return the size of all object references. + */ + int getReferenceSize(); + + /** + * Returns the quantum field size for a field owned by one of an object's ancestor superclasses + * in this JVM. + * + * @return the quantum field size for a superclass field. + */ + int getSuperclassFieldPadding(); + } + + private static class CurrentLayout { + private static final MemoryLayoutSpecification SPEC = + getEffectiveMemoryLayoutSpecification(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. Attempts to to detect the current JVM memory layout, + * but may fail with {@link UnsupportedOperationException}; + * + * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + * @throws UnsupportedOperationException if the current vm memory layout cannot be detected. + */ + public static long getObjectSize(Object obj) throws UnsupportedOperationException { + return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj); + } + + // Fixed object header size for arrays. + private final int arrayHeaderSize; + // Fixed object header size for non-array objects. + private final int objectHeaderSize; + // Padding for the object size - if the object size is not an exact multiple + // of this, it is padded to the next multiple. + private final int objectPadding; + // Size of reference (pointer) fields. + private final int referenceSize; + // Padding for the fields of superclass before fields of subclasses are + // added. + private final int superclassFieldPadding; + + private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos = + CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() { + public ClassSizeInfo load(Class<?> clazz) { + return new ClassSizeInfo(clazz); + } + }); + + + private final Set<Object> alreadyVisited = Sets.newIdentityHashSet(); + private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024); + private long size; + + /** + * Creates an object size calculator that can calculate object sizes for a given + * {@code memoryLayoutSpecification}. + * + * @param memoryLayoutSpecification a description of the JVM memory layout. + */ + public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) { + Preconditions.checkNotNull(memoryLayoutSpecification); + arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize(); + objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize(); + objectPadding = memoryLayoutSpecification.getObjectPadding(); + referenceSize = memoryLayoutSpecification.getReferenceSize(); + superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. + * + * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + */ + public synchronized long calculateObjectSize(Object obj) { + // Breadth-first traversal instead of naive depth-first with recursive + // implementation, so we don't blow the stack traversing long linked lists. + try { + for (;;) { + visit(obj); + if (pending.isEmpty()) { + return size; + } + obj = pending.removeFirst(); + } + } finally { + alreadyVisited.clear(); + pending.clear(); + size = 0; + } + } + + private void visit(Object obj) { + if (alreadyVisited.contains(obj)) { + return; + } + final Class<?> clazz = obj.getClass(); + if (clazz == ArrayElementsVisitor.class) { + ((ArrayElementsVisitor) obj).visit(this); + } else { + alreadyVisited.add(obj); + if (clazz.isArray()) { + visitArray(obj); + } else { + classSizeInfos.getUnchecked(clazz).visit(obj, this); + } + } + } + + private void visitArray(Object array) { + final Class<?> componentType = array.getClass().getComponentType(); + final int length = Array.getLength(array); + if (componentType.isPrimitive()) { + increaseByArraySize(length, getPrimitiveFieldSize(componentType)); + } else { + increaseByArraySize(length, referenceSize); + // If we didn't use an ArrayElementsVisitor, we would be enqueueing every + // element of the array here instead. For large arrays, it would + // tremendously enlarge the queue. In essence, we're compressing it into + // a small command object instead. This is different than immediately + // visiting the elements, as their visiting is scheduled for the end of + // the current queue. + switch (length) { + case 0: { + break; + } + case 1: { + enqueue(Array.get(array, 0)); + break; + } + default: { + enqueue(new ArrayElementsVisitor((Object[]) array)); + } + } + } + } + + private void increaseByArraySize(int length, long elementSize) { + increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding)); + } + + private static class ArrayElementsVisitor { + private final Object[] array; + + ArrayElementsVisitor(Object[] array) { + this.array = array; + } + + public void visit(ObjectSizeCalculator calc) { + for (Object elem : array) { + if (elem != null) { + calc.visit(elem); + } + } + } + } + + void enqueue(Object obj) { + if (obj != null) { + pending.addLast(obj); + } + } + + void increaseSize(long objectSize) { + size += objectSize; + } + + @VisibleForTesting + static long roundTo(long x, int multiple) { + return ((x + multiple - 1) / multiple) * multiple; + } + + private class ClassSizeInfo { + // Padded fields + header size + private final long objectSize; + // Only the fields size - used to calculate the subclasses' memory + // footprint. + private final long fieldsSize; + private final Field[] referenceFields; + + public ClassSizeInfo(Class<?> clazz) { + long fieldsSize = 0; + final List<Field> referenceFields = new LinkedList<Field>(); + for (Field f : clazz.getDeclaredFields()) { + if (Modifier.isStatic(f.getModifiers())) { + continue; + } + final Class<?> type = f.getType(); + if (type.isPrimitive()) { + fieldsSize += getPrimitiveFieldSize(type); + } else { + f.setAccessible(true); + referenceFields.add(f); + fieldsSize += referenceSize; + } + } + final Class<?> superClass = clazz.getSuperclass(); + if (superClass != null) { + final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass); + fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding); + referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields)); + } + this.fieldsSize = fieldsSize; + this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding); + this.referenceFields = referenceFields.toArray( + new Field[referenceFields.size()]); + } + + void visit(Object obj, ObjectSizeCalculator calc) { + calc.increaseSize(objectSize); + enqueueReferencedObjects(obj, calc); + } + + public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) { + for (Field f : referenceFields) { + try { + calc.enqueue(f.get(obj)); + } catch (IllegalAccessException e) { + final AssertionError ae = new AssertionError( + "Unexpected denial of access to " + f); + ae.initCause(e); + throw ae; + } + } + } + } + + private static long getPrimitiveFieldSize(Class<?> type) { + if (type == boolean.class || type == byte.class) { + return 1; + } + if (type == char.class || type == short.class) { + return 2; + } + if (type == int.class || type == float.class) { + return 4; + } + if (type == long.class || type == double.class) { + return 8; + } + throw new AssertionError("Encountered unexpected primitive type " + + type.getName()); + } + + @VisibleForTesting + static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() { + final String vmName = System.getProperty("java.vm.name"); + if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") + || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) { + throw new UnsupportedOperationException( + "ObjectSizeCalculator only supported on HotSpot VM"); + } + + final String dataModel = System.getProperty("sun.arch.data.model"); + if ("32".equals(dataModel)) { + // Running with 32-bit data model + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 12; + } + @Override public int getObjectHeaderSize() { + return 8; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 4; + } + @Override public int getSuperclassFieldPadding() { + return 4; + } + }; + } else if (!"64".equals(dataModel)) { + throw new UnsupportedOperationException("Unrecognized value '" + + dataModel + "' of sun.arch.data.model system property"); + } + + final String strVmVersion = System.getProperty("java.vm.version"); + final int vmVersion = Integer.parseInt(strVmVersion.substring(0, + strVmVersion.indexOf('.'))); + if (vmVersion >= 17) { + long maxMemory = 0; + for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { + maxMemory += mp.getUsage().getMax(); + } + if (maxMemory < 30L * 1024 * 1024 * 1024) { + // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total + // for all memory pools (yes, including code cache). + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 16; + } + @Override public int getObjectHeaderSize() { + return 12; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 4; + } + @Override public int getSuperclassFieldPadding() { + return 4; + } + }; + } + } + + // In other cases, it's a 64-bit uncompressed OOPs object model + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 24; + } + @Override public int getObjectHeaderSize() { + return 16; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 8; + } + @Override public int getSuperclassFieldPadding() { + return 8; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java b/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java new file mode 100644 index 0000000..11be7f5 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/quantity/Amount.java @@ -0,0 +1,208 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.quantity; + +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.collections.Pair; + +/** + * Represents a value in a unit system and facilitates unambiguous communication of amounts. + * Instances are created via static factory {@code of(...)} methods. + * + * @param <T> the type of number the amount value is expressed in + * @param <U> the type of unit that this amount quantifies + * + * @author John Sirois + */ +public abstract class Amount<T extends Number & Comparable<T>, U extends Unit<U>> + implements Comparable<Amount<T, U>> { + + /** + * Thrown when a checked operation on an amount would overflow. + */ + + public static class TypeOverflowException extends RuntimeException { + public TypeOverflowException() { + super(); + } + } + + private final Pair<T, U> amount; + private final T maxValue; + + private Amount(T value, U unit, T maxValue) { + Preconditions.checkNotNull(value); + Preconditions.checkNotNull(unit); + this.maxValue = maxValue; + this.amount = Pair.of(value, unit); + } + + public T getValue() { + return amount.getFirst(); + } + + public U getUnit() { + return amount.getSecond(); + } + + public T as(U unit) { + return asUnit(unit); + } + + /** + * Throws TypeOverflowException if an overflow occurs during scaling. + */ + public T asChecked(U unit) { + T retVal = asUnit(unit); + if (retVal.equals(maxValue)) { + throw new TypeOverflowException(); + } + return retVal; + } + + private T asUnit(Unit<?> unit) { + return sameUnits(unit) ? getValue() : scale(getUnit().multiplier() / unit.multiplier()); + } + + @Override + public int hashCode() { + return amount.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Amount)) { + return false; + } + + Amount<?, ?> other = (Amount<?, ?>) obj; + return amount.equals(other.amount) || isSameAmount(other); + } + + private boolean isSameAmount(Amount<?, ?> other) { + // Equals allows Object - so we have no compile time check that other has the right value type; + // ie: make sure they don't have Integer when we have Long. + Number value = other.getValue(); + if (!getValue().getClass().isInstance(value)) { + return false; + } + + Unit<?> unit = other.getUnit(); + if (!getUnit().getClass().isInstance(unit)) { + return false; + } + + @SuppressWarnings("unchecked") + U otherUnit = (U) other.getUnit(); + return isSameAmount(other, otherUnit); + } + + private boolean isSameAmount(Amount<?, ?> other, U otherUnit) { + // Compare in the more precise unit (the one with the lower multiplier). + if (otherUnit.multiplier() > getUnit().multiplier()) { + return getValue().equals(other.asUnit(getUnit())); + } else { + return as(otherUnit).equals(other.getValue()); + } + } + + @Override + public String toString() { + return amount.toString(); + } + + @Override + public int compareTo(Amount<T, U> other) { + // Compare in the more precise unit (the one with the lower multiplier). + if (other.getUnit().multiplier() > getUnit().multiplier()) { + return getValue().compareTo(other.as(getUnit())); + } else { + return as(other.getUnit()).compareTo(other.getValue()); + } + } + + private boolean sameUnits(Unit<? extends Unit<?>> unit) { + return getUnit().equals(unit); + } + + protected abstract T scale(double multiplier); + + /** + * Creates an amount that uses a {@code double} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Double, U> of(double number, U unit) { + return new Amount<Double, U>(number, unit, Double.MAX_VALUE) { + @Override protected Double scale(double multiplier) { + return getValue() * multiplier; + } + }; + } + + /** + * Creates an amount that uses a {@code float} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Float, U> of(float number, U unit) { + return new Amount<Float, U>(number, unit, Float.MAX_VALUE) { + @Override protected Float scale(double multiplier) { + return (float) (getValue() * multiplier); + } + }; + } + + /** + * Creates an amount that uses a {@code long} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Long, U> of(long number, U unit) { + return new Amount<Long, U>(number, unit, Long.MAX_VALUE) { + @Override protected Long scale(double multiplier) { + return (long) (getValue() * multiplier); + } + }; + } + + /** + * Creates an amount that uses an {@code int} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param <U> the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static <U extends Unit<U>> Amount<Integer, U> of(int number, U unit) { + return new Amount<Integer, U>(number, unit, Integer.MAX_VALUE) { + @Override protected Integer scale(double multiplier) { + return (int) (getValue() * multiplier); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Data.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Data.java b/commons/src/main/java/org/apache/aurora/common/quantity/Data.java new file mode 100644 index 0000000..80d077b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/quantity/Data.java @@ -0,0 +1,51 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.quantity; + +/** + * Provides a unit to allow conversions and unambiguous passing around of data {@link Amount}s. + * The kilo/mega/giga/... hierarchy is built on base 2 so that the hierarchy increases by a factor + * of 1024 instead of 1000 as typical in metric units. Additionally, units are divided in 2 + * hierarchies one based on bits and the other on bytes. Thus {@link #Kb} represents kilobits; so + * 1 Kb = 1024 bits, and {@link #KB} represents kilobytes so 1 KB = 1024 bytes or 8192 bits. + * + * @author John Sirois + */ +public enum Data implements Unit<Data> { + BITS(1), + Kb(1024, BITS), + Mb(1024, Kb), + Gb(1024, Mb), + BYTES(8, BITS), + KB(1024, BYTES), + MB(1024, KB), + GB(1024, MB), + TB(1024, GB), + PB(1024, TB); + + private final double multiplier; + + private Data(double multiplier) { + this.multiplier = multiplier; + } + + private Data(double multiplier, Data base) { + this(multiplier * base.multiplier); + } + + @Override + public double multiplier() { + return multiplier; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Time.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Time.java b/commons/src/main/java/org/apache/aurora/common/quantity/Time.java new file mode 100644 index 0000000..ebf77eb --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/quantity/Time.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.quantity; + +import java.util.concurrent.TimeUnit; + +/** + * Provides a unit to allow conversions and unambiguous passing around of time {@link Amount}s. + * + * @author John Sirois + */ +public enum Time implements Unit<Time> { + NANOSECONDS(1, TimeUnit.NANOSECONDS, "ns"), + MICROSECONDS(1000, NANOSECONDS, TimeUnit.MICROSECONDS, "us"), + MILLISECONDS(1000, MICROSECONDS, TimeUnit.MILLISECONDS, "ms"), + SECONDS(1000, MILLISECONDS, TimeUnit.SECONDS, "secs"), + MINUTES(60, SECONDS, TimeUnit.MINUTES, "mins"), + HOURS(60, MINUTES, TimeUnit.HOURS, "hrs"), + DAYS(24, HOURS, TimeUnit.DAYS, "days"); + + private final double multiplier; + private final TimeUnit timeUnit; + private final String display; + + private Time(double multiplier, TimeUnit timeUnit, String display) { + this.multiplier = multiplier; + this.timeUnit = timeUnit; + this.display = display; + } + + private Time(double multiplier, Time base, TimeUnit timeUnit, String display) { + this(multiplier * base.multiplier, timeUnit, display); + } + + @Override + public double multiplier() { + return multiplier; + } + + /** + * Returns the equivalent {@code TimeUnit}. + */ + public TimeUnit getTimeUnit() { + return timeUnit; + } + + @Override + public String toString() { + return display; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java b/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java new file mode 100644 index 0000000..dd9b9ec --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/quantity/Unit.java @@ -0,0 +1,33 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.quantity; + +/** + * Represents a unit hierarchy for a given unit of measure; eg: time. Instances represent specific + * units from the hierarchy; eg: seconds. + * + * @param <U> the type of the concrete unit implementation + * + * @author John Sirois + */ +public interface Unit<U extends Unit<U>> { + + /** + * Returns the weight of this unit relative to other units in the same hierarchy. Typically the + * smallest unit in the hierarchy returns 1, but this need not be the case. It is only required + * that each unit of the hierarchy return a multiplier relative to a common base unit for the + * hierarchy. + */ + double multiplier(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java new file mode 100644 index 0000000..cfbf04e --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/ApproximateHistogram.java @@ -0,0 +1,563 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.util.Arrays; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Data; + +/** + * <p> + * Implements Histogram structure for computing approximate quantiles. + * The implementation is based on the following paper: + * + * <pre> + * [MP80] Munro & Paterson, "Selection and Sorting with Limited Storage", + * Theoretical Computer Science, Vol 12, p 315-323, 1980. + * </pre> + * </p> + * <p> + * You could read a detailed description of the same algorithm here: + * + * <pre> + * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other + * Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM + * SIGMOD, Vol 27, No 2, p 426-435, June 1998. + * </pre> + * </p> + * <p> + * There's a good explanation of the algorithm in the Sawzall source code + * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc + * </p> + * Here's a schema of the tree: + * <pre> + * [4] level 3, weight=rootWeight=8 + * | + * [3] level 2, weight=4 + * | + * [2] level 1, weight=2 + * / \ + * [0] [1] level 0, weight=1 + * </pre> + * <p> + * {@code [i]} represents {@code buffer[i]} + * The depth of the tree is limited to a maximum value + * Every buffer has the same size + * </p> + * <p> + * We add element in {@code [0]} or {@code [1]}. + * When {@code [0]} and {@code [1]} are full, we collapse them, it generates a temporary buffer + * of weight 2, if {@code [2]} is empty, we put the collapsed buffer into {@code [2]} otherwise + * we collapse {@code [2]} with the temporary buffer and put it in {@code [3]} if it's empty and + * so on... + * </p> + */ +public final class ApproximateHistogram implements Histogram { + @VisibleForTesting + public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000); + @VisibleForTesting + public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB); + @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long + + // See above + @VisibleForTesting long[][] buffer; + @VisibleForTesting long count = 0L; + @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves + @VisibleForTesting int currentTop = 1; + @VisibleForTesting int[] indices; // member for optimization reason + private boolean leavesSorted = true; + private int rootWeight = 1; + private long[][] bufferPool; // pool of 2 buffers (used for merging) + private int bufferSize; + private int maxDepth; + + /** + * Private init method that is called only by constructors. + * All allocations are done in this method. + * + * @param bufSize size of each buffer + * @param depth maximum depth of the tree of buffers + */ + @VisibleForTesting + void init(int bufSize, int depth) { + bufferSize = bufSize; + maxDepth = depth; + bufferPool = new long[2][bufferSize]; + indices = new int[depth + 1]; + buffer = new long[depth + 1][bufferSize]; + // only allocate the first 2 buffers, lazily allocate the others. + allocate(0); + allocate(1); + Arrays.fill(buffer, 2, buffer.length, null); + clear(); + } + + @VisibleForTesting + ApproximateHistogram(int bufSize, int depth) { + init(bufSize, depth); + } + + /** + * Constructor with precision constraint, it will allocated as much memory as require to match + * this precision constraint. + * @param precision the requested precision + */ + public ApproximateHistogram(Precision precision) { + Preconditions.checkNotNull(precision); + int depth = computeDepth(precision.getEpsilon(), precision.getN()); + int bufSize = computeBufferSize(depth, precision.getN()); + init(bufSize, depth); + } + + /** + * Constructor with memory constraint, it will find the best possible precision that satisfied + * the memory constraint. + * @param maxMemory the maximum amount of memory that the instance will take + */ + public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) { + Preconditions.checkNotNull(maxMemory); + Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES), + "at least 1KB is required for an Histogram"); + + double epsilon = DEFAULT_PRECISION.getEpsilon(); + int n = expectedSize; + int depth = computeDepth(epsilon, n); + int bufSize = computeBufferSize(depth, n); + long maxBytes = maxMemory.as(Data.BYTES); + + // Increase precision if the maxMemory allow it, otherwise reduce precision. (by 5% steps) + boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes; + double multiplier = tooMuchMem ? 1.05 : 0.95; + while((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) { + epsilon *= multiplier; + if (epsilon < 0.00001) { + // for very high memory constraint increase N as well + n *= 10; + epsilon = DEFAULT_PRECISION.getEpsilon(); + } + depth = computeDepth(epsilon, n); + bufSize = computeBufferSize(depth, n); + } + if (!tooMuchMem) { + // It's ok to consume less memory than the constraint + // but we never have to consume more! + depth = computeDepth(epsilon / multiplier, n); + bufSize = computeBufferSize(depth, n); + } + + init(bufSize, depth); + } + + /** + * Constructor with memory constraint. + * @see #ApproximateHistogram(Amount, int) + */ + public ApproximateHistogram(Amount<Long, Data> maxMemory) { + this(maxMemory, DEFAULT_PRECISION.getN()); + } + + /** + * Default Constructor. + * @see #ApproximateHistogram(Amount) + */ + public ApproximateHistogram() { + this(DEFAULT_MAX_MEMORY); + } + + @Override + public synchronized void add(long x) { + // if the leaves of the tree are full, "collapse" recursively the tree + if (leafCount == 2 * bufferSize) { + Arrays.sort(buffer[0]); + Arrays.sort(buffer[1]); + recCollapse(buffer[0], 1); + leafCount = 0; + } + + // Now we're sure there is space for adding x + if (leafCount < bufferSize) { + buffer[0][leafCount] = x; + } else { + buffer[1][leafCount - bufferSize] = x; + } + leafCount++; + count++; + leavesSorted = (leafCount == 1); + } + + @Override + public synchronized long getQuantile(double q) { + Preconditions.checkArgument(0.0 <= q && q <= 1.0, + "quantile must be in the range 0.0 to 1.0 inclusive"); + if (count == 0) { + return 0L; + } + + // the two leaves are the only buffer that can be partially filled + int buf0Size = Math.min(bufferSize, leafCount); + int buf1Size = Math.max(0, leafCount - buf0Size); + long sum = 0; + long target = (long) Math.ceil(count * (1.0 - q)); + int i; + + if (! leavesSorted) { + Arrays.sort(buffer[0], 0, buf0Size); + Arrays.sort(buffer[1], 0, buf1Size); + leavesSorted = true; + } + Arrays.fill(indices, bufferSize - 1); + indices[0] = buf0Size - 1; + indices[1] = buf1Size - 1; + + do { + i = biggest(indices); + indices[i]--; + sum += weight(i); + } while (sum < target); + return buffer[i][indices[i] + 1]; + } + + @Override + public synchronized long[] getQuantiles(double[] quantiles) { + return Histograms.extractQuantiles(this, quantiles); + } + + @Override + public synchronized void clear() { + count = 0L; + leafCount = 0; + currentTop = 1; + rootWeight = 1; + leavesSorted = true; + } + + /** + * MergedHistogram is a Wrapper on top of multiple histograms, it gives a view of all the + * underlying histograms as it was just one. + * Note: Should only be used for querying the underlying histograms. + */ + private static class MergedHistogram implements Histogram { + private final ApproximateHistogram[] histograms; + + private MergedHistogram(ApproximateHistogram[] histograms) { + this.histograms = histograms; + } + + @Override + public void add(long x) { + /* Ignore, Shouldn't be used */ + assert(false); + } + + @Override + public void clear() { + /* Ignore, Shouldn't be used */ + assert(false); + } + + @Override + public long getQuantile(double quantile) { + Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0, + "quantile must be in the range 0.0 to 1.0 inclusive"); + + long count = initIndices(); + if (count == 0) { + return 0L; + } + + long sum = 0; + long target = (long) Math.ceil(count * (1.0 - quantile)); + int iHist = -1; + int iBiggest = -1; + do { + long biggest = Long.MIN_VALUE; + for (int i = 0; i < histograms.length; i++) { + ApproximateHistogram hist = histograms[i]; + int indexBiggest = hist.biggest(hist.indices); + if (indexBiggest >= 0) { + long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]]; + if (iBiggest == -1 || biggest <= value) { + iBiggest = indexBiggest; + biggest = value; + iHist = i; + } + } + } + histograms[iHist].indices[iBiggest]--; + sum += histograms[iHist].weight(iBiggest); + } while (sum < target); + + ApproximateHistogram hist = histograms[iHist]; + int i = hist.indices[iBiggest]; + return hist.buffer[iBiggest][i + 1]; + } + + @Override + public synchronized long[] getQuantiles(double[] quantiles) { + return Histograms.extractQuantiles(this, quantiles); + } + + /** + * Initialize the indices array for each Histogram and return the global count. + */ + private long initIndices() { + long count = 0L; + for (int i = 0; i < histograms.length; i++) { + ApproximateHistogram h = histograms[i]; + int[] indices = h.indices; + count += h.count; + int buf0Size = Math.min(h.bufferSize, h.leafCount); + int buf1Size = Math.max(0, h.leafCount - buf0Size); + + if (! h.leavesSorted) { + Arrays.sort(h.buffer[0], 0, buf0Size); + Arrays.sort(h.buffer[1], 0, buf1Size); + h.leavesSorted = true; + } + Arrays.fill(indices, h.bufferSize - 1); + indices[0] = buf0Size - 1; + indices[1] = buf1Size - 1; + } + return count; + } + } + + /** + * Return a MergedHistogram + * @param histograms array of histograms to merged together + * @return a new Histogram + */ + public static Histogram merge(ApproximateHistogram[] histograms) { + return new MergedHistogram(histograms); + } + + /** + * We compute the "smallest possible b" satisfying two inequalities: + * 1) (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N + * 2) k * (2 ^ (b - 1)) >= N + * + * For an explanation of these inequalities, please read the Munro-Paterson or + * the Manku-Rajagopalan-Linday papers. + */ + @VisibleForTesting static int computeDepth(double epsilon, long n) { + int b = 2; + while ((b - 2) * (1L << (b - 2)) + 0.5 <= epsilon * n) { + b += 1; + } + return b; + } + + @VisibleForTesting static int computeBufferSize(int depth, long n) { + return (int) (n / (1L << (depth - 1))); + } + + /** + * Return an estimation of the memory used by an instance. + * The size is due to: + * - a fix cost (76 bytes) for the class + fields + * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE) + * - indices: 16 + sizeof(Integer) * (depth + 1) + * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE) + * + * Note: This method is tested with unit test, it will break if you had new fields. + * @param bufferSize the size of a buffer + * @param depth the depth of the tree of buffer (depth + 1 buffers) + */ + @VisibleForTesting + static long memoryUsage(int bufferSize, int depth) { + return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3)); + } + + /** + * Return the level of the biggest element (using the indices array 'ids' + * to track which elements have been already returned). Every buffer has + * already been sorted at this point. + * @return the level of the biggest element or -1 if no element has been found + */ + @VisibleForTesting + int biggest(final int[] ids) { + long biggest = Long.MIN_VALUE; + final int id0 = ids[0], id1 = ids[1]; + int iBiggest = -1; + + if (0 < leafCount && 0 <= id0) { + biggest = buffer[0][id0]; + iBiggest = 0; + } + if (bufferSize < leafCount && 0 <= id1) { + long x = buffer[1][id1]; + if (x > biggest) { + biggest = x; + iBiggest = 1; + } + } + for (int i = 2; i < currentTop + 1; i++) { + if (!isBufferEmpty(i) && 0 <= ids[i]) { + long x = buffer[i][ids[i]]; + if (x > biggest) { + biggest = x; + iBiggest = i; + } + } + } + return iBiggest; + } + + + /** + * Based on the number of elements inserted we can easily know if a buffer + * is empty or not + */ + @VisibleForTesting + boolean isBufferEmpty(int level) { + if (level == currentTop) { + return false; // root buffer (if present) is always full + } else { + long levelWeight = 1 << (level - 1); + return (((count - leafCount) / bufferSize) & levelWeight) == 0; + } + } + + /** + * Return the weight of the level ie. 2^(i-1) except for the two tree + * leaves (weight=1) and for the root + */ + private int weight(int level) { + if (level == 0) { + return 1; + } else if (level == maxDepth) { + return rootWeight; + } else { + return 1 << (level - 1); + } + } + + private void allocate(int i) { + if (buffer[i] == null) { + buffer[i] = new long[bufferSize]; + } + } + + /** + * Recursively collapse the buffers of the tree. + * Upper buffers will be allocated on first access in this method. + */ + private void recCollapse(long[] buf, int level) { + // if we reach the root, we can't add more buffer + if (level == maxDepth) { + // weight() return the weight of the root, in that case we need the + // weight of merge result + int mergeWeight = 1 << (level - 1); + int idx = level % 2; + long[] merged = bufferPool[idx]; + long[] tmp = buffer[level]; + collapse(buf, mergeWeight, buffer[level], rootWeight, merged); + buffer[level] = merged; + bufferPool[idx] = tmp; + rootWeight += mergeWeight; + } else { + allocate(level + 1); // lazy allocation (if needed) + if (level == currentTop) { + // if we reach the top, add a new buffer + collapse1(buf, buffer[level], buffer[level + 1]); + currentTop += 1; + rootWeight *= 2; + } else if (isBufferEmpty(level + 1)) { + // if the upper buffer is empty, use it + collapse1(buf, buffer[level], buffer[level + 1]); + } else { + // it the upper buffer isn't empty, collapse with it + long[] merged = bufferPool[level % 2]; + collapse1(buf, buffer[level], merged); + recCollapse(merged, level + 1); + } + } + } + + /** + * collapse two sorted Arrays of different weight + * ex: [2,5,7] weight 2 and [3,8,9] weight 3 + * weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9] + * sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9] + * select every nth elems = [3,7,9] (n = sum weight / 2) + */ + @VisibleForTesting + static void collapse( + long[] left, + int leftWeight, + long[] right, + int rightWeight, + long[] output) { + + int totalWeight = leftWeight + rightWeight; + int halfTotalWeight = (totalWeight / 2) - 1; + int i = 0, j = 0, k = 0, cnt = 0; + + int weight; + long smallest; + + while (i < left.length || j < right.length) { + if (i < left.length && (j == right.length || left[i] < right[j])) { + smallest = left[i]; + weight = leftWeight; + i++; + } else { + smallest = right[j]; + weight = rightWeight; + j++; + } + + int cur = (cnt + halfTotalWeight) / totalWeight; + cnt += weight; + int next = (cnt + halfTotalWeight) / totalWeight; + + for(; cur < next; cur++) { + output[k] = smallest; + k++; + } + } + } + +/** + * Optimized version of collapse for collapsing two array of the same weight + * (which is what we want most of the time) + */ + private static void collapse1( + long[] left, + long[] right, + long[] output) { + + int i = 0, j = 0, k = 0, cnt = 0; + long smallest; + + while (i < left.length || j < right.length) { + if (i < left.length && (j == right.length || left[i] < right[j])) { + smallest = left[i]; + i++; + } else { + smallest = right[j]; + j++; + } + if (cnt % 2 == 1) { + output[k] = smallest; + k++; + } + cnt++; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java new file mode 100644 index 0000000..024e67b --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/CounterMap.java @@ -0,0 +1,138 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; + +/** + * A map from a key type to integers. This simplifies the process of storing counters for multiple + * values of the same type. + */ +public class CounterMap <K> implements Iterable<Map.Entry<K, Integer>>, Cloneable { + private final Map<K, Integer> map = Maps.newHashMap(); + + private static Logger log = Logger.getLogger(CounterMap.class.getName()); + + /** + * Increments the counter value associated with {@code key}, and returns the new value. + * + * @param key The key to increment + * @return The incremented value. + */ + public int incrementAndGet(K key) { + return incrementAndGet(key, 1); + } + + /** + * Increments the value associated with {@code key} by {@code value}, returning the new value. + * + * @param key The key to increment + * @return The incremented value. + */ + public int incrementAndGet(K key, int count) { + Integer value = map.get(key); + if (value == null) { + value = 0; + } + int newValue = count + value; + map.put(key, newValue); + return newValue; + } + + /** + * Gets the value associated with a key. + * + * @param key The key to look up. + * @return The counter value stored for {@code key}, or 0 if no mapping exists. + */ + public int get(K key) { + if (!map.containsKey(key)) { + return 0; + } + + return map.get(key); + } + + /** + * Assigns a value to a key. + * + * @param key The key to assign a value to. + * @param newValue The value to assign. + */ + public void set(K key, int newValue) { + Preconditions.checkNotNull(key); + map.put(key, newValue); + } + + /** + * Resets the value for {@code key}. This will remove the key from the counter. + * + * @param key The key to reset. + */ + public void reset(K key) { + map.remove(key); + } + + /** + * Gets the number of entries stored in the map. + * + * @return The size of the map. + */ + public int size() { + return map.size(); + } + + /** + * Gets an iterator for the mapped values. + * + * @return Iterator for mapped values. + */ + public Iterator<Map.Entry<K, Integer>> iterator() { + return map.entrySet().iterator(); + } + + public Collection<Integer> values() { + return map.values(); + } + + public Set<K> keySet() { + return map.keySet(); + } + + public String toString() { + StringBuilder strVal = new StringBuilder(); + for (Map.Entry<K, Integer> entry : this) { + strVal.append(entry.getKey().toString()).append(": ").append(entry.getValue()).append('\n'); + } + return strVal.toString(); + } + + public Map<K, Integer> toMap() { + return map; + } + + @Override + public CounterMap<K> clone() { + CounterMap<K> newInstance = new CounterMap<K>(); + newInstance.map.putAll(map); + return newInstance; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java b/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java new file mode 100644 index 0000000..1e90e85 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/CounterMapWithTopKey.java @@ -0,0 +1,89 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.util.Map; + +/** + * Same as CounterMap<K>, but also keeps track of the item with the highest count. + */ +public class CounterMapWithTopKey<K> extends CounterMap<K> { + + private K mostCommonKey = null; + + /** + * Updates the most common key, if needed. + * + * @param key The key to check. + * @param count The count for the key. + * @return The count. + */ + private int updateMostCommon(K key, int count) { + if (count > get(mostCommonKey)) { + mostCommonKey = key; + } + return count; + } + + /** + * Increments the counter value associated with {@code key}, and returns the new value. + * + * @param key The key to increment + * @return The incremented value. + */ + @Override + public int incrementAndGet(K key) { + return updateMostCommon(key, super.incrementAndGet(key)); + } + + /** + * Assigns a value to a key. + * + * @param key The key to assign a value to. + * @param newValue The value to assign. + */ + @Override + public void set(K key, int newValue) { + super.set(key, updateMostCommon(key, newValue)); + } + + /** + * Resets the value for {@code key}. This will simply set the stored value to 0. + * The most common key is updated by scanning the entire map. + * + * @param key The key to reset. + */ + @Override + public void reset(K key) { + super.reset(key); + for (Map.Entry<K, Integer> entry : this) { + updateMostCommon(entry.getKey(), entry.getValue()); + } + } + + /** + * + * @return The key with the highest count in the map. If multiple keys have this count, return + * an arbitrary one. + */ + public K getMostCommonKey() { + return mostCommonKey; + } + + @Override + public String toString() { + return new StringBuilder(super.toString()).append(String.format("Most common key: %s\n", + mostCommonKey.toString())).toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java b/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java new file mode 100644 index 0000000..859ca7e --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/stats/Elapsed.java @@ -0,0 +1,81 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.stats; + +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * A stat that exports the amount of time since it was last reset. + * + * @author William Farner + */ +public class Elapsed { + + private final Ticker ticker; + private final AtomicLong lastEventNs = new AtomicLong(); + + /** + * Calls {@link #Elapsed(String, Time)} using a default granularity of nanoseconds. + * + * @param name Name of the stat to export. + */ + public Elapsed(String name) { + this(name, Time.NANOSECONDS); + } + + /** + * Equivalent to calling {@link #Elapsed(String, Time, Ticker)} passing {@code name}, + * {@code granularity} and {@link com.google.common.base.Ticker#systemTicker()}. + * <br/> + * @param name Name of the stat to export. + * @param granularity Time unit granularity to export. + */ + public Elapsed(String name, Time granularity) { + this(name, granularity, Ticker.systemTicker()); + } + + /** + * Creates and exports a new stat that maintains the difference between the tick time + * and the time since it was last reset. Upon export, the counter will act as though it were just + * reset. + * <br/> + * @param name Name of stat to export + * @param granularity Time unit granularity to export. + * @param ticker Ticker implementation + */ + public Elapsed(String name, final Time granularity, final Ticker ticker) { + MorePreconditions.checkNotBlank(name); + Preconditions.checkNotNull(granularity); + this.ticker = Preconditions.checkNotNull(ticker); + + reset(); + + Stats.export(new StatImpl<Long>(name) { + @Override public Long read() { + return Amount.of(ticker.read() - lastEventNs.get(), Time.NANOSECONDS).as(granularity); + } + }); + } + + public void reset() { + lastEventNs.set(ticker.read()); + } +}