http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/Thrift.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/Thrift.java b/commons/src/main/java/com/twitter/common/thrift/Thrift.java deleted file mode 100644 index 3c44d79..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/Thrift.java +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.net.loadbalancing.RequestTracker; -import com.twitter.common.net.pool.Connection; -import com.twitter.common.net.pool.ObjectPool; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.thrift.callers.Caller; -import com.twitter.common.thrift.callers.DeadlineCaller; -import com.twitter.common.thrift.callers.DebugCaller; -import com.twitter.common.thrift.callers.RetryingCaller; -import com.twitter.common.thrift.callers.StatTrackingCaller; -import com.twitter.common.thrift.callers.ThriftCaller; - -/** - * A generic thrift client that handles reconnection in the case of protocol errors, automatic - * retries, call deadlines and call statistics tracking. This class aims for behavior compatible - * with the <a href="http://github.com/fauna/thrift_client">generic ruby thrift client</a>. - * - * <p>In order to enforce call deadlines for synchronous clients, this class uses an - * {@link java.util.concurrent.ExecutorService}. If a custom executor is supplied, it should throw - * a subclass of {@link RejectedExecutionException} to signal thread resource exhaustion, in which - * case the client will fail fast and propagate the event as a {@link TResourceExhaustedException}. - * - * TODO(William Farner): Before open sourcing, look into changing the current model of wrapped proxies - * to use a single proxy and wrapped functions for decorators. - * - * @author John Sirois - */ -public class Thrift<T> { - - /** - * The default thrift call configuration used if none is specified. - * - * Specifies the following settings: - * <ul> - * <li>global call timeout: 1 second - * <li>call retries: 0 - * <li>retryable exceptions: TTransportException (network exceptions including socket timeouts) - * <li>wait for connections: true - * <li>debug: false - * </ul> - */ - public static final Config DEFAULT_CONFIG = Config.builder() - .withRequestTimeout(Amount.of(1L, Time.SECONDS)) - .noRetries() - .retryOn(TTransportException.class) // if maxRetries is set non-zero - .create(); - - /** - * The default thrift call configuration used for an async client if none is specified. - * - * Specifies the following settings: - * <ul> - * <li>global call timeout: none - * <li>call retries: 0 - * <li>retryable exceptions: IOException, TTransportException - * (network exceptions but not timeouts) - * <li>wait for connections: true - * <li>debug: false - * </ul> - */ - @SuppressWarnings("unchecked") - public static final Config DEFAULT_ASYNC_CONFIG = Config.builder(DEFAULT_CONFIG) - .withRequestTimeout(Amount.of(0L, Time.SECONDS)) - .noRetries() - .retryOn(ImmutableSet.<Class<? extends Exception>>builder() - .add(IOException.class) - .add(TTransportException.class).build()) // if maxRetries is set non-zero - .create(); - - private final Config defaultConfig; - private final ExecutorService executorService; - private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool; - private final RequestTracker<InetSocketAddress> requestTracker; - private final String serviceName; - private final Class<T> serviceInterface; - private final Function<TTransport, T> clientFactory; - private final boolean async; - private final boolean withSsl; - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG}, cached thread pool - * {@link ExecutorService}, and synchronous calls. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, - String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory) { - - this(DEFAULT_CONFIG, connectionPool, requestTracker, serviceName, serviceInterface, - clientFactory, false, false); - } - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool - * {@link ExecutorService}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, - String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, - boolean async) { - - this(getConfig(async), connectionPool, requestTracker, serviceName, - serviceInterface, clientFactory, async, false); - } - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool - * {@link ExecutorService}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, - String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, - boolean async, boolean ssl) { - - this(getConfig(async), connectionPool, requestTracker, serviceName, - serviceInterface, clientFactory, async, ssl); - } - - /** - * Constructs an instance with a cached thread pool {@link ExecutorService}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(Config config, ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, - String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, - boolean async, boolean ssl) { - - this(config, - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Thrift["+ serviceName +"][%d]") - .build()), - connectionPool, requestTracker, serviceName, serviceInterface, clientFactory, async, ssl); - } - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ExecutorService executorService, - ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, - String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, - boolean async, boolean ssl) { - - this(getConfig(async), executorService, connectionPool, requestTracker, serviceName, - serviceInterface, clientFactory, async, ssl); - } - - private static Config getConfig(boolean async) { - return async ? DEFAULT_ASYNC_CONFIG : DEFAULT_CONFIG; - } - - /** - * Constructs a new Thrift factory for creating clients that make calls to a particular thrift - * service. - * - * <p>Note that the combination of {@code config} and {@code connectionPool} need to be chosen - * with care depending on usage of the generated thrift clients. In particular, if configured - * to not wait for connections, the {@code connectionPool} ought to be warmed up with a set of - * connections or else be actively building connections in the background. - * - * <p>TODO(John Sirois): consider adding an method to ObjectPool that would allow Thrift to handle - * this case by pro-actively warming the pool. - * - * @param config the default configuration to use for all thrift calls; also the configuration all - * {@link ClientBuilder}s start with - * @param executorService for invoking calls with a specified deadline - * @param connectionPool the source for thrift connections - * @param serviceName a /vars friendly name identifying the service clients will connect to - * @param serviceInterface the thrift compiler generate interface class for the remote service - * (Iface) - * @param clientFactory a function that can generate a concrete thrift client for the given - * {@code serviceInterface} - * @param async enable asynchronous API - * @param ssl enable TLS handshaking for Thrift calls - */ - public Thrift(Config config, ExecutorService executorService, - ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, String serviceName, - Class<T> serviceInterface, Function<TTransport, T> clientFactory, boolean async, boolean ssl) { - - defaultConfig = Preconditions.checkNotNull(config); - this.executorService = Preconditions.checkNotNull(executorService); - this.connectionPool = Preconditions.checkNotNull(connectionPool); - this.requestTracker = Preconditions.checkNotNull(requestTracker); - this.serviceName = MorePreconditions.checkNotBlank(serviceName); - this.serviceInterface = checkServiceInterface(serviceInterface); - this.clientFactory = Preconditions.checkNotNull(clientFactory); - this.async = async; - this.withSsl = ssl; - } - - static <I> Class<I> checkServiceInterface(Class<I> serviceInterface) { - Preconditions.checkNotNull(serviceInterface); - Preconditions.checkArgument(serviceInterface.isInterface(), - "%s must be a thrift service interface", serviceInterface); - return serviceInterface; - } - - /** - * Closes any open connections and prepares this thrift client for graceful shutdown. Any thrift - * client proxies returned from {@link #create()} will become invalid. - */ - public void close() { - connectionPool.close(); - executorService.shutdown(); - } - - /** - * A builder class that allows modifications of call behavior to be made for a given Thrift - * client. Note that in the case of conflicting configuration calls, the last call wins. So, - * for example, the following sequence would result in all calls being subject to a 5 second - * global deadline: - * <code> - * builder.blocking().withDeadline(5, TimeUnit.SECONDS).create() - * </code> - * - * @see Config - */ - public final class ClientBuilder extends Config.AbstractBuilder<ClientBuilder> { - private ClientBuilder(Config template) { - super(template); - } - - @Override - protected ClientBuilder getThis() { - return this; - } - - /** - * Creates a new client using the built up configuration changes. - */ - public T create() { - return createClient(getConfig()); - } - } - - /** - * Creates a new thrift client builder that inherits this Thrift instance's default configuration. - * This is useful for customizing a client for a particular thrift call that makes sense to treat - * differently from the rest of the calls to a given service. - */ - public ClientBuilder builder() { - return builder(defaultConfig); - } - - /** - * Creates a new thrift client builder that inherits the given configuration. - * This is useful for customizing a client for a particular thrift call that makes sense to treat - * differently from the rest of the calls to a given service. - */ - public ClientBuilder builder(Config config) { - Preconditions.checkNotNull(config); - return new ClientBuilder(config); - } - - /** - * Creates a new client using the default configuration specified for this Thrift instance. - */ - public T create() { - return createClient(defaultConfig); - } - - private T createClient(Config config) { - StatsProvider statsProvider = config.getStatsProvider(); - - // lease/call/[invalidate]/release - boolean debug = config.isDebug(); - - Caller decorated = new ThriftCaller<T>(connectionPool, requestTracker, clientFactory, - config.getConnectTimeout(), debug); - - // [retry] - if (config.getMaxRetries() > 0) { - decorated = new RetryingCaller(decorated, async, statsProvider, serviceName, - config.getMaxRetries(), config.getRetryableExceptions(), debug); - } - - // [deadline] - if (config.getRequestTimeout().getValue() > 0) { - Preconditions.checkArgument(!async, - "Request deadlines may not be used with an asynchronous client."); - - decorated = new DeadlineCaller(decorated, async, executorService, config.getRequestTimeout()); - } - - // [debug] - if (debug) { - decorated = new DebugCaller(decorated, async); - } - - // stats - if (config.enableStats()) { - decorated = new StatTrackingCaller(decorated, async, statsProvider, serviceName); - } - - final Caller caller = decorated; - - final InvocationHandler invocationHandler = new InvocationHandler() { - @Override - public Object invoke(Object o, Method method, Object[] args) throws Throwable { - AsyncMethodCallback callback = null; - if (args != null && async) { - List<Object> argsList = Lists.newArrayList(args); - callback = extractCallback(argsList); - args = argsList.toArray(); - } - - return caller.call(method, args, callback, null); - } - }; - - @SuppressWarnings("unchecked") - T instance = (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), - new Class<?>[] {serviceInterface}, invocationHandler); - return instance; - } - - /** - * Verifies that the final argument in a list of objects is a fully-formed - * {@link AsyncMethodCallback} and extracts it, removing it from the argument list. - * - * @param args Argument list to remove the callback from. - * @return The callback extracted from {@code args}. - */ - private static AsyncMethodCallback extractCallback(List<Object> args) { - // TODO(William Farner): Check all interface methods when building the Thrift client - // and verify that last arguments are all callbacks...this saves us from checking - // each time. - - // Check that the last argument is a callback. - Preconditions.checkArgument(args.size() > 0); - Object lastArg = args.get(args.size() - 1); - Preconditions.checkArgument(lastArg instanceof AsyncMethodCallback, - "Last argument of an async thrift call is expected to be of type AsyncMethodCallback."); - - return (AsyncMethodCallback) args.remove(args.size() - 1); - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java deleted file mode 100644 index c1db7b7..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java +++ /dev/null @@ -1,366 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.twitter.common.base.Closure; -import com.twitter.common.base.Closures; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.net.pool.Connection; -import com.twitter.common.net.pool.ConnectionFactory; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; - -/** - * A connection factory for thrift transport connections to a given host. This connection factory - * is lazy and will only create a configured maximum number of active connections - where a - * {@link ConnectionFactory#create(com.twitter.common.quantity.Amount) created} connection that has - * not been {@link #destroy destroyed} is considered active. - * - * @author John Sirois - */ -public class ThriftConnectionFactory - implements ConnectionFactory<Connection<TTransport, InetSocketAddress>> { - - public enum TransportType { - BLOCKING, FRAMED, NONBLOCKING; - - /** - * Async clients implicitly use a framed transport, requiring the server they connect to to do - * the same. This prevents specifying a nonblocking client without a framed transport, since - * that is not compatible with thrift and would simply cause the client to blow up when making a - * request. Instead, you must explicitly say useFramedTransport(true) for any buildAsync(). - */ - public static TransportType get(boolean framedTransport, boolean nonblocking) { - if (nonblocking) { - Preconditions.checkArgument(framedTransport, - "nonblocking client requires a server running framed transport"); - return NONBLOCKING; - } - - return framedTransport ? FRAMED : BLOCKING; - } - } - - private static InetSocketAddress asEndpoint(String host, int port) { - MorePreconditions.checkNotBlank(host); - Preconditions.checkArgument(port > 0); - return InetSocketAddress.createUnresolved(host, port); - } - - private InetSocketAddress endpoint; - private final int maxConnections; - private final TransportType transportType; - private final Amount<Long, Time> socketTimeout; - private final Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback; - private boolean sslTransport = false; - - private final Set<Connection<TTransport, InetSocketAddress>> activeConnections = - Sets.newSetFromMap( - Maps.<Connection<TTransport, InetSocketAddress>, Boolean>newIdentityHashMap()); - private volatile int lastActiveConnectionsSize = 0; - - private final Lock activeConnectionsWriteLock = new ReentrantLock(true); - - /** - * Creates a thrift connection factory with a plain socket (non-framed transport). - * This is the same as calling {@link #ThriftConnectionFactory(String, int, int, boolean)} with - * {@code framedTransport} set to {@code false}. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections) { - this(host, port, maxConnections, TransportType.BLOCKING); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - * @param framedTransport Whether to use framed or blocking transport. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections, - boolean framedTransport) { - - this(asEndpoint(host, port), maxConnections, TransportType.get(framedTransport, false)); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * - * @param endpoint Endpoint to connect to. - * @param maxConnections Maximum number of connections for this host:port. - * @param framedTransport Whether to use framed or blocking transport. - */ - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - boolean framedTransport) { - - this(endpoint, maxConnections, TransportType.get(framedTransport, false)); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, - * otherwise a raw {@link TSocket} will be used. - * Timeouts are ignored when nonblocking transport is used. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - * @param transportType Whether to use normal blocking, framed blocking, or non-blocking - * (implicitly framed) transport. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections, - TransportType transportType) { - this(host, port, maxConnections, transportType, null); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, - * otherwise a raw {@link TSocket} will be used. - * Timeouts are ignored when nonblocking transport is used. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - * @param transportType Whether to use normal blocking, framed blocking, or non-blocking - * (implicitly framed) transport. - * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o - * the blocking client. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections, - TransportType transportType, Amount<Long, Time> socketTimeout) { - this(asEndpoint(host, port), maxConnections, transportType, socketTimeout); - } - - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - TransportType transportType) { - this(endpoint, maxConnections, transportType, null); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, - * otherwise a raw {@link TSocket} will be used. - * Timeouts are ignored when nonblocking transport is used. - * - * @param endpoint Endpoint to connect to. - * @param maxConnections Maximum number of connections for this host:port. - * @param transportType Whether to use normal blocking, framed blocking, or non-blocking - * (implicitly framed) transport. - * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o - * the blocking client. - */ - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - TransportType transportType, Amount<Long, Time> socketTimeout) { - this(endpoint, maxConnections, transportType, socketTimeout, - Closures.<Connection<TTransport, InetSocketAddress>>noop(), false); - } - - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - TransportType transportType, Amount<Long, Time> socketTimeout, - Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback, - boolean sslTransport) { - Preconditions.checkArgument(maxConnections > 0, "maxConnections must be at least 1"); - if (socketTimeout != null) { - Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0); - } - - this.endpoint = Preconditions.checkNotNull(endpoint); - this.maxConnections = maxConnections; - this.transportType = transportType; - this.socketTimeout = socketTimeout; - this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback); - this.sslTransport = sslTransport; - } - - @Override - public boolean mightCreate() { - return lastActiveConnectionsSize < maxConnections; - } - - /** - * FIXME: shouldn't this throw TimeoutException instead of returning null - * in the timeout cases as per the ConnectionFactory.create javadoc? - */ - @Override - public Connection<TTransport, InetSocketAddress> create(Amount<Long, Time> timeout) - throws TTransportException, IOException { - - Preconditions.checkNotNull(timeout); - if (timeout.getValue() == 0) { - return create(); - } - - try { - long timeRemainingNs = timeout.as(Time.NANOSECONDS); - long start = System.nanoTime(); - if(activeConnectionsWriteLock.tryLock(timeRemainingNs, TimeUnit.NANOSECONDS)) { - try { - if (!willCreateSafe()) { - return null; - } - - timeRemainingNs -= (System.nanoTime() - start); - - return createConnection((int) TimeUnit.NANOSECONDS.toMillis(timeRemainingNs)); - } finally { - activeConnectionsWriteLock.unlock(); - } - } else { - return null; - } - } catch (InterruptedException e) { - return null; - } - } - - private Connection<TTransport, InetSocketAddress> create() - throws TTransportException, IOException { - activeConnectionsWriteLock.lock(); - try { - if (!willCreateSafe()) { - return null; - } - - return createConnection(0); - } finally { - activeConnectionsWriteLock.unlock(); - } - } - - private Connection<TTransport, InetSocketAddress> createConnection(int timeoutMillis) - throws TTransportException, IOException { - TTransport transport = createTransport(timeoutMillis); - if (transport == null) { - return null; - } - - Connection<TTransport, InetSocketAddress> connection = - new TTransportConnection(transport, endpoint); - postCreateCallback.execute(connection); - activeConnections.add(connection); - lastActiveConnectionsSize = activeConnections.size(); - return connection; - } - - private boolean willCreateSafe() { - return activeConnections.size() < maxConnections; - } - - @VisibleForTesting - TTransport createTransport(int timeoutMillis) throws TTransportException, IOException { - TSocket socket = null; - if (transportType != TransportType.NONBLOCKING) { - // can't do a nonblocking create on a blocking transport - if (timeoutMillis <= 0) { - return null; - } - - if (sslTransport) { - SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault(); - SSLSocket ssl_socket = (SSLSocket) factory.createSocket(endpoint.getHostName(), endpoint.getPort()); - ssl_socket.setSoTimeout(timeoutMillis); - return new TSocket(ssl_socket); - } else { - socket = new TSocket(endpoint.getHostName(), endpoint.getPort(), timeoutMillis); - } - } - - try { - switch (transportType) { - case BLOCKING: - socket.open(); - setSocketTimeout(socket); - return socket; - case FRAMED: - TFramedTransport transport = new TFramedTransport(socket); - transport.open(); - setSocketTimeout(socket); - return transport; - case NONBLOCKING: - try { - return new TNonblockingSocket(endpoint.getHostName(), endpoint.getPort()); - } catch (IOException e) { - throw new IOException("Failed to create non-blocking transport to " + endpoint, e); - } - } - } catch (TTransportException e) { - throw new TTransportException("Failed to create transport to " + endpoint, e); - } - - throw new IllegalArgumentException("unknown transport type " + transportType); - } - - private void setSocketTimeout(TSocket socket) { - if (socketTimeout != null) { - socket.setTimeout(socketTimeout.as(Time.MILLISECONDS).intValue()); - } - } - - @Override - public void destroy(Connection<TTransport, InetSocketAddress> connection) { - activeConnectionsWriteLock.lock(); - try { - boolean wasActiveConnection = activeConnections.remove(connection); - Preconditions.checkArgument(wasActiveConnection, - "connection %s not created by this factory", connection); - lastActiveConnectionsSize = activeConnections.size(); - } finally { - activeConnectionsWriteLock.unlock(); - } - - // We close the connection outside the critical section which means we may have more connections - // "active" (open) than maxConnections for a very short time - connection.close(); - } - - @Override - public String toString() { - return String.format("%s[%s]", getClass().getSimpleName(), endpoint); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/ThriftException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftException.java b/commons/src/main/java/com/twitter/common/thrift/ThriftException.java deleted file mode 100644 index 5364521..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/ThriftException.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift; - -/** - * Exception class to wrap exceptions caught during thrift calls. - */ -public class ThriftException extends Exception { - public ThriftException(String message) { - super(message); - } - public ThriftException(String message, Throwable t) { - super(message, t); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/ThriftFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftFactory.java b/commons/src/main/java/com/twitter/common/thrift/ThriftFactory.java deleted file mode 100644 index bf69361..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/ThriftFactory.java +++ /dev/null @@ -1,654 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.thrift.async.TAsyncClient; -import org.apache.thrift.async.TAsyncClientManager; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TNonblockingTransport; -import org.apache.thrift.transport.TTransport; - -import com.twitter.common.base.Closure; -import com.twitter.common.base.Closures; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.net.loadbalancing.LeastConnectedStrategy; -import com.twitter.common.net.loadbalancing.LoadBalancer; -import com.twitter.common.net.loadbalancing.LoadBalancerImpl; -import com.twitter.common.net.loadbalancing.LoadBalancingStrategy; -import com.twitter.common.net.loadbalancing.MarkDeadStrategyWithHostCheck; -import com.twitter.common.net.loadbalancing.TrafficMonitorAdapter; -import com.twitter.common.net.monitoring.TrafficMonitor; -import com.twitter.common.net.pool.Connection; -import com.twitter.common.net.pool.ConnectionPool; -import com.twitter.common.net.pool.DynamicHostSet; -import com.twitter.common.net.pool.DynamicPool; -import com.twitter.common.net.pool.MetaPool; -import com.twitter.common.net.pool.ObjectPool; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.thrift.ThriftConnectionFactory.TransportType; -import com.twitter.common.util.BackoffDecider; -import com.twitter.common.util.BackoffStrategy; -import com.twitter.common.util.TruncatedBinaryBackoff; -import com.twitter.common.util.concurrent.ForwardingExecutorService; -import com.twitter.thrift.ServiceInstance; - -/** - * A utility that provides convenience methods to build common {@link Thrift}s. - * - * The thrift factory allows you to specify parameters that define how the client connects to - * and communicates with servers, such as the transport type, connection settings, and load - * balancing. Request-level settings like sync/async and retries should be set on the - * {@link Thrift} instance that this factory will create. - * - * The factory will attempt to provide reasonable defaults to allow the caller to minimize the - * amount of necessary configuration. Currently, the default behavior includes: - * - * <ul> - * <li> A test lease/release for each host will be performed every second - * {@link #withDeadConnectionRestoreInterval(Amount)} - * <li> At most 50 connections will be established to each host - * {@link #withMaxConnectionsPerEndpoint(int)} - * <li> Unframed transport {@link #useFramedTransport(boolean)} - * <li> A load balancing strategy that will mark hosts dead and prefer least-connected hosts. - * Hosts are marked dead if the most recent connection attempt was a failure or else based on - * the windowed error rate of attempted RPCs. If the error rate for a connected host exceeds - * 20% over the last second, the host will be disabled for 2 seconds ascending up to 10 seconds - * if the elevated error rate persists. - * {@link #withLoadBalancingStrategy(LoadBalancingStrategy)} - * <li> Statistics are reported through {@link Stats} - * {@link #withStatsProvider(StatsProvider)} - * <li> A service name matching the thrift interface name {@link #withServiceName(String)} - * </ul> - * - * @author John Sirois - */ -public class ThriftFactory<T> { - private static final Amount<Long,Time> DEFAULT_DEAD_TARGET_RESTORE_INTERVAL = - Amount.of(1L, Time.SECONDS); - - private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 50; - - private Class<T> serviceInterface; - private Function<TTransport, T> clientFactory; - private int maxConnectionsPerEndpoint; - private Amount<Long,Time> connectionRestoreInterval; - private boolean framedTransport; - private LoadBalancingStrategy<InetSocketAddress> loadBalancingStrategy = null; - private final TrafficMonitor<InetSocketAddress> monitor; - private Amount<Long,Time> socketTimeout = null; - private Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback = Closures.noop(); - private StatsProvider statsProvider = Stats.STATS_PROVIDER; - private Optional<String> endpointName = Optional.absent(); - private String serviceName; - private boolean sslTransport; - - public static <T> ThriftFactory<T> create(Class<T> serviceInterface) { - return new ThriftFactory<T>(serviceInterface); - } - - /** - * Creates a default factory that will use unframed blocking transport. - * - * @param serviceInterface The interface of the thrift service to make a client for. - */ - private ThriftFactory(Class<T> serviceInterface) { - this.serviceInterface = Thrift.checkServiceInterface(serviceInterface); - this.maxConnectionsPerEndpoint = DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT; - this.connectionRestoreInterval = DEFAULT_DEAD_TARGET_RESTORE_INTERVAL; - this.framedTransport = false; - this.monitor = new TrafficMonitor<InetSocketAddress>(serviceInterface.getName()); - this.serviceName = serviceInterface.getEnclosingClass().getSimpleName(); - this.sslTransport = false; - } - - private void checkBaseState() { - Preconditions.checkArgument(maxConnectionsPerEndpoint > 0, - "Must allow at least 1 connection per endpoint; %s specified", maxConnectionsPerEndpoint); - } - - public TrafficMonitor<InetSocketAddress> getMonitor() { - return monitor; - } - - /** - * Creates the thrift client, and initializes connection pools. - * - * @param backends Backends to connect to. - * @return A new thrift client. - */ - public Thrift<T> build(Set<InetSocketAddress> backends) { - checkBaseState(); - MorePreconditions.checkNotBlank(backends); - - ManagedThreadPool managedThreadPool = createManagedThreadpool(backends.size()); - LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer(); - Function<TTransport, T> clientFactory = getClientFactory(); - - ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = - createConnectionPool(backends, loadBalancer, managedThreadPool, false); - - return new Thrift<T>(managedThreadPool, connectionPool, loadBalancer, serviceName, - serviceInterface, clientFactory, false, sslTransport); - } - - /** - * Creates a synchronous thrift client that will communicate with a dynamic host set. - * - * @param hostSet The host set to use as a backend. - * @return A thrift client. - * @throws ThriftFactoryException If an error occurred while creating the client. - */ - public Thrift<T> build(DynamicHostSet<ServiceInstance> hostSet) throws ThriftFactoryException { - checkBaseState(); - Preconditions.checkNotNull(hostSet); - - ManagedThreadPool managedThreadPool = createManagedThreadpool(1); - LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer(); - Function<TTransport, T> clientFactory = getClientFactory(); - - ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = - createConnectionPool(hostSet, loadBalancer, managedThreadPool, false, endpointName); - - return new Thrift<T>(managedThreadPool, connectionPool, loadBalancer, serviceName, - serviceInterface, clientFactory, false, sslTransport); - } - - private ManagedThreadPool createManagedThreadpool(int initialEndpointCount) { - return new ManagedThreadPool(serviceName, initialEndpointCount, maxConnectionsPerEndpoint); - } - - /** - * A finite thread pool that monitors backend choice events to dynamically resize. This - * {@link java.util.concurrent.ExecutorService} implementation immediately rejects requests when - * there are no more available worked threads (requests are not queued). - */ - private static class ManagedThreadPool extends ForwardingExecutorService<ThreadPoolExecutor> - implements Closure<Collection<InetSocketAddress>> { - - private static final Logger LOG = Logger.getLogger(ManagedThreadPool.class.getName()); - - private static ThreadPoolExecutor createThreadPool(String serviceName, int initialSize) { - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setNameFormat("Thrift[" +serviceName + "][%d]") - .setDaemon(true) - .build(); - return new ThreadPoolExecutor(initialSize, initialSize, 0, TimeUnit.MILLISECONDS, - new SynchronousQueue<Runnable>(), threadFactory); - } - - private final int maxConnectionsPerEndpoint; - - public ManagedThreadPool(String serviceName, int initialEndpointCount, - int maxConnectionsPerEndpoint) { - - super(createThreadPool(serviceName, initialEndpointCount * maxConnectionsPerEndpoint)); - this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint; - setRejectedExecutionHandler(initialEndpointCount); - } - - private void setRejectedExecutionHandler(int endpointCount) { - final String message = - String.format("All %d x %d connections in use", endpointCount, maxConnectionsPerEndpoint); - delegate.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { - throw new RejectedExecutionException(message); - } - }); - } - - @Override - public void execute(Collection<InetSocketAddress> chosenBackends) { - int previousPoolSize = delegate.getMaximumPoolSize(); - /* - * In the case of no available backends, we need to make sure we pass in a positive pool - * size to our delegate. In particular, java.util.concurrent.ThreadPoolExecutor does not - * accept zero as a valid core or max pool size. - */ - int backendCount = Math.max(chosenBackends.size(), 1); - int newPoolSize = backendCount * maxConnectionsPerEndpoint; - - if (previousPoolSize != newPoolSize) { - LOG.info(String.format("Re-sizing deadline thread pool from: %d to: %d", - previousPoolSize, newPoolSize)); - if (previousPoolSize < newPoolSize) { // Don't cross the beams! - delegate.setMaximumPoolSize(newPoolSize); - delegate.setCorePoolSize(newPoolSize); - } else { - delegate.setCorePoolSize(newPoolSize); - delegate.setMaximumPoolSize(newPoolSize); - } - setRejectedExecutionHandler(backendCount); - } - } - } - - /** - * Creates an asynchronous thrift client that will communicate with a fixed set of backends. - * - * @param backends Backends to connect to. - * @return A thrift client. - * @throws ThriftFactoryException If an error occurred while creating the client. - */ - public Thrift<T> buildAsync(Set<InetSocketAddress> backends) throws ThriftFactoryException { - checkBaseState(); - MorePreconditions.checkNotBlank(backends); - - LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer(); - Closure<Collection<InetSocketAddress>> noop = Closures.noop(); - Function<TTransport, T> asyncClientFactory = getAsyncClientFactory(); - - ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = - createConnectionPool(backends, loadBalancer, noop, true); - - return new Thrift<T>(connectionPool, loadBalancer, - serviceName, serviceInterface, asyncClientFactory, true); - } - - /** - * Creates an asynchronous thrift client that will communicate with a dynamic host set. - * - * @param hostSet The host set to use as a backend. - * @return A thrift client. - * @throws ThriftFactoryException If an error occurred while creating the client. - */ - public Thrift<T> buildAsync(DynamicHostSet<ServiceInstance> hostSet) - throws ThriftFactoryException { - checkBaseState(); - Preconditions.checkNotNull(hostSet); - - LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer(); - Closure<Collection<InetSocketAddress>> noop = Closures.noop(); - Function<TTransport, T> asyncClientFactory = getAsyncClientFactory(); - - ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = - createConnectionPool(hostSet, loadBalancer, noop, true, endpointName); - - return new Thrift<T>(connectionPool, loadBalancer, - serviceName, serviceInterface, asyncClientFactory, true); - } - - /** - * Prepare the client factory, which will create client class instances from transports. - * - * @return The client factory to use. - */ - private Function<TTransport, T> getClientFactory() { - return clientFactory == null ? createClientFactory(serviceInterface) : clientFactory; - } - - /** - * Prepare the async client factory, which will create client class instances from transports. - * - * @return The client factory to use. - * @throws ThriftFactoryException If there was a problem creating the factory. - */ - private Function<TTransport, T> getAsyncClientFactory() throws ThriftFactoryException { - try { - return clientFactory == null ? createAsyncClientFactory(serviceInterface) : clientFactory; - } catch (IOException e) { - throw new ThriftFactoryException("Failed to create async client factory.", e); - } - } - - private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool( - Set<InetSocketAddress> backends, LoadBalancer<InetSocketAddress> loadBalancer, - Closure<Collection<InetSocketAddress>> onBackendsChosen, boolean nonblocking) { - - ImmutableMap.Builder<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>> - backendBuilder = ImmutableMap.builder(); - for (InetSocketAddress backend : backends) { - backendBuilder.put(backend, createConnectionPool(backend, nonblocking)); - } - - return new MetaPool<TTransport, InetSocketAddress>(backendBuilder.build(), - loadBalancer, onBackendsChosen, connectionRestoreInterval); - } - - private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool( - DynamicHostSet<ServiceInstance> hostSet, LoadBalancer<InetSocketAddress> loadBalancer, - Closure<Collection<InetSocketAddress>> onBackendsChosen, - final boolean nonblocking, Optional<String> serviceEndpointName) - throws ThriftFactoryException { - - Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>> - endpointPoolFactory = - new Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>() { - @Override public ObjectPool<Connection<TTransport, InetSocketAddress>> apply( - InetSocketAddress endpoint) { - return createConnectionPool(endpoint, nonblocking); - } - }; - - try { - return new DynamicPool<ServiceInstance, TTransport, InetSocketAddress>(hostSet, - endpointPoolFactory, loadBalancer, onBackendsChosen, connectionRestoreInterval, - Util.getAddress(serviceEndpointName), Util.IS_ALIVE); - } catch (DynamicHostSet.MonitorException e) { - throw new ThriftFactoryException("Failed to monitor host set.", e); - } - } - - private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool( - InetSocketAddress backend, boolean nonblocking) { - - ThriftConnectionFactory connectionFactory = new ThriftConnectionFactory( - backend, maxConnectionsPerEndpoint, TransportType.get(framedTransport, nonblocking), - socketTimeout, postCreateCallback, sslTransport); - - return new ConnectionPool<Connection<TTransport, InetSocketAddress>>(connectionFactory, - statsProvider); - } - - @VisibleForTesting - public ThriftFactory<T> withClientFactory(Function<TTransport, T> clientFactory) { - this.clientFactory = Preconditions.checkNotNull(clientFactory); - - return this; - } - - public ThriftFactory<T> withSslEnabled() { - this.sslTransport = true; - return this; - } - - /** - * Specifies the maximum number of connections that should be made to any single endpoint. - * - * @param maxConnectionsPerEndpoint Maximum number of connections per endpoint. - * @return A reference to the factory. - */ - public ThriftFactory<T> withMaxConnectionsPerEndpoint(int maxConnectionsPerEndpoint) { - Preconditions.checkArgument(maxConnectionsPerEndpoint > 0); - this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint; - - return this; - } - - /** - * Specifies the interval at which dead endpoint connections should be checked and revived. - * - * @param connectionRestoreInterval the time interval to check. - * @return A reference to the factory. - */ - public ThriftFactory<T> withDeadConnectionRestoreInterval( - Amount<Long, Time> connectionRestoreInterval) { - Preconditions.checkNotNull(connectionRestoreInterval); - Preconditions.checkArgument(connectionRestoreInterval.getValue() >= 0, - "A negative interval is invalid: %s", connectionRestoreInterval); - this.connectionRestoreInterval = connectionRestoreInterval; - - return this; - } - - /** - * Instructs the factory whether framed transport should be used. - * - * @param framedTransport Whether to use framed transport. - * @return A reference to the factory. - */ - public ThriftFactory<T> useFramedTransport(boolean framedTransport) { - this.framedTransport = framedTransport; - - return this; - } - - /** - * Specifies the load balancer to use when interacting with multiple backends. - * - * @param strategy Load balancing strategy. - * @return A reference to the factory. - */ - public ThriftFactory<T> withLoadBalancingStrategy( - LoadBalancingStrategy<InetSocketAddress> strategy) { - this.loadBalancingStrategy = Preconditions.checkNotNull(strategy); - - return this; - } - - private LoadBalancer<InetSocketAddress> createLoadBalancer() { - if (loadBalancingStrategy == null) { - loadBalancingStrategy = createDefaultLoadBalancingStrategy(); - } - - return LoadBalancerImpl.create(TrafficMonitorAdapter.create(loadBalancingStrategy, monitor)); - } - - private LoadBalancingStrategy<InetSocketAddress> createDefaultLoadBalancingStrategy() { - Function<InetSocketAddress, BackoffDecider> backoffFactory = - new Function<InetSocketAddress, BackoffDecider>() { - @Override public BackoffDecider apply(InetSocketAddress socket) { - BackoffStrategy backoffStrategy = new TruncatedBinaryBackoff( - Amount.of(2L, Time.SECONDS), Amount.of(10L, Time.SECONDS)); - - return BackoffDecider.builder(socket.toString()) - .withTolerateFailureRate(0.2) - .withRequestWindow(Amount.of(1L, Time.SECONDS)) - .withSeedSize(5) - .withStrategy(backoffStrategy) - .withRecoveryType(BackoffDecider.RecoveryType.FULL_CAPACITY) - .withStatsProvider(statsProvider) - .build(); - } - }; - - return new MarkDeadStrategyWithHostCheck<InetSocketAddress>( - new LeastConnectedStrategy<InetSocketAddress>(), backoffFactory); - } - - /** - * Specifies the net read/write timeout to set via SO_TIMEOUT on the thrift blocking client - * or AsyncClient.setTimeout on the thrift async client. Defaults to the connectTimeout on - * the blocking client if not set. - * - * @param socketTimeout timeout on thrift i/o operations - * @return A reference to the factory. - */ - public ThriftFactory<T> withSocketTimeout(Amount<Long, Time> socketTimeout) { - this.socketTimeout = Preconditions.checkNotNull(socketTimeout); - Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0); - - return this; - } - - /** - * Specifies the callback to notify when a connection has been created. The callback may - * be used to make thrift calls to the connection, but must not invalidate it. - * Defaults to a no-op closure. - * - * @param postCreateCallback function to setup new connections - * @return A reference to the factory. - */ - public ThriftFactory<T> withPostCreateCallback( - Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback) { - this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback); - - return this; - } - - /** - * Registers a custom stats provider to use to track various client stats. - * - * @param statsProvider the {@code StatsProvider} to use - * @return A reference to the factory. - */ - public ThriftFactory<T> withStatsProvider(StatsProvider statsProvider) { - this.statsProvider = Preconditions.checkNotNull(statsProvider); - - return this; - } - - /** - * Name to be passed to Thrift constructor, used in stats. - * - * @param serviceName string to use - * @return A reference to the factory. - */ - public ThriftFactory<T> withServiceName(String serviceName) { - this.serviceName = MorePreconditions.checkNotBlank(serviceName); - - return this; - } - - /** - * Set the end-point to use from {@link ServiceInstance#getAdditionalEndpoints()}. - * If not set, the default behavior is to use {@link ServiceInstance#getServiceEndpoint()}. - * - * @param endpointName the (optional) name of the end-point, if unset - the - * default/primary end-point is selected - * @return a reference to the factory for chaining - */ - public ThriftFactory<T> withEndpointName(String endpointName) { - this.endpointName = Optional.of(endpointName); - return this; - } - - private static <T> Function<TTransport, T> createClientFactory(Class<T> serviceInterface) { - final Constructor<? extends T> implementationConstructor = - findImplementationConstructor(serviceInterface); - - return new Function<TTransport, T>() { - @Override public T apply(TTransport transport) { - try { - return implementationConstructor.newInstance(new TBinaryProtocol(transport)); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private <T> Function<TTransport, T> createAsyncClientFactory( - final Class<T> serviceInterface) throws IOException { - - final TAsyncClientManager clientManager = new TAsyncClientManager(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { - clientManager.stop(); - } - }); - - final Constructor<? extends T> implementationConstructor = - findAsyncImplementationConstructor(serviceInterface); - - return new Function<TTransport, T>() { - @Override public T apply(TTransport transport) { - Preconditions.checkNotNull(transport); - Preconditions.checkArgument(transport instanceof TNonblockingTransport, - "Invalid transport provided to client factory: " + transport.getClass()); - - try { - T client = implementationConstructor.newInstance(new TBinaryProtocol.Factory(), - clientManager, transport); - - if (socketTimeout != null) { - ((TAsyncClient) client).setTimeout(socketTimeout.as(Time.MILLISECONDS)); - } - - return client; - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static <T> Constructor<? extends T> findImplementationConstructor( - final Class<T> serviceInterface) { - Class<? extends T> implementationClass = findImplementationClass(serviceInterface); - try { - return implementationClass.getConstructor(TProtocol.class); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("Failed to find a single argument TProtocol constructor " - + "in service client class: " + implementationClass); - } - } - - private static <T> Constructor<? extends T> findAsyncImplementationConstructor( - final Class<T> serviceInterface) { - Class<? extends T> implementationClass = findImplementationClass(serviceInterface); - try { - return implementationClass.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, - TNonblockingTransport.class); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("Failed to find expected constructor " - + "in service client class: " + implementationClass); - } - } - - @SuppressWarnings("unchecked") - private static <T> Class<? extends T> findImplementationClass(final Class<T> serviceInterface) { - try { - return (Class<? extends T>) - Iterables.find(ImmutableList.copyOf(serviceInterface.getEnclosingClass().getClasses()), - new Predicate<Class<?>>() { - @Override public boolean apply(Class<?> inner) { - return !serviceInterface.equals(inner) - && serviceInterface.isAssignableFrom(inner); - } - }); - } catch (NoSuchElementException e) { - throw new IllegalArgumentException("Could not find a sibling enclosed implementation of " - + "service interface: " + serviceInterface); - } - } - - public static class ThriftFactoryException extends Exception { - public ThriftFactoryException(String msg) { - super(msg); - } - - public ThriftFactoryException(String msg, Throwable t) { - super(msg, t); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/Util.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/Util.java b/commons/src/main/java/com/twitter/common/thrift/Util.java deleted file mode 100644 index 1435999..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/Util.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift; - -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -import org.apache.thrift.TBase; -import org.apache.thrift.TFieldIdEnum; -import org.apache.thrift.meta_data.FieldMetaData; - -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; - -/** - * Utility functions for thrift. - * - * @author William Farner - */ -public class Util { - - /** - * Maps a {@link ServiceInstance} to an {@link InetSocketAddress} given the {@code endpointName}. - * - * @param optionalEndpointName the name of the end-point on the service's additional end-points, - * if not set, maps to the primary service end-point - */ - public static Function<ServiceInstance, InetSocketAddress> getAddress( - final Optional<String> optionalEndpointName) { - if (!optionalEndpointName.isPresent()) { - return GET_ADDRESS; - } - - final String endpointName = optionalEndpointName.get(); - return getAddress( - new Function<ServiceInstance, Endpoint>() { - @Override public Endpoint apply(@Nullable ServiceInstance serviceInstance) { - Map<String, Endpoint> endpoints = serviceInstance.getAdditionalEndpoints(); - Preconditions.checkArgument(endpoints.containsKey(endpointName), - "Did not find end-point %s on %s", endpointName, serviceInstance); - return endpoints.get(endpointName); - } - }); - } - - private static Function<ServiceInstance, InetSocketAddress> getAddress( - final Function<ServiceInstance, Endpoint> serviceToEndpoint) { - return new Function<ServiceInstance, InetSocketAddress>() { - @Override public InetSocketAddress apply(ServiceInstance serviceInstance) { - Endpoint endpoint = serviceToEndpoint.apply(serviceInstance); - return InetSocketAddress.createUnresolved(endpoint.getHost(), endpoint.getPort()); - } - }; - } - - private static Function<ServiceInstance, Endpoint> GET_PRIMARY_ENDPOINT = - new Function<ServiceInstance, Endpoint>() { - @Override public Endpoint apply(ServiceInstance input) { - return input.getServiceEndpoint(); - } - }; - - public static Function<ServiceInstance, InetSocketAddress> GET_ADDRESS = - getAddress(GET_PRIMARY_ENDPOINT); - - public static final Predicate<ServiceInstance> IS_ALIVE = new Predicate<ServiceInstance>() { - @Override public boolean apply(ServiceInstance serviceInstance) { - switch (serviceInstance.getStatus()) { - case ALIVE: - return true; - - // We'll be optimistic here and let MTCP's ranking deal with - // unhealthy services in a WARNING state. - case WARNING: - return true; - - // Services which are just starting up, on the other hand... are much easier to just not - // send requests to. The STARTING state is useful to distinguish from WARNING or ALIVE: - // you exist in ZooKeeper, but don't yet serve traffic. - case STARTING: - default: - return false; - } - } - }; - - /** - * Pretty-prints a thrift object contents. - * - * @param t The thrift object to print. - * @return The pretty-printed version of the thrift object. - */ - public static String prettyPrint(TBase t) { - return t == null ? "null" : printTbase(t, 0); - } - - /** - * Prints an object contained in a thrift message. - * - * @param o The object to print. - * @param depth The print nesting level. - * @return The pretty-printed version of the thrift field. - */ - private static String printValue(Object o, int depth) { - if (o == null) { - return "null"; - } else if (TBase.class.isAssignableFrom(o.getClass())) { - return "\n" + printTbase((TBase) o, depth + 1); - } else if (Map.class.isAssignableFrom(o.getClass())) { - return printMap((Map) o, depth + 1); - } else if (List.class.isAssignableFrom(o.getClass())) { - return printList((List) o, depth + 1); - } else if (Set.class.isAssignableFrom(o.getClass())) { - return printSet((Set) o, depth + 1); - } else if (String.class == o.getClass()) { - return '"' + o.toString() + '"'; - } else { - return o.toString(); - } - } - - private static final String METADATA_MAP_FIELD_NAME = "metaDataMap"; - - /** - * Prints a TBase. - * - * @param t The object to print. - * @param depth The print nesting level. - * @return The pretty-printed version of the TBase. - */ - private static String printTbase(TBase t, int depth) { - List<String> fields = Lists.newArrayList(); - for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : - FieldMetaData.getStructMetaDataMap(t.getClass()).entrySet()) { - @SuppressWarnings("unchecked") - boolean fieldSet = t.isSet(entry.getKey()); - String strValue; - if (fieldSet) { - @SuppressWarnings("unchecked") - Object value = t.getFieldValue(entry.getKey()); - strValue = printValue(value, depth); - } else { - strValue = "not set"; - } - fields.add(tabs(depth) + entry.getValue().fieldName + ": " + strValue); - } - - return Joiner.on("\n").join(fields); - } - - /** - * Prints a map in a style that is consistent with TBase pretty printing. - * - * @param map The map to print - * @param depth The print nesting level. - * @return The pretty-printed version of the map. - */ - private static String printMap(Map<?, ?> map, int depth) { - List<String> entries = Lists.newArrayList(); - for (Map.Entry entry : map.entrySet()) { - entries.add(tabs(depth) + printValue(entry.getKey(), depth) - + " = " + printValue(entry.getValue(), depth)); - } - - return entries.isEmpty() ? "{}" - : String.format("{\n%s\n%s}", Joiner.on(",\n").join(entries), tabs(depth - 1)); - } - - /** - * Prints a list in a style that is consistent with TBase pretty printing. - * - * @param list The list to print - * @param depth The print nesting level. - * @return The pretty-printed version of the list - */ - private static String printList(List<?> list, int depth) { - List<String> entries = Lists.newArrayList(); - for (int i = 0; i < list.size(); i++) { - entries.add( - String.format("%sItem[%d] = %s", tabs(depth), i, printValue(list.get(i), depth))); - } - - return entries.isEmpty() ? "[]" - : String.format("[\n%s\n%s]", Joiner.on(",\n").join(entries), tabs(depth - 1)); - } - /** - * Prints a set in a style that is consistent with TBase pretty printing. - * - * @param set The set to print - * @param depth The print nesting level. - * @return The pretty-printed version of the set - */ - private static String printSet(Set<?> set, int depth) { - List<String> entries = Lists.newArrayList(); - for (Object item : set) { - entries.add( - String.format("%sItem = %s", tabs(depth), printValue(item, depth))); - } - - return entries.isEmpty() ? "{}" - : String.format("{\n%s\n%s}", Joiner.on(",\n").join(entries), tabs(depth - 1)); - } - - private static String tabs(int n) { - return Strings.repeat(" ", n); - } - - private Util() { - // Utility class. - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/Caller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/Caller.java b/commons/src/main/java/com/twitter/common/thrift/callers/Caller.java deleted file mode 100644 index 63dd709..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/Caller.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.callers; - -import com.google.common.base.Preconditions; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import org.apache.thrift.async.AsyncMethodCallback; - -import javax.annotation.Nullable; -import java.lang.reflect.Method; - -/** -* A caller that invokes a method on an object. -* -* @author William Farner -*/ -public interface Caller { - - /** - * Invokes a method on an object, using the given arguments. The method call may be - * asynchronous, in which case {@code callback} will be non-null. - * - * @param method The method being invoked. - * @param args The arguments to call {@code method} with. - * @param callback The callback to use if the method is asynchronous. - * @param connectTimeoutOverride Optional override for the default connection timeout. - * @return The return value from invoking the method. - * @throws Throwable Exception, as prescribed by the method's contract. - */ - public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, - @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable; - - /** - * Captures the result of a request, whether synchronous or asynchronous. It should be expected - * that for every request made, exactly one of these methods will be called. - */ - static interface ResultCapture { - /** - * Called when the request completed successfully. - */ - void success(); - - /** - * Called when the request failed. - * - * @param t Throwable that was caught. Must never be null. - * @return {@code true} if a wrapped callback should be notified of the failure, - * {@code false} otherwise. - */ - boolean fail(Throwable t); - } - - /** - * A callback that adapts a {@link ResultCapture} with an {@link AsyncMethodCallback} while - * maintaining the AsyncMethodCallback interface. The wrapped callback will handle invocation - * of the underlying callback based on the return values from the ResultCapture. - */ - static class WrappedMethodCallback implements AsyncMethodCallback { - private final AsyncMethodCallback wrapped; - private final ResultCapture capture; - - private boolean callbackTriggered = false; - - public WrappedMethodCallback(AsyncMethodCallback wrapped, ResultCapture capture) { - this.wrapped = wrapped; - this.capture = capture; - } - - private void callbackTriggered() { - Preconditions.checkState(!callbackTriggered, "Each callback may only be triggered once."); - callbackTriggered = true; - } - - @Override @SuppressWarnings("unchecked") public void onComplete(Object o) { - capture.success(); - wrapped.onComplete(o); - callbackTriggered(); - } - - @Override public void onError(Exception t) { - if (capture.fail(t)) { - wrapped.onError(t); - callbackTriggered(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/CallerDecorator.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/CallerDecorator.java b/commons/src/main/java/com/twitter/common/thrift/callers/CallerDecorator.java deleted file mode 100644 index 06ce9a2..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/CallerDecorator.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.callers; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import org.apache.thrift.async.AsyncMethodCallback; - -import javax.annotation.Nullable; -import java.lang.reflect.Method; - -/** -* A caller that decorates another caller. -* -* @author William Farner -*/ -abstract class CallerDecorator implements Caller { - private final Caller decoratedCaller; - private final boolean async; - - CallerDecorator(Caller decoratedCaller, boolean async) { - this.decoratedCaller = decoratedCaller; - this.async = async; - } - - /** - * Convenience method for invoking the method and shunting the capture into the callback if - * the call is asynchronous. - * - * @param method The method being invoked. - * @param args The arguments to call {@code method} with. - * @param callback The callback to use if the method is asynchronous. - * @param capture The result capture to notify of the call result. - * @param connectTimeoutOverride Optional override for the default connection timeout. - * @return The return value from invoking the method. - * @throws Throwable Exception, as prescribed by the method's contract. - */ - protected final Object invoke(Method method, Object[] args, - @Nullable AsyncMethodCallback callback, @Nullable final ResultCapture capture, - @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable { - - // Swap the wrapped callback out for ours. - if (callback != null) { - callback = new WrappedMethodCallback(callback, capture); - } - - try { - Object result = decoratedCaller.call(method, args, callback, connectTimeoutOverride); - if (callback == null && capture != null) capture.success(); - - return result; - } catch (Exception t) { - // We allow this one to go to both sync and async captures. - if (callback != null) { - callback.onError(t); - return null; - } else { - if (capture != null) capture.fail(t); - throw t; - } - } - } - - boolean isAsync() { - return async; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/DeadlineCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/DeadlineCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/DeadlineCaller.java deleted file mode 100644 index d59b82d..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/DeadlineCaller.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.callers; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeoutException; - -import javax.annotation.Nullable; - -import com.google.common.base.Throwables; - -import org.apache.thrift.async.AsyncMethodCallback; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.thrift.TResourceExhaustedException; -import com.twitter.common.thrift.TTimeoutException; - -/** - * A caller that imposes a time deadline on the underlying caller. If the underlying calls fail - * to meet the deadline {@link TTimeoutException} is thrown. If the executor service rejects - * execution of a task, {@link TResourceExhaustedException} is thrown. - * - * @author William Farner - */ -public class DeadlineCaller extends CallerDecorator { - private final ExecutorService executorService; - private final Amount<Long, Time> timeout; - - /** - * Creates a new deadline caller. - * - * @param decoratedCaller The caller to decorate with a deadline. - * @param async Whether the caller is asynchronous. - * @param executorService The executor service to use for performing calls. - * @param timeout The timeout by which the underlying call should complete in. - */ - public DeadlineCaller(Caller decoratedCaller, boolean async, ExecutorService executorService, - Amount<Long, Time> timeout) { - super(decoratedCaller, async); - - this.executorService = executorService; - this.timeout = timeout; - } - - @Override - public Object call(final Method method, final Object[] args, - @Nullable final AsyncMethodCallback callback, - @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable { - try { - Future<Object> result = executorService.submit(new Callable<Object>() { - @Override public Object call() throws Exception { - try { - return invoke(method, args, callback, null, connectTimeoutOverride); - } catch (Throwable t) { - Throwables.propagateIfInstanceOf(t, Exception.class); - throw new RuntimeException(t); - } - } - }); - - try { - return result.get(timeout.getValue(), timeout.getUnit().getTimeUnit()); - } catch (TimeoutException e) { - result.cancel(true); - throw new TTimeoutException(e); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (RejectedExecutionException e) { - throw new TResourceExhaustedException(e); - } catch (InvocationTargetException e) { - throw e.getCause(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/thrift/callers/DebugCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/callers/DebugCaller.java b/commons/src/main/java/com/twitter/common/thrift/callers/DebugCaller.java deleted file mode 100644 index 0d73d67..0000000 --- a/commons/src/main/java/com/twitter/common/thrift/callers/DebugCaller.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.thrift.callers; - -import com.google.common.base.Joiner; -import com.google.common.base.Throwables; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import org.apache.thrift.async.AsyncMethodCallback; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.logging.Logger; - -/** - * A caller that reports debugging information about calls. - * - * @author William Farner - */ -public class DebugCaller extends CallerDecorator { - private static final Logger LOG = Logger.getLogger(DebugCaller.class.getName()); - private static final Joiner ARG_JOINER = Joiner.on(", "); - - /** - * Creates a new debug caller. - * - * @param decoratedCaller The caller to decorate with debug information. - * @param async Whether the caller is asynchronous. - */ - public DebugCaller(Caller decoratedCaller, boolean async) { - super(decoratedCaller, async); - } - - @Override - public Object call(final Method method, final Object[] args, - @Nullable AsyncMethodCallback callback, @Nullable Amount<Long, Time> connectTimeoutOverride) - throws Throwable { - ResultCapture capture = new ResultCapture() { - @Override public void success() { - // No-op. - } - - @Override public boolean fail(Throwable t) { - StringBuilder message = new StringBuilder("Thrift call failed: "); - message.append(method.getName()).append("("); - ARG_JOINER.appendTo(message, args); - message.append(")"); - LOG.warning(message.toString()); - - return true; - } - }; - - try { - return invoke(method, args, callback, capture, connectTimeoutOverride); - } catch (Throwable t) { - capture.fail(t); - throw t; - } - } -}