http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java deleted file mode 100644 index 3a30b58..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/RetryingCaller.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package com.twitter.common.thrift.callers;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import org.apache.thrift.async.AsyncMethodCallback;

import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.thrift.TResourceExhaustedException;

/**
 * A caller that will retry calls to the wrapped caller.
 *
 * <p>Synchronous calls are retried by a loop inside {@link #call}. Asynchronous calls are retried
 * from within the {@code ResultCapture} that is handed to the decorated caller, because the
 * failure only becomes known inside the callback.
 *
 * @author William Farner
 */
public class RetryingCaller extends CallerDecorator {
  private static final Logger LOG = Logger.getLogger(RetryingCaller.class.getName());

  /** Connect timeout override passed to the decorated caller when retrying asynchronously. */
  @VisibleForTesting
  public static final Amount<Long, Time> NONBLOCKING_TIMEOUT = Amount.of(-1L, Time.MILLISECONDS);

  private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');

  private final StatsProvider statsProvider;
  private final String serviceName;
  private final int retries;
  private final ImmutableSet<Class<? extends Exception>> retryableExceptions;
  private final boolean debug;

  /** Retry counters, one per invoked method, created on first use. */
  private final LoadingCache<Method, AtomicLong> stats =
      CacheBuilder.newBuilder().build(new CacheLoader<Method, AtomicLong>() {
        @Override public AtomicLong load(Method method) {
          // Thrift does not support overloads - so just the name disambiguates all calls.
          return statsProvider.makeCounter(serviceName + "_" + method.getName() + "_retries");
        }
      });

  /** Memoized retryability verdict, keyed by the concrete class of the throwable. */
  private final LoadingCache<Class<? extends Throwable>, Boolean> isRetryable =
      CacheBuilder.newBuilder().build(new CacheLoader<Class<? extends Throwable>, Boolean>() {
        @Override public Boolean load(Class<? extends Throwable> exceptionClass) {
          return isRetryable(exceptionClass);
        }
      });

  /**
   * Creates a new retrying caller.  The retrying caller will attempt to call invoked methods on the
   * underlying caller at most {@code retries} times.  A retry will be performed only when one of
   * the {@code retryableExceptions} is caught.
   *
   * @param decoratedCall The caller to decorate with retries.
   * @param async Whether the caller is asynchronous.
   * @param statsProvider The stat provider to export retry statistics through.
   * @param serviceName The service name that calls are being invoked on.
   * @param retries The maximum number of retries to perform.
   * @param retryableExceptions The exceptions that can be retried.
   * @param debug Whether to include debugging information when retries are being performed.
   */
  public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsProvider,
      String serviceName, int retries, ImmutableSet<Class<? extends Exception>> retryableExceptions,
      boolean debug) {
    super(decoratedCall, async);
    this.statsProvider = statsProvider;
    this.serviceName = serviceName;
    this.retries = retries;
    this.retryableExceptions = retryableExceptions;
    this.debug = debug;
  }

  /**
   * Invokes {@code method} on the decorated caller, retrying on retryable failures.
   *
   * @param method The method being called.
   * @param args The arguments to pass through.
   * @param callback The async callback; notified of non-retryable failures in async mode.
   * @param connectTimeoutOverride Optional connect timeout for the first attempt.
   * @return The result of the decorated call, or {@code null} when an asynchronous call failed
   *     (the failure is then delivered through {@code callback} or the capture).
   * @throws Throwable In synchronous mode, the non-retryable failure or the last retried failure.
   */
  @Override public Object call(final Method method, final Object[] args,
      @Nullable final AsyncMethodCallback callback,
      @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable {
    final AtomicLong retryCounter = stats.get(method);
    final AtomicInteger attempts = new AtomicInteger();
    final List<Throwable> exceptions = Lists.newArrayList();

    final ResultCapture capture = new ResultCapture() {
      @Override public void success() {
        // No-op.
      }

      @Override public boolean fail(Throwable t) {
        if (!isRetryable(t)) {
          if (debug) {
            LOG.warning(String.format(
                "Call failed with un-retryable exception of [%s]: %s, previous exceptions: %s",
                t.getClass().getName(), t.getMessage(), combineStackTraces(exceptions)));
          }

          return true;
        } else if (attempts.get() >= retries) {
          exceptions.add(t);

          if (debug) {
            LOG.warning(String.format("Retried %d times, last error: %s, exceptions: %s",
                attempts.get(), t, combineStackTraces(exceptions)));
          }

          return true;
        } else {
          exceptions.add(t);

          if (isAsync() && attempts.incrementAndGet() <= retries) {
            try {
              retryCounter.incrementAndGet();
              // override connect timeout in ThriftCaller to prevent blocking for a connection
              // for async retries (since this is within the callback in the selector thread)
              invoke(method, args, callback, this, NONBLOCKING_TIMEOUT);
            } catch (Throwable throwable) {
              return fail(throwable);
            }
          }

          return false;
        }
      }
    };

    // Remembered so there is always something to report after the loop, even when the capture
    // recorded nothing.
    Throwable lastCaught = null;
    boolean continueLoop;
    do {
      try {
        // If this is an async call, the looping will be handled within the capture.
        return invoke(method, args, callback, capture, connectTimeoutOverride);
      } catch (Throwable t) {
        lastCaught = t;
        if (!isRetryable(t)) {
          Throwable propagated = t;

          if (!exceptions.isEmpty() && (t instanceof TResourceExhaustedException)) {
            // If we've been trucking along through retries that have had remote call failures
            // and we suddenly can't immediately get a connection on the next retry, throw the
            // previous remote call failure - the idea here is that the remote call failure is
            // more interesting than a transient inability to get an immediate connection.
            propagated = exceptions.remove(exceptions.size() - 1);
          }

          if (isAsync()) {
            callback.onError(asException(propagated));
            // The callback has been told about the failure; do not fall through into the
            // "retries exhausted" reporting below.
            return null;
          }
          throw propagated;
        }
      }

      continueLoop = !isAsync() && attempts.incrementAndGet() <= retries;
      if (continueLoop) {
        retryCounter.incrementAndGet();
      }
    } while (continueLoop);

    // Prefer the failures recorded by the capture; fall back to what this loop caught so an
    // empty list can no longer blow up here.
    Throwable lastRetriedException =
        exceptions.isEmpty() ? lastCaught : exceptions.get(exceptions.size() - 1);
    if (debug) {
      if (!exceptions.isEmpty()) {
        LOG.warning(
            String.format("Retried %d times, last error: %s, previous exceptions: %s",
                attempts.get(), lastRetriedException, combineStackTraces(exceptions)));
      } else {
        LOG.warning(
            String.format("Retried 1 time, last error: %s", lastRetriedException));
      }
    }

    if (!isAsync()) {
      throw lastRetriedException;
    }
    return null;
  }

  /** Adapts a throwable to the {@code Exception} the async callback accepts, wrapping errors. */
  private static Exception asException(Throwable throwable) {
    return (throwable instanceof Exception)
        ? (Exception) throwable
        : new RuntimeException(throwable);
  }

  /** Returns whether the given failure is retryable, using the per-class cache. */
  private boolean isRetryable(Throwable throwable) {
    return isRetryable.getUnchecked(throwable.getClass());
  }

  /** Returns whether the class is, or is a subtype of, one of the retryable exception types. */
  private boolean isRetryable(final Class<? extends Throwable> exceptionClass) {
    if (retryableExceptions.contains(exceptionClass)) {
      return true;
    }
    return Iterables.any(retryableExceptions, new Predicate<Class<? extends Exception>>() {
      @Override public boolean apply(Class<? extends Exception> retryableExceptionClass) {
        return retryableExceptionClass.isAssignableFrom(exceptionClass);
      }
    });
  }

  /** Renders the stack traces as a numbered, newline-separated list, or "none" when empty. */
  private static String combineStackTraces(List<Throwable> exceptions) {
    if (exceptions.isEmpty()) {
      return "none";
    } else {
      return STACK_TRACE_JOINER.join(Iterables.transform(exceptions,
          new Function<Throwable, String>() {
            private int index = 1;
            @Override public String apply(Throwable exception) {
              return String.format("[%d] %s",
                  index++, Throwables.getStackTraceAsString(exception));
            }
          }));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java deleted file mode 100644 index 083a748..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/StatTrackingCaller.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.callers; - -import java.lang.reflect.Method; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.annotation.Nullable; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - -import org.apache.thrift.async.AsyncMethodCallback; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.stats.StatsProvider.RequestTimer; -import com.twitter.common.thrift.TResourceExhaustedException; -import com.twitter.common.thrift.TTimeoutException; - -/** - * A caller that exports statistics about calls made to the wrapped caller. 
- * - * @author William Farner - */ -public class StatTrackingCaller extends CallerDecorator { - - private final StatsProvider statsProvider; - private final String serviceName; - - private final LoadingCache<Method, RequestTimer> stats = - CacheBuilder.newBuilder().build(new CacheLoader<Method, RequestTimer>() { - @Override public RequestTimer load(Method method) { - // Thrift does not support overloads - so just the name disambiguates all calls. - return statsProvider.makeRequestTimer(serviceName + "_" + method.getName()); - } - }); - - /** - * Creates a new stat tracking caller, which will export stats to the given {@link StatsProvider}. - * - * @param decoratedCaller The caller to decorate with a deadline. - * @param async Whether the caller is asynchronous. - * @param statsProvider The stat provider to export statistics to. - * @param serviceName The name of the service that methods are being called on. - */ - public StatTrackingCaller(Caller decoratedCaller, boolean async, StatsProvider statsProvider, - String serviceName) { - super(decoratedCaller, async); - - this.statsProvider = statsProvider; - this.serviceName = serviceName; - } - - @Override - public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, - @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable { - final RequestTimer requestStats = stats.get(method); - final long startTime = System.nanoTime(); - - ResultCapture capture = new ResultCapture() { - @Override public void success() { - requestStats.requestComplete(TimeUnit.NANOSECONDS.toMicros( - System.nanoTime() - startTime)); - } - - @Override public boolean fail(Throwable t) { - // TODO(John Sirois): the ruby client reconnects for timeouts too - this provides a natural - // backoff mechanism - consider how to plumb something similar. 
- if (t instanceof TTimeoutException || t instanceof TimeoutException) { - requestStats.incTimeouts(); - return true; - } - - // TODO(John Sirois): consider ditching reconnects since its nearly redundant with errors as - // it stands. - if (!(t instanceof TResourceExhaustedException)) { - requestStats.incReconnects(); - } - // TODO(John Sirois): provide more detailed stats: track counts for distinct exceptions types, - // track retries-per-method, etc... - requestStats.incErrors(); - return true; - } - }; - - return invoke(method, args, callback, capture, connectTimeoutOverride); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java deleted file mode 100644 index 24a10b0..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/ThriftCaller.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package com.twitter.common.thrift.callers;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.twitter.common.net.pool.Connection;
import com.twitter.common.net.pool.ObjectPool;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.net.pool.ResourceExhaustedException;
import com.twitter.common.thrift.TResourceExhaustedException;
import com.twitter.common.thrift.TTimeoutException;
import com.twitter.common.net.loadbalancing.RequestTracker;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.transport.TTransport;

import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

/**
 * A caller that issues calls to a target that is assumed to be a client to a thrift service.
 *
 * <p>Each call takes a connection from the pool, builds a client on top of it, and hands the
 * connection back (on success) or removes it from the pool (on failure) through a
 * {@code ResultCapture}.
 *
 * @author William Farner
 */
public class ThriftCaller<T> implements Caller {
  private static final Logger LOG = Logger.getLogger(ThriftCaller.class.getName());

  private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
  private final RequestTracker<InetSocketAddress> requestTracker;
  private final Function<TTransport, T> clientFactory;
  private final Amount<Long, Time> timeout;
  private final boolean debug;

  /**
   * Creates a new thrift caller.
   *
   * @param connectionPool The connection pool to use.
   * @param requestTracker The request tracker to notify of request results.
   * @param clientFactory Factory to use for building client object instances.
   * @param timeout The timeout to use when requesting objects from the connection pool.
   * @param debug Whether to use the caller in debug mode.
   */
  public ThriftCaller(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
      RequestTracker<InetSocketAddress> requestTracker, Function<TTransport, T> clientFactory,
      Amount<Long, Time> timeout, boolean debug) {

    this.connectionPool = connectionPool;
    this.requestTracker = requestTracker;
    this.clientFactory = clientFactory;
    this.timeout = timeout;
    this.debug = debug;
  }

  /**
   * Obtains a pooled connection, builds a client for it and reflectively invokes {@code method}.
   *
   * @param method The client method to invoke.
   * @param args The method arguments, excluding any async callback.
   * @param callback The async callback, or {@code null} for a synchronous call.
   * @param connectTimeoutOverride When non-null, used instead of the configured pool timeout.
   * @return The value returned by the invoked method, or {@code null} when an async invocation
   *     failed and the failure was delivered to the callback.
   * @throws Throwable The failure of a synchronous invocation, or a connection acquisition
   *     failure.
   */
  @Override
  public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
      @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {

    final Connection<TTransport, InetSocketAddress> connection = getConnection(connectTimeoutOverride);
    final long startNanos = System.nanoTime();

    ResultCapture capture = new ResultCapture() {
      @Override public void success() {
        try {
          requestTracker.requestResult(connection.getEndpoint(),
              RequestTracker.RequestResult.SUCCESS, System.nanoTime() - startNanos);
        } finally {
          connectionPool.release(connection);
        }
      }

      @Override public boolean fail(Throwable t) {
        if (debug) {
          LOG.warning(String.format("Call to endpoint: %s failed: %s", connection, t));
        }

        try {
          requestTracker.requestResult(connection.getEndpoint(),
              RequestTracker.RequestResult.FAILED, System.nanoTime() - startNanos);
        } finally {
          connectionPool.remove(connection);
        }
        return true;
      }
    };

    // Building the client can fail before the capture is ever involved; make sure the
    // checked-out connection does not leak in that case.
    T client = null;
    boolean clientCreated = false;
    try {
      client = clientFactory.apply(connection.get());
      clientCreated = true;
    } finally {
      if (!clientCreated) {
        connectionPool.remove(connection);
      }
    }

    return invokeMethod(client, method, args, callback, capture);
  }

  /**
   * Reflectively invokes {@code method} on {@code target}, routing the outcome to {@code capture}
   * (directly for synchronous calls, through the wrapped callback for asynchronous ones).
   */
  private static Object invokeMethod(Object target, Method method, Object[] args,
      AsyncMethodCallback callback, final ResultCapture capture) throws Throwable {

    // Swap the wrapped callback out for ours.
    if (callback != null) {
      callback = new WrappedMethodCallback(callback, capture);

      List<Object> argsList = Lists.newArrayList(args);
      argsList.add(callback);
      args = argsList.toArray();
    }

    Object result;
    try {
      result = method.invoke(target, args);
    } catch (InvocationTargetException t) {
      Throwable cause = t.getCause();
      // We allow this one to go to both sync and async captures.
      if (callback != null) {
        callback.onError(toException(cause));
        return null;
      }
      capture.fail(cause);
      throw cause;
    } catch (IllegalAccessException e) {
      // The call never reached the target; still let the capture give the connection back.
      capture.fail(e);
      throw e;
    } catch (IllegalArgumentException e) {
      // Same as above: a reflective mismatch must not strand the connection.
      capture.fail(e);
      throw e;
    }

    if (callback == null) {
      capture.success();
    }
    return result;
  }

  /** Adapts a throwable to the {@code Exception} type the async callback accepts. */
  private static Exception toException(Throwable throwable) {
    return (throwable instanceof Exception)
        ? (Exception) throwable
        : new RuntimeException(throwable);
  }

  /**
   * Fetches a connection from the pool, translating pool failures into thrift exceptions.
   *
   * @param connectTimeoutOverride When non-null, the timeout passed to the pool; otherwise the
   *     configured timeout is used if positive, and the no-argument pool lookup if not.
   * @throws TResourceExhaustedException If the pool is exhausted or returns no connection.
   * @throws TTimeoutException If the pool lookup times out.
   */
  private Connection<TTransport, InetSocketAddress> getConnection(
      Amount<Long, Time> connectTimeoutOverride)
      throws TResourceExhaustedException, TTimeoutException {
    try {
      Connection<TTransport, InetSocketAddress> connection;
      if (connectTimeoutOverride != null) {
        connection = connectionPool.get(connectTimeoutOverride);
      } else {
        connection = (timeout.getValue() > 0)
            ? connectionPool.get(timeout) : connectionPool.get();
      }

      if (connection == null) {
        throw new TResourceExhaustedException("no connection was available");
      }
      return connection;
    } catch (ResourceExhaustedException e) {
      throw new TResourceExhaustedException(e);
    } catch (TimeoutException e) {
      throw new TTimeoutException(e);
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java b/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java deleted file mode 100644 index 5c4d841..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0
// (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.twitter.common.thrift.monitoring;

import com.google.common.base.Preconditions;
import com.twitter.common.net.monitoring.ConnectionMonitor;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TTransportException;

import java.net.InetSocketAddress;

/**
 * A {@link TNonblockingServerSocket} that is constructed with a {@link ConnectionMonitor}.
 *
 * <p>Client tracking is not implemented yet: the monitor is validated and stored, while
 * {@link #acceptImpl()} and {@link #close()} simply defer to the superclass.
 *
 * @author William Farner
 */
public class TMonitoredNonblockingServerSocket extends TNonblockingServerSocket {
  private final ConnectionMonitor monitor;

  /**
   * @param port The port handed to the superclass constructor.
   * @param monitor The connection monitor; must not be null.
   */
  public TMonitoredNonblockingServerSocket(int port, ConnectionMonitor monitor)
      throws TTransportException {
    super(port);
    Preconditions.checkNotNull(monitor);
    this.monitor = monitor;
  }

  /**
   * @param port The port handed to the superclass constructor.
   * @param clientTimeout The client timeout handed to the superclass constructor.
   * @param monitor The connection monitor; must not be null.
   */
  public TMonitoredNonblockingServerSocket(int port, int clientTimeout, ConnectionMonitor monitor)
      throws TTransportException {
    super(port, clientTimeout);
    Preconditions.checkNotNull(monitor);
    this.monitor = monitor;
  }

  /**
   * @param bindAddr The bind address handed to the superclass constructor.
   * @param monitor The connection monitor; must not be null.
   */
  public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, ConnectionMonitor monitor)
      throws TTransportException {
    super(bindAddr);
    Preconditions.checkNotNull(monitor);
    this.monitor = monitor;
  }

  /**
   * @param bindAddr The bind address handed to the superclass constructor.
   * @param clientTimeout The client timeout handed to the superclass constructor.
   * @param monitor The connection monitor; must not be null.
   */
  public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout,
      ConnectionMonitor monitor) throws TTransportException {
    super(bindAddr, clientTimeout);
    Preconditions.checkNotNull(monitor);
    this.monitor = monitor;
  }

  /** Accepts a connection exactly as the superclass does; no monitoring is applied. */
  @Override
  protected TNonblockingSocket acceptImpl() throws TTransportException {
    // TODO(William Farner): Finish implementing...may require an object proxy.
    // Intended shape: wrap the accepted socket in a TNonblockingSocket whose close() also calls
    // monitor.disconnected(this), report monitor.connected(wrappedSocket, <remote inet address>),
    // and return the wrapper instead of the raw socket.
    TNonblockingSocket accepted = super.acceptImpl();
    return accepted;
  }

  /** Closes the server socket by delegating to the superclass. */
  @Override
  public void close() {
    super.close();
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java b/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java deleted file mode 100644 index bf0c6d3..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredProcessor.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
package com.twitter.common.thrift.monitoring;

import com.google.common.base.Preconditions;
import com.twitter.common.net.loadbalancing.RequestTracker;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;

import java.net.InetSocketAddress;

import static com.twitter.common.net.loadbalancing.RequestTracker.RequestResult.*;

/**
 * A TProcessor that joins a wrapped TProcessor with a monitor.
 *
 * <p>Every processed request is reported to the {@link RequestTracker} together with its
 * duration in nanoseconds and whether the wrapped processor completed normally.
 *
 * @author William Farner
 */
public class TMonitoredProcessor implements TProcessor {
  private final TProcessor wrapped;
  private final TMonitoredServerSocket monitoredServerSocket;
  private final RequestTracker<InetSocketAddress> monitor;

  /**
   * @param wrapped The processor that does the real work; must not be null.
   * @param monitoredServerSocket Used to look up the remote address of a request's transport;
   *     must not be null.
   * @param monitor The tracker that receives one result per processed request; must not be null.
   */
  public TMonitoredProcessor(TProcessor wrapped, TMonitoredServerSocket monitoredServerSocket,
      RequestTracker<InetSocketAddress> monitor) {
    this.wrapped = Preconditions.checkNotNull(wrapped);
    this.monitoredServerSocket = Preconditions.checkNotNull(monitoredServerSocket);
    this.monitor = Preconditions.checkNotNull(monitor);
  }

  /**
   * Delegates to the wrapped processor and reports the outcome to the request tracker.
   *
   * @return Whatever the wrapped processor returns.
   * @throws TException If the wrapped processor throws it.
   * @throws IllegalStateException If the wrapped processor succeeded but the remote address of
   *     the input transport is unknown to the monitored server socket.
   */
  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    long startNanos = System.nanoTime();
    // Set only after the wrapped call returns, so that unchecked exceptions count as failures
    // too - not just TException.
    boolean succeeded = false;
    try {
      boolean result = wrapped.process(in, out);
      succeeded = true;
      return result;
    } finally {
      InetSocketAddress address = monitoredServerSocket.getAddress((TSocket) in.getTransport());
      // When the wrapped call already failed, an unknown address must not replace the failure
      // that is propagating; only a successful call is turned into an IllegalStateException.
      Preconditions.checkState(address != null || !succeeded,
          "Address unknown for transport " + in.getTransport());

      if (address != null) {
        monitor.requestResult(address, succeeded ? SUCCESS : FAILED,
            System.nanoTime() - startNanos);
      }
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java b/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java deleted file mode 100644 index 38b3c73..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/monitoring/TMonitoredServerSocket.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.monitoring; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.twitter.common.net.monitoring.ConnectionMonitor; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransportException; - -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.Collections; -import java.util.Map; - -/** - * Extension of TServerSocket that allows for tracking of connected clients.
- * - * @author William Farner - */ -public class TMonitoredServerSocket extends TServerSocket { - private ConnectionMonitor<InetSocketAddress> monitor; - - public TMonitoredServerSocket(ServerSocket serverSocket, - ConnectionMonitor<InetSocketAddress> monitor) { - super(serverSocket); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(ServerSocket serverSocket, int clientTimeout, - ConnectionMonitor<InetSocketAddress> monitor) { - super(serverSocket, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(int port, ConnectionMonitor<InetSocketAddress> monitor) - throws TTransportException { - super(port); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(int port, int clientTimeout, - ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException { - super(port, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(InetSocketAddress bindAddr, - ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException { - super(bindAddr); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(InetSocketAddress bindAddr, int clientTimeout, - ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException { - super(bindAddr, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - private final Map<TSocket, InetSocketAddress> addressMap = - Collections.synchronizedMap(Maps.<TSocket, InetSocketAddress>newHashMap()); - - public InetSocketAddress getAddress(TSocket socket) { - return addressMap.get(socket); - } - - @Override - protected TSocket acceptImpl() throws TTransportException { - final TSocket socket = super.acceptImpl(); - final InetSocketAddress remoteAddress = - (InetSocketAddress) socket.getSocket().getRemoteSocketAddress(); - - TSocket monitoredSocket = new TSocket(socket.getSocket()) { - boolean 
closed = false; - - @Override public void close() { - try { - super.close(); - } finally { - if (!closed) { - monitor.released(remoteAddress); - addressMap.remove(this); - } - closed = true; - } - } - }; - - addressMap.put(monitoredSocket, remoteAddress); - - monitor.connected(remoteAddress); - return monitoredSocket; - } - - @Override - public void close() { - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java b/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java deleted file mode 100644 index 330403b..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/testing/MockTSocket.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.testing; - -import org.apache.thrift.transport.TSocket; - -/** - * @author William Farner - */ -public class MockTSocket extends TSocket { - public static final String HOST = "dummyHost"; - public static final int PORT = 1000; - - private boolean connected = false; - - public MockTSocket() { - super(HOST, PORT); - } - - @Override - public void open() { - connected = true; - // TODO(William Farner): Allow for failure injection here by throwing TTransportException. 
- } - - @Override - public boolean isOpen() { - return connected; - } - - public void close() { - connected = false; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java b/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java deleted file mode 100644 index 5225060..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/testing/TestThriftTypes.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.testing; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import org.apache.thrift.TBase; -import org.apache.thrift.TBaseHelper; -import org.apache.thrift.TException; -import org.apache.thrift.TFieldIdEnum; -import org.apache.thrift.protocol.TField; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TStruct; -import org.apache.thrift.protocol.TType; - -import java.util.Map; -import java.util.Map.Entry; - -/** - * Hand-coded thrift types for use in tests. 
- * - * @author John Sirois - */ -public class TestThriftTypes { - public static class Field implements TFieldIdEnum { - private static final Map<Short, Field> FIELDS_BY_ID = Maps.newHashMap(); - public static Field forId(int id) { - Field field = FIELDS_BY_ID.get((short) id); - Preconditions.checkArgument(field != null, "No Field with id: %s", id); - return field; - } - - public static final Field NAME = new Field((short) 0, "name"); - public static final Field VALUE = new Field((short) 1, "value"); - - private final short fieldId; - private final String fieldName; - - private Field(short fieldId, String fieldName) { - this.fieldId = fieldId; - this.fieldName = fieldName; - FIELDS_BY_ID.put(fieldId, this); - } - - @Override - public short getThriftFieldId() { - return fieldId; - } - - @Override - public String getFieldName() { - return fieldName; - } - } - - public static class Struct implements TBase<Struct, Field> { - private final Map<Field, Object> fields = Maps.newHashMap(); - - public Struct() {} - - public Struct(String name, String value) { - fields.put(Field.NAME, name); - fields.put(Field.VALUE, value); - } - - public String getName() { - Object name = getFieldValue(Field.NAME); - return name == null ? null : (String) name; - } - - public String getValue() { - Object value = getFieldValue(Field.VALUE); - return value == null ? 
null : (String) value; - } - - @Override - public void read(TProtocol tProtocol) throws TException { - tProtocol.readStructBegin(); - TField field; - while((field = tProtocol.readFieldBegin()).type != TType.STOP) { - fields.put(fieldForId(field.id), tProtocol.readString()); - tProtocol.readFieldEnd(); - } - tProtocol.readStructEnd(); - } - - @Override - public void write(TProtocol tProtocol) throws TException { - tProtocol.writeStructBegin(new TStruct("Field")); - for (Entry<Field, Object> entry : fields.entrySet()) { - Field field = entry.getKey(); - tProtocol.writeFieldBegin( - new TField(field.getFieldName(), TType.STRING, field.getThriftFieldId())); - tProtocol.writeString(entry.getValue().toString()); - tProtocol.writeFieldEnd(); - } - tProtocol.writeFieldStop(); - tProtocol.writeStructEnd(); - } - - @Override - public boolean isSet(Field field) { - return fields.containsKey(field); - } - - @Override - public Object getFieldValue(Field field) { - return fields.get(field); - } - - @Override - public void setFieldValue(Field field, Object o) { - fields.put(field, o); - } - - @Override - public TBase<Struct, Field> deepCopy() { - Struct struct = new Struct(); - struct.fields.putAll(fields); - return struct; - } - - @Override - public int compareTo(Struct other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison; - - lastComparison = Integer.valueOf(fields.size()).compareTo(other.fields.size()); - if (lastComparison != 0) { - return lastComparison; - } - - for (Map.Entry<Field, Object> entry : fields.entrySet()) { - Field field = entry.getKey(); - lastComparison = Boolean.TRUE.compareTo(other.isSet(field)); - if (lastComparison != 0) { - return lastComparison; - } - lastComparison = TBaseHelper.compareTo(entry.getValue(), other.getFieldValue(field)); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - @Override - public void clear() { - 
fields.clear(); - } - - @Override - public Field fieldForId(int fieldId) { - return Field.forId(fieldId); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BackoffDecider.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BackoffDecider.java b/commons/src/main/java/com/twitter/common/util/BackoffDecider.java deleted file mode 100644 index 117970a..0000000 --- a/commons/src/main/java/com/twitter/common/util/BackoffDecider.java +++ /dev/null @@ -1,663 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; -import com.twitter.common.stats.StatsProvider; - -import javax.annotation.Nullable; -import java.util.Deque; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -/** - * Handles logic for deciding whether to back off from calls to a backend. - * - * This works by offering a guard method {@link #shouldBackOff()}, which instructs the caller - * whether they should avoid making the call. 
The backoff logic will maintain statistics about - * the failure rate, and push into a backoff state (silent period) when the failure rate exceeds - * the configured threshold. At the end of the quiet period, a recovery state will be entered, - * during which the decider will allow traffic to ramp back up to full capacity. - * - * The expected use case looks something like this: - * - * <pre> - * void sendRequestGuarded() { - * if (!decider.shouldBackOff()) { - * boolean success = sendRequestUnguarded(); - * if (success) { - * decider.addSuccess(); - * } else { - * decider.addFailure(); - * } - * } - * } - * </pre> - * - * @author William Farner - */ -public class BackoffDecider { - private static final Logger LOG = Logger.getLogger(BackoffDecider.class.getName()); - - // The group that this decider is a part of. - private final Iterable<BackoffDecider> deciderGroup; - - private final TimedStateMachine stateMachine; - - private final String name; - - private final double toleratedFailureRate; - - @VisibleForTesting final RequestWindow requests; - - // Used to calculate backoff durations when in backoff state. - private final BackoffStrategy strategy; - - private final Amount<Long, Time> recoveryPeriod; - private long previousBackoffPeriodNs = 0; - - // Used for random selection during recovery period. - private final Random random; - - private final Clock clock; - private final AtomicLong backoffs; - private final RecoveryType recoveryType; - - /** - * Different types of recovery mechanisms to use after exiting the backoff state. - */ - public static enum RecoveryType { - // Randomly allows traffic to flow through, with a linearly-ascending probability. - RANDOM_LINEAR, - // Allows full traffic capacity to flow during the recovery period. 
- FULL_CAPACITY - } - - private BackoffDecider(String name, int seedSize, double toleratedFailureRate, - @Nullable Iterable<BackoffDecider> deciderGroup, BackoffStrategy strategy, - @Nullable Amount<Long, Time> recoveryPeriod, - long requestWindowNs, int numBuckets, RecoveryType recoveryType, StatsProvider statsProvider, - Random random, Clock clock) { - MorePreconditions.checkNotBlank(name); - Preconditions.checkArgument(seedSize > 0); - Preconditions.checkArgument(toleratedFailureRate >= 0 && toleratedFailureRate < 1.0); - Preconditions.checkNotNull(strategy); - Preconditions.checkArgument(recoveryPeriod == null || recoveryPeriod.getValue() > 0); - Preconditions.checkArgument(requestWindowNs > 0); - Preconditions.checkArgument(numBuckets > 0); - Preconditions.checkNotNull(recoveryType); - Preconditions.checkNotNull(statsProvider); - Preconditions.checkNotNull(random); - Preconditions.checkNotNull(clock); - - this.name = name; - this.toleratedFailureRate = toleratedFailureRate; - this.deciderGroup = deciderGroup; - this.strategy = strategy; - this.recoveryPeriod = recoveryPeriod; - this.recoveryType = recoveryType; - - this.random = random; - this.clock = clock; - - this.backoffs = statsProvider.makeCounter(name + "_backoffs"); - this.requests = new RequestWindow(requestWindowNs, numBuckets, seedSize); - - this.stateMachine = new TimedStateMachine(name); - } - - /** - * Checks whether the caller should back off and if not then returns immediately; otherwise the - * method blocks until it is safe for the caller to proceed without backing off further based on - * all data available at the time of this call. 
- * - * @return the amount of time in milliseconds spent awaiting backoff - * @throws InterruptedException if the calling thread was interrupted while backing off - */ - public long awaitBackoff() throws InterruptedException { - if (shouldBackOff()) { - long backoffTimeMs = stateMachine.getStateRemainingMs(); - - if (backoffTimeMs > 0) { - // Wait without holding any external locks. - Object waitCondition = new Object(); - synchronized (waitCondition) { - waitCondition.wait(backoffTimeMs); - } - return backoffTimeMs; - } - } - return 0; - } - - /** - * Checks whether this decider instructs the caller that it should back off from the associated - * backend. This is determined based on the response history for the backend as well as the - * backoff state of the decider group (if configured). - * - * @return {@code true} if the decider is in backoff mode, otherwise {@code false}. - */ - @SuppressWarnings("fallthrough") - public synchronized boolean shouldBackOff() { - - boolean preventRequest; - switch (stateMachine.getState()) { - case NORMAL: - preventRequest = false; - break; - - case BACKOFF: - if (deciderGroup != null && allOthersBackingOff()) { - LOG.info("Backends in group with " + name + " down, forcing back up."); - stateMachine.transitionUnbounded(State.FORCED_NORMAL); - return false; - } else if (stateMachine.isStateExpired()) { - long recoveryPeriodNs = recoveryPeriod == null ? stateMachine.getStateDurationNs() - : recoveryPeriod.as(Time.NANOSECONDS); - - // The silent period has expired, move to recovery state (and drop to its case block).
- stateMachine.transition(State.RECOVERY, recoveryPeriodNs); - LOG.info(String.format("%s recovering for %s ms", name, - Amount.of(recoveryPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS))); - } else { - preventRequest = true; - break; - } - - case RECOVERY: - if (deciderGroup != null && allOthersBackingOff()) { - return false; - } else if (stateMachine.isStateExpired()) { - // We have reached the end of the recovery period, return to normal. - stateMachine.transitionUnbounded(State.NORMAL); - previousBackoffPeriodNs = 0; - preventRequest = false; - } else { - switch (recoveryType) { - case RANDOM_LINEAR: - // In the recovery period, allow request rate to return linearly to the full load. - preventRequest = random.nextDouble() > stateMachine.getStateFractionComplete(); - break; - case FULL_CAPACITY: - preventRequest = false; - break; - default: - throw new IllegalStateException("Unhandled recovery type " + recoveryType); - } - } - - break; - - case FORCED_NORMAL: - if (!allOthersBackingOff()) { - // We were in forced normal state, but at least one other backend is up, try recovering. - stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs()); - preventRequest = false; - } else { - preventRequest = true; - } - - break; - - default: - LOG.severe("Unrecognized state: " + stateMachine.getState()); - preventRequest = false; - } - - if (preventRequest) { - backoffs.incrementAndGet(); - } - return preventRequest; - } - - private boolean allOthersBackingOff() { - // Search for another decider that is not backing off. - for (BackoffDecider decider : deciderGroup) { - State deciderState = decider.stateMachine.getState(); - boolean inBackoffState = deciderState == State.BACKOFF || deciderState == State.FORCED_NORMAL; - if ((decider != this) && !inBackoffState) { - return false; - } - } - - return true; - } - - /** - * Records a failed request to the backend. 
- */ - public void addFailure() { - addResult(false); - } - - /** - * Records a successful request to the backend. - */ - public void addSuccess() { - addResult(true); - } - - /** - * Transitions the state to BACKOFF and logs a message appropriately if it is doing so because of high fail rate - * or by force. - * - * @param failRate rate of request failures on this host. - * @param force if {@code true}, forces the transition to BACKOFF. Typically used in cases when the host - * was not found to be alive by LiveHostChecker. - */ - public synchronized void transitionToBackOff(double failRate, boolean force) { - long prevBackoffMs = Amount.of(previousBackoffPeriodNs, Time.NANOSECONDS) - .as(Time.MILLISECONDS); - - long backoffPeriodNs = Amount.of(strategy.calculateBackoffMs(prevBackoffMs), Time.MILLISECONDS) - .as(Time.NANOSECONDS); - if (!force) { - LOG.info(String.format("%s failure rate at %g, backing off for %s ms", name,failRate, - Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS))); - } else { - LOG.info(String.format("%s forced to back off for %s ms", name, - Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS))); - } - stateMachine.transition(State.BACKOFF, backoffPeriodNs); - previousBackoffPeriodNs = backoffPeriodNs; - } - - @SuppressWarnings("fallthrough") - private synchronized void addResult(boolean success) { - // Disallow statistics updating if we are in backoff state. - if (stateMachine.getState() == State.BACKOFF) { - return; - } - - requests.addResult(success); - double failRate = requests.getFailureRate(); - boolean highFailRate = requests.isSeeded() && (failRate > toleratedFailureRate); - - switch (stateMachine.getState()) { - case NORMAL: - if (!highFailRate) { - // No-op. - break; - } else { - // Artificially move into recovery state (by falling through) with a zero-duration - // time window, to trigger the initial backoff period. 
- stateMachine.setStateDurationNs(0); - } - - case RECOVERY: - if (highFailRate) { - // We were trying to recover, and the failure rate is still too high. Go back to - // backoff state for a longer duration. - requests.reset(); - - // transition the state machine to BACKOFF state, due to high fail rate. - transitionToBackOff(failRate, false); - } else { - // Do nothing. We only exit the recovery state by expiration. - } - break; - - case FORCED_NORMAL: - if (!highFailRate) { - stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs()); - } - break; - - case BACKOFF: - throw new IllegalStateException("Backoff state may only be exited by expiration."); - } - } - - /** - * Creates a builder object. - * - * @param name Name for the backoff decider to build. - * @return A builder. - */ - public static Builder builder(String name) { - return new Builder(name); - } - - /** - * Builder class to configure a BackoffDecider. - * - * The builder allows for customization of many different parameters to the BackoffDecider, while - * defining defaults wherever possible. The following defaults are used: - * - * <ul> - * <li> seed size - The number of requests to accumulate before a backoff will be considered. - * 100 - * - * <li> tolerated failure rate - Maximum failure rate before backing off. - * 0.5 - * - * <li> decider group - Group this decider is a part of, to prevent complete backend failure. - * null (disabled) - * - * <li> strategy - Used to calculate subsequent backoff durations. - * TruncatedBinaryBackoff, initial 100 ms, max 10s - * - * <li> recovery period - Fixed recovery period while ramping traffic back to full capacity. - * null (use last backoff period) - * - * <li> request window - Duration of the sliding window of requests to track statistics for. - * 10 seconds - * - * <li> num buckets - The number of time slices within the request window, for stat expiration.
- * The sliding request window advances in intervals of request window / num buckets. - * 100 - * - * <li> recovery type - Defines behavior during the recovery period, and how traffic is permitted. - * random linear - * - * <li> stat provider - The stats provider to export statistics to. - * Stats.STATS_PROVIDER - * </ul> - * - */ - public static class Builder { - private String name; - private int seedSize = 100; - private double toleratedFailureRate = 0.5; - private Set<BackoffDecider> deciderGroup = null; - private BackoffStrategy strategy = new TruncatedBinaryBackoff( - Amount.of(100L, Time.MILLISECONDS), Amount.of(10L, Time.SECONDS)); - private Amount<Long, Time> recoveryPeriod = null; - private long requestWindowNs = Amount.of(10L, Time.SECONDS).as(Time.NANOSECONDS); - private int numBuckets = 100; - private RecoveryType recoveryType = RecoveryType.RANDOM_LINEAR; - private StatsProvider statsProvider = Stats.STATS_PROVIDER; - private Random random = Random.Util.newDefaultRandom(); - private Clock clock = Clock.SYSTEM_CLOCK; - - Builder(String name) { - this.name = name; - } - - /** - * Sets the number of requests that must be accumulated before the error rate will be - * calculated. This improves the genesis problem where the first few requests are errors, - * causing flapping in and out of backoff state. - * - * @param seedSize Request seed size. - * @return A reference to the builder. - */ - public Builder withSeedSize(int seedSize) { - this.seedSize = seedSize; - return this; - } - - /** - * Sets the tolerated failure rate for the decider. If the rate is exceeded for the time - * window, the decider begins backing off. - * - * @param toleratedRate The tolerated failure rate (at least 0.0 and strictly less than 1.0). - * @return A reference to the builder. - */ - public Builder withTolerateFailureRate(double toleratedRate) { - this.toleratedFailureRate = toleratedRate; - return this; - } - - /** - * Makes the decider a part of a group.
When a decider is a part of a group, it will monitor - * the other deciders to ensure that all deciders do not back off at once. - * - * @param deciderGroup Group to make this decider a part of. More deciders may be added to the - * group after this call is made. - * @return A reference to the builder. - */ - public Builder groupWith(Set<BackoffDecider> deciderGroup) { - this.deciderGroup = deciderGroup; - return this; - } - - /** - * Overrides the default backoff strategy. - * - * @param strategy Backoff strategy to use. - * @return A reference to the builder. - */ - public Builder withStrategy(BackoffStrategy strategy) { - this.strategy = strategy; - return this; - } - - /** - * Overrides the default recovery period behavior. By default, the recovery period is equal - * to the previous backoff period (which is equivalent to setting the recovery period to null - * here). A non-null value here will assign a fixed recovery period. - * - * @param recoveryPeriod Fixed recovery period. - * @return A reference to the builder. - */ - public Builder withRecoveryPeriod(@Nullable Amount<Long, Time> recoveryPeriod) { - this.recoveryPeriod = recoveryPeriod; - return this; - } - - /** - * Sets the time window over which to analyze failures. Beyond the time window, request history - * is discarded (and ignored). - * - * @param requestWindow The analysis time window. - * @return A reference to the builder. - */ - public Builder withRequestWindow(Amount<Long, Time> requestWindow) { - this.requestWindowNs = requestWindow.as(Time.NANOSECONDS); - return this; - } - - /** - * Sets the number of time slices that the decider will use to partition aggregate statistics. - * - * @param numBuckets Bucket count. - * @return A reference to the builder. - */ - public Builder withBucketCount(int numBuckets) { - this.numBuckets = numBuckets; - return this; - } - - /** - * Sets the recovery mechanism to use when in the recovery period. - * - * @param recoveryType The recovery mechanism to use. 
- * @return A reference to the builder. - */ - public Builder withRecoveryType(RecoveryType recoveryType) { - this.recoveryType = recoveryType; - return this; - } - - /** - * Sets the stats provider that statistics should be exported to. - * - * @param statsProvider Stats provider to use. - * @return A reference to the builder. - */ - public Builder withStatsProvider(StatsProvider statsProvider) { - this.statsProvider = statsProvider; - return this; - } - - @VisibleForTesting public Builder withRandom(Random random) { - this.random = random; - return this; - } - - @VisibleForTesting public Builder withClock(Clock clock) { - this.clock = clock; - return this; - } - - /** - * Gets a reference to the built decider object. - * @return A decider object. - */ - public BackoffDecider build() { - BackoffDecider decider = new BackoffDecider(name, seedSize, toleratedFailureRate, - deciderGroup, strategy, recoveryPeriod, requestWindowNs, numBuckets, recoveryType, - statsProvider, random, clock); - if (deciderGroup != null) deciderGroup.add(decider); - return decider; - } - } - - private class TimeSlice { - int requestCount = 0; - int failureCount = 0; - final long bucketStartNs; - - public TimeSlice() { - bucketStartNs = clock.nowNanos(); - } - } - - class RequestWindow { - // These store the sum of the respective fields contained within buckets. Doing so removes the - // need to accumulate the counts within the buckets every time the backoff state is - // recalculated. - @VisibleForTesting long totalRequests = 0; - @VisibleForTesting long totalFailures = 0; - - private final long durationNs; - private final long bucketLengthNs; - private final int seedSize; - - // Stores aggregate request/failure counts for time slices. 
- private final Deque<TimeSlice> buckets = Lists.newLinkedList(); - - RequestWindow(long durationNs, int bucketCount, int seedSize) { - this.durationNs = durationNs; - this.bucketLengthNs = durationNs / bucketCount; - buckets.addFirst(new TimeSlice()); - this.seedSize = seedSize; - } - - void reset() { - totalRequests = 0; - totalFailures = 0; - buckets.clear(); - buckets.addFirst(new TimeSlice()); - } - - void addResult(boolean success) { - maybeShuffleBuckets(); - buckets.peekFirst().requestCount++; - totalRequests++; - - if (!success) { - buckets.peekFirst().failureCount++; - totalFailures++; - } - } - - void maybeShuffleBuckets() { - // Check if the first bucket is still relevant. - if (clock.nowNanos() - buckets.peekFirst().bucketStartNs >= bucketLengthNs) { - - // Remove old buckets. - while (!buckets.isEmpty() - && buckets.peekLast().bucketStartNs < clock.nowNanos() - durationNs) { - TimeSlice removed = buckets.removeLast(); - totalRequests -= removed.requestCount; - totalFailures -= removed.failureCount; - } - - buckets.addFirst(new TimeSlice()); - } - } - - boolean isSeeded() { - return totalRequests >= seedSize; - } - - double getFailureRate() { - return totalRequests == 0 ? 0 : ((double) totalFailures) / totalRequests; - } - } - - private static enum State { - NORMAL, // All requests are being permitted. - BACKOFF, // Quiet period while waiting for backend to recover/improve. - RECOVERY, // Ramping period where an ascending fraction of requests is being permitted. - FORCED_NORMAL // All other backends in the group are backing off, so this one is forced normal. 
- } - private class TimedStateMachine { - final StateMachine<State> stateMachine; - - private long stateEndNs; - private long stateDurationNs; - - TimedStateMachine(String name) { - stateMachine = StateMachine.<State>builder(name + "_backoff_state_machine") - .addState(State.NORMAL, State.BACKOFF, State.FORCED_NORMAL) - .addState(State.BACKOFF, State.RECOVERY, State.FORCED_NORMAL) - .addState(State.RECOVERY, State.NORMAL, State.BACKOFF, State.FORCED_NORMAL) - .addState(State.FORCED_NORMAL, State.RECOVERY) - .initialState(State.NORMAL) - .build(); - } - - State getState() { - return stateMachine.getState(); - } - - void transitionUnbounded(State state) { - stateMachine.transition(state); - } - - void transition(State state, long durationNs) { - transitionUnbounded(state); - this.stateEndNs = clock.nowNanos() + durationNs; - this.stateDurationNs = durationNs; - } - - long getStateDurationNs() { - return stateDurationNs; - } - - long getStateDurationMs() { - return Amount.of(stateDurationNs, Time.NANOSECONDS).as(Time.MILLISECONDS); - } - - void setStateDurationNs(long stateDurationNs) { - this.stateDurationNs = stateDurationNs; - } - - long getStateRemainingNs() { - return stateEndNs - clock.nowNanos(); - } - - long getStateRemainingMs() { - return Amount.of(getStateRemainingNs(), Time.NANOSECONDS).as(Time.MILLISECONDS); - } - - double getStateFractionComplete() { - return 1.0 - ((double) getStateRemainingNs()) / stateDurationNs; - } - - boolean isStateExpired() { - return clock.nowNanos() > stateEndNs; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BackoffHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BackoffHelper.java b/commons/src/main/java/com/twitter/common/util/BackoffHelper.java deleted file mode 100644 index 7a2023a..0000000 --- a/commons/src/main/java/com/twitter/common/util/BackoffHelper.java 
+++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.twitter.common.base.ExceptionalSupplier; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -import java.util.logging.Logger; - -/** - * A utility for dealing with backoffs of retryable actions. - * - * <p>TODO(John Sirois): investigate synergies with BackoffDecider. - * - * @author John Sirois - */ -public class BackoffHelper { - private static final Logger LOG = Logger.getLogger(BackoffHelper.class.getName()); - - private static final Amount<Long,Time> DEFAULT_INITIAL_BACKOFF = Amount.of(1L, Time.SECONDS); - private static final Amount<Long,Time> DEFAULT_MAX_BACKOFF = Amount.of(1L, Time.MINUTES); - - private final Clock clock; - private final BackoffStrategy backoffStrategy; - - /** - * Creates a new BackoffHelper that uses truncated binary backoff starting at a 1 second backoff - * and maxing out at a 1 minute backoff. - */ - public BackoffHelper() { - this(DEFAULT_INITIAL_BACKOFF, DEFAULT_MAX_BACKOFF); - } - - /** - * Creates a new BackoffHelper that uses truncated binary backoff starting at the given - * {@code initialBackoff} and maxing out at the given {@code maxBackoff}. 
- * - * @param initialBackoff the initial amount of time to back off - * @param maxBackoff the maximum amount of time to back off - */ - public BackoffHelper(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff) { - this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff)); - } - - /** - * Creates a new BackoffHelper that uses truncated binary backoff starting at the given - * {@code initialBackoff} and maxing out at the given {@code maxBackoff}. This will either: - * <ul> - * <li>{@code stopAtMax == true} : throw {@code BackoffStoppedException} when maxBackoff is - * reached</li> - * <li>{@code stopAtMax == false} : continue backing off with maxBackoff</li> - * </ul> - * - * @param initialBackoff the initial amount of time to back off - * @param maxBackoff the maximum amount of time to back off - * @param stopAtMax if true, this will throw {@code BackoffStoppedException} when the max backoff is - * reached - */ - public BackoffHelper(Amount<Long, Time> initialBackoff, Amount<Long, Time> maxBackoff, - boolean stopAtMax) { - this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff, stopAtMax)); - } - - /** - * Creates a BackoffHelper that uses the given {@code backoffStrategy} to calculate backoffs - * between retries. - * - * @param backoffStrategy the backoff strategy to use - */ - public BackoffHelper(BackoffStrategy backoffStrategy) { - this(Clock.SYSTEM_CLOCK, backoffStrategy); - } - - @VisibleForTesting BackoffHelper(Clock clock, BackoffStrategy backoffStrategy) { - this.clock = Preconditions.checkNotNull(clock); - this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy); - } - - /** - * Executes the given task using the configured backoff strategy until the task succeeds as - * indicated by returning {@code true}.
- * - * @param task the retryable task to execute until success - * @throws InterruptedException if interrupted while waiting for the task to execute successfully - * @throws BackoffStoppedException if the backoff stopped unsuccessfully - * @throws E if the task throws - */ - public <E extends Exception> void doUntilSuccess(final ExceptionalSupplier<Boolean, E> task) - throws InterruptedException, BackoffStoppedException, E { - doUntilResult(new ExceptionalSupplier<Boolean, E>() { - @Override public Boolean get() throws E { - Boolean result = task.get(); - return Boolean.TRUE.equals(result) ? result : null; - } - }); - } - - /** - * Executes the given task using the configured backoff strategy until the task succeeds as - * indicated by returning a non-null value. - * - * @param task the retryable task to execute until success - * @return the result of the successfully executed task - * @throws InterruptedException if interrupted while waiting for the task to execute successfully - * @throws BackoffStoppedException if the backoff stopped unsuccessfully - * @throws E if the task throws - */ - public <T, E extends Exception> T doUntilResult(ExceptionalSupplier<T, E> task) - throws InterruptedException, BackoffStoppedException, E { - T result = task.get(); // give an immediate try - return (result != null) ? 
result : retryWork(task); - } - - private <T, E extends Exception> T retryWork(ExceptionalSupplier<T, E> work) - throws E, InterruptedException, BackoffStoppedException { - long currentBackoffMs = 0; - while (backoffStrategy.shouldContinue(currentBackoffMs)) { - currentBackoffMs = backoffStrategy.calculateBackoffMs(currentBackoffMs); - LOG.fine("Operation failed, backing off for " + currentBackoffMs + "ms"); - clock.waitFor(currentBackoffMs); - - T result = work.get(); - if (result != null) { - return result; - } - } - throw new BackoffStoppedException(String.format("Backoff stopped without succeeding.")); - } - - /** - * Occurs after the backoff strategy should stop. - */ - public static class BackoffStoppedException extends RuntimeException { - public BackoffStoppedException(String msg) { - super(msg); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java b/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java deleted file mode 100644 index a90d3f6..0000000 --- a/commons/src/main/java/com/twitter/common/util/BackoffStrategy.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * A policy for pausing between attempts of an operation that keeps failing, and for deciding
 * when the pausing should end.
 */
public interface BackoffStrategy {

  /**
   * Computes how long to pause before the operation is attempted again.
   *
   * @param lastBackoffMs the previously used pause in milliseconds; a value of 0 means that no
   *     pause has been performed yet
   * @return the number of milliseconds to pause before the next attempt
   */
  long calculateBackoffMs(long lastBackoffMs);

  /**
   * Reports whether another round of backing off should take place.
   *
   * @param lastBackoffMs the previously used pause in milliseconds
   * @return {@code true} to keep backing off, {@code false} to stop
   */
  boolean shouldContinue(long lastBackoffMs);
}
- */ -package com.twitter.common.util; - -import java.io.InputStream; -import java.util.Properties; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.annotations.VisibleForTesting; - -import com.twitter.common.base.MorePreconditions; - -/** - * Handles loading of a build properties file, and provides keys to look up known values in the - * properties. - */ -public class BuildInfo { - - private static final Logger LOG = Logger.getLogger(BuildInfo.class.getName()); - - private static final String DEFAULT_BUILD_PROPERTIES_PATH = "build.properties"; - - private final String resourcePath; - - private Properties properties = null; - - /** - * Creates a build info container that will use the default properties file path. - */ - public BuildInfo() { - this(DEFAULT_BUILD_PROPERTIES_PATH); - } - - /** - * Creates a build info container, reading from the given path. - * - * @param resourcePath The resource path to read build properties from. - */ - public BuildInfo(String resourcePath) { - this.resourcePath = MorePreconditions.checkNotBlank(resourcePath); - } - - @VisibleForTesting - public BuildInfo(Properties properties) { - this.resourcePath = null; - this.properties = properties; - } - - private void fetchProperties() { - properties = new Properties(); - LOG.info("Fetching build properties from " + resourcePath); - InputStream in = ClassLoader.getSystemResourceAsStream(resourcePath); - if (in == null) { - LOG.warning("Failed to fetch build properties from " + resourcePath); - return; - } - - try { - properties.load(in); - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to load properties file " + resourcePath, e); - } - } - - /** - * Fetches the properties stored in the resource location. - * - * @return The loaded properties, or a default properties object if there was a problem loading - * the specified properties resource. 
- */ - public Properties getProperties() { - if (properties == null) fetchProperties(); - return properties; - } - - /** - * Values of keys that are expected to exist in the loaded properties file. - */ - public enum Key { - PATH("build.path"), - USER("build.user.name"), - MACHINE("build.machine"), - DATE("build.date"), - TIME("build.time"), - TIMESTAMP("build.timestamp"), - GIT_TAG("build.git.tag"), - GIT_REVISION("build.git.revision"), - GIT_REVISION_NUMBER("build.git.revision.number"), - GIT_BRANCHNAME("build.git.branchname"); - - public final String value; - private Key(String value) { - this.value = value; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/Clock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/Clock.java b/commons/src/main/java/com/twitter/common/util/Clock.java deleted file mode 100644 index f23c349..0000000 --- a/commons/src/main/java/com/twitter/common/util/Clock.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util; - -import java.io.Serializable; - -/** - * An abstraction of the system clock. - * - * @author John Sirois - */ -public interface Clock { - - /** - * A clock that returns the the actual time reported by the system. - * This clock is guaranteed to be serializable. 
- */ - Clock SYSTEM_CLOCK = new SerializableClock() { - @Override public long nowMillis() { - return System.currentTimeMillis(); - } - @Override public long nowNanos() { - return System.nanoTime(); - } - @Override public void waitFor(long millis) throws InterruptedException { - Thread.sleep(millis); - } - }; - - /** - * Returns the current time in milliseconds since the epoch. - * - * @return The current time in milliseconds since the epoch. - * @see System#currentTimeMillis() - */ - long nowMillis(); - - /** - * Returns the current time in nanoseconds. Should be used only for relative timing. - * See {@code System.nanoTime()} for tips on using the value returned here. - * - * @return A measure of the current time in nanoseconds. - * @see System#nanoTime() - */ - long nowNanos(); - - /** - * Waits for the given amount of time to pass on this clock before returning. - * - * @param millis the amount of time to wait in milliseconds - * @throws InterruptedException if this wait was interrupted - */ - void waitFor(long millis) throws InterruptedException; -} - -/** - * A typedef to support anonymous {@link Clock} implementations that are also {@link Serializable}. - */ -interface SerializableClock extends Clock, Serializable { } http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/CommandExecutor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/CommandExecutor.java b/commons/src/main/java/com/twitter/common/util/CommandExecutor.java deleted file mode 100644 index ad44524..0000000 --- a/commons/src/main/java/com/twitter/common/util/CommandExecutor.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util; - -import com.twitter.common.base.ExceptionalCommand; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -/** - * Asynchronous executor of enqueued tasks in a rate limited manner. - * - * @author Srinivasan Rajagopal - */ -public interface CommandExecutor { - - /** - * Enqueue a task to be executed with retry semantics defined. - * - * @param name Human readable name for this task. - * @param task task to execute. - * @param exceptionClass Concrete exception type. - * @param maxTries num of tries in case of failure. - * @param retryDelay interval between retries in case of failure. - */ - <E extends Exception> void execute( - String name, - ExceptionalCommand<E> task, - Class<E> exceptionClass, - int maxTries, - Amount<Long, Time> retryDelay); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/DateUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/DateUtils.java b/commons/src/main/java/com/twitter/common/util/DateUtils.java deleted file mode 100644 index 0f9f950..0000000 --- a/commons/src/main/java/com/twitter/common/util/DateUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
/**
 * Static helpers for converting and shifting java {@link Date}s.
 *
 * @author John Sirois
 */
public final class DateUtils {

  private DateUtils() {
    // utility
  }

  /**
   * Returns a {@link Date} for the current instant.
   *
   * @return a freshly created date
   */
  public static Date now() {
    return new Date();
  }

  /**
   * Converts milliseconds since the epoch to whole seconds since the epoch.
   *
   * @param millisSinceEpoch the instant in milliseconds since the epoch
   * @return the instant in seconds, with the sub-second part discarded
   */
  public static long toUnixTime(long millisSinceEpoch) {
    return TimeUnit.SECONDS.convert(millisSinceEpoch, TimeUnit.MILLISECONDS);
  }

  /**
   * Converts a date to whole seconds since the epoch.
   *
   * @param date the date to convert
   * @return the date's instant in seconds since the epoch
   */
  public static long toUnixTime(Date date) {
    long millis = date.getTime();
    return toUnixTime(millis);
  }

  /**
   * Returns the current system time in whole seconds since the epoch.
   *
   * @return the current time in seconds
   */
  public static long nowUnixTime() {
    long millis = System.currentTimeMillis();
    return toUnixTime(millis);
  }

  /**
   * Moves a reference date backwards by the given amount of a calendar field.
   *
   * @param referenceDate the date to start from
   * @param calendarField a {@link Calendar} field constant, e.g. {@link Calendar#SECOND}
   * @param amount how many units of the field to subtract
   * @return the shifted date
   */
  public static Date ago(Date referenceDate, int calendarField, int amount) {
    Calendar shifted = Calendar.getInstance();
    shifted.setTime(referenceDate);
    shifted.add(calendarField, -amount);
    return shifted.getTime();
  }

  /**
   * Moves the current time backwards by the given amount of a calendar field.
   *
   * @param calendarField a {@link Calendar} field constant
   * @param amount how many units of the field to subtract
   * @return the shifted date
   */
  public static Date ago(int calendarField, int amount) {
    Date reference = now();
    return ago(reference, calendarField, amount);
  }
}
/**
 * Utilities for working with {@link File}s.
 *
 * @author Florian Leibert
 */
public final class FileUtils {

  private FileUtils() {
    // utility
  }

  /**
   * Recursively deletes the path and all its content and returns true if it succeeds.
   * Note that the content could be partially deleted and the method return false.
   *
   * <p>A directory whose entries cannot be listed is treated as having no deletable content;
   * the final delete of that directory then decides the result.
   *
   * @param path the path to delete; {@code null} is tolerated
   * @return true if the path itself was deleted, false if {@code path} is null or the final
   *     delete failed
   */
  public static boolean forceDeletePath(File path) {
    if (path == null) {
      return false;
    }
    if (path.exists() && path.isDirectory()) {
      // listFiles() returns null when the directory cannot be read or an I/O error occurs.
      File[] children = path.listFiles();
      if (children != null) {
        for (File child : children) {
          // Handles both plain files and nested directories; failures surface through the
          // delete of the parent below.
          forceDeletePath(child);
        }
      }
    }
    return path.delete();
  }
}
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import java.io.Closeable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -/** - * Low resolution implementation of a {@link com.twitter.common.util.Clock}, - * optimized for fast reads at the expense of precision. - * It works by caching the result of the system clock for a - * {@code resolution} amount of time. 
- */ -public class LowResClock implements Clock, Closeable { - private static final ScheduledExecutorService GLOBAL_SCHEDULER = - Executors.newScheduledThreadPool(1, new ThreadFactory() { - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "LowResClock"); - t.setDaemon(true); - return t; - } - }); - - private volatile long time; - private final ScheduledFuture<?> updaterHandler; - private final Clock underlying; - - @VisibleForTesting - LowResClock(Amount<Long, Time> resolution, ScheduledExecutorService executor, Clock clock) { - long sleepTimeMs = resolution.as(Time.MILLISECONDS); - Preconditions.checkArgument(sleepTimeMs > 0); - underlying = clock; - Runnable ticker = new Runnable() { - @Override public void run() { - time = underlying.nowMillis(); - } - }; - - // Ensure the constructing thread sees a LowResClock with a valid (low-res) time by executing a - // blocking call now. - ticker.run(); - - updaterHandler = - executor.scheduleAtFixedRate(ticker, sleepTimeMs, sleepTimeMs, TimeUnit.MILLISECONDS); - } - - - /** - * Construct a LowResClock which wraps the system clock. - * This constructor will also schedule a periodic task responsible for - * updating the time every {@code resolution}. - */ - public LowResClock(Amount<Long, Time> resolution) { - this(resolution, GLOBAL_SCHEDULER, Clock.SYSTEM_CLOCK); - } - - /** - * Terminate the underlying updater task. - * Any subsequent usage of the clock will throw an {@link IllegalStateException}. 
- */ - public void close() { - updaterHandler.cancel(true); - } - - @Override - public long nowMillis() { - checkNotClosed(); - return time; - } - - @Override - public long nowNanos() { - return nowMillis() * 1000 * 1000; - } - - @Override - public void waitFor(long millis) throws InterruptedException { - checkNotClosed(); - underlying.waitFor(millis); - } - - private void checkNotClosed() { - if (updaterHandler.isCancelled()) { - throw new IllegalStateException("LowResClock invoked after being closed!"); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/util/ParsingUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/util/ParsingUtil.java b/commons/src/main/java/com/twitter/common/util/ParsingUtil.java deleted file mode 100644 index d84975a..0000000 --- a/commons/src/main/java/com/twitter/common/util/ParsingUtil.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.util; - -import com.google.common.base.Preconditions; - -import com.twitter.common.collections.Pair; - -/** - * Common methods for parsing configs. - * - * @author John Sirois - */ -public class ParsingUtil { - /** - * Parses a string as a range between one integer and another. The integers must be separated by - * a hypen character (space padding is acceptable). 
Additionally, the first integer - * (left-hand side) must be less than or equal to the second (right-hand side). - * - * @param rangeString The string to parse as an integer range. - * @return A pair of the parsed integers. - */ - public static Pair<Integer, Integer> parseRange(String rangeString) { - if (rangeString == null) return null; - - String[] startEnd = rangeString.split("-"); - Preconditions.checkState( - startEnd.length == 2, "Shard range format: start-end (e.g. 1-4)"); - int start; - int end; - try { - start = Integer.parseInt(startEnd[0].trim()); - end = Integer.parseInt(startEnd[1].trim()); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Failed to parse shard range.", e); - } - - Preconditions.checkState( - start <= end, "The left-hand side of a shard range must be <= the right-hand side."); - return Pair.of(start, end); - } -}