http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8e03592/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java new file mode 100644 index 0000000..3a32be1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -0,0 +1,856 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A client proxy for Router -> NN communication using the NN ClientProtocol. + * <p> + * Provides routers to invoke remote ClientProtocol methods and handle + * retries/failover. + * <ul> + * <li>invokeSingle Make a single request to a single namespace + * <li>invokeSequential Make a sequential series of requests to multiple + * ordered namespaces until a condition is met. + * <li>invokeConcurrent Make concurrent requests to multiple namespaces and + * return all of the results. + * </ul> + * Also maintains a cached pool of connections to NNs. Connections are managed + * by the ConnectionManager and are unique to each user + NN. The size of the + * connection pool can be configured. Larger pools allow for more simultaneous + * requests to a single NN from a single user. + */ +public class RouterRpcClient { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterRpcClient.class); + + + /** Router identifier. */ + private final String routerId; + + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + + /** Connection pool to the Namenodes per user for performance. */ + private final ConnectionManager connectionManager; + /** Service to run asynchronous calls. */ + private final ExecutorService executorService; + /** Retry policy for router -> NN communication. */ + private final RetryPolicy retryPolicy; + + /** Pattern to parse a stack trace line. */ + private static final Pattern STACK_TRACE_PATTERN = + Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); + + + /** + * Create a router RPC client to manage remote procedure calls to NNs. + * + * @param conf Hdfs Configuation. + * @param resolver A NN resolver to determine the currently active NN in HA. + * @param monitor Optional performance monitor. + */ + public RouterRpcClient(Configuration conf, String identifier, + ActiveNamenodeResolver resolver) { + + this.routerId = identifier; + + this.namenodeResolver = resolver; + + this.connectionManager = new ConnectionManager(conf); + this.connectionManager.start(); + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("RPC Router Client-%d") + .build(); + this.executorService = Executors.newCachedThreadPool(threadFactory); + + int maxFailoverAttempts = conf.getInt( + HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, + HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); + int maxRetryAttempts = conf.getInt( + HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, + HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); + int failoverSleepBaseMillis = conf.getInt( + HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, + HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); + int failoverSleepMaxMillis = conf.getInt( + HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, + HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); + this.retryPolicy = RetryPolicies.failoverOnNetworkException( + RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, + failoverSleepBaseMillis, failoverSleepMaxMillis); + } + + /** + * Shutdown the client. + */ + public void shutdown() { + if (this.connectionManager != null) { + this.connectionManager.close(); + } + if (this.executorService != null) { + this.executorService.shutdownNow(); + } + } + + /** + * Total number of available sockets between the router and NNs. + * + * @return Number of namenode clients. + */ + public int getNumConnections() { + return this.connectionManager.getNumConnections(); + } + + /** + * Total number of available sockets between the router and NNs. + * + * @return Number of namenode clients. + */ + public int getNumActiveConnections() { + return this.connectionManager.getNumActiveConnections(); + } + + /** + * Total number of open connection pools to a NN. Each connection pool. + * represents one user + one NN. + * + * @return Number of connection pools. + */ + public int getNumConnectionPools() { + return this.connectionManager.getNumConnectionPools(); + } + + /** + * Number of connections between the router and NNs being created sockets. + * + * @return Number of connections waiting to be created. + */ + public int getNumCreatingConnections() { + return this.connectionManager.getNumCreatingConnections(); + } + + /** + * Get ClientProtocol proxy client for a NameNode. Each combination of user + + * NN must use a unique proxy client. Previously created clients are cached + * and stored in a connection pool by the ConnectionManager. + * + * @param ugi User group information. + * @param nsId Nameservice identifier. + * @param rpcAddress ClientProtocol RPC server address of the NN. + * @return ConnectionContext containing a ClientProtocol proxy client for the + * NN + current user. + * @throws IOException If we cannot get a connection to the NameNode. + */ + private ConnectionContext getConnection( + UserGroupInformation ugi, String nsId, String rpcAddress) + throws IOException { + ConnectionContext connection = null; + try { + // Each proxy holds the UGI info for the current user when it is created. + // This cache does not scale very well, one entry per user per namenode, + // and may need to be adjusted and/or selectively pruned. The cache is + // important due to the excessive overhead of creating a new proxy wrapper + // for each individual request. + + // TODO Add tokens from the federated UGI + connection = this.connectionManager.getConnection(ugi, rpcAddress); + LOG.debug("User {} NN {} is using connection {}", + ugi.getUserName(), rpcAddress, connection); + } catch (Exception ex) { + LOG.error("Cannot open NN client to address: {}", rpcAddress, ex); + } + + if (connection == null) { + throw new IOException("Cannot get a connection to " + rpcAddress); + } + return connection; + } + + /** + * Convert an exception to an IOException. + * + * For a non-IOException, wrap it with IOException. For a RemoteException, + * unwrap it. For an IOException which is not a RemoteException, return it. + * + * @param e Exception to convert into an exception. + * @return Created IO exception. + */ + private static IOException toIOException(Exception e) { + if (e instanceof RemoteException) { + return ((RemoteException) e).unwrapRemoteException(); + } + if (e instanceof IOException) { + return (IOException)e; + } + return new IOException(e); + } + + /** + * If we should retry the RPC call. + * + * @param ex Exception reported. + * @param retryCount Number of retries. + * @return Retry decision. + * @throws IOException Original exception if the retry policy generates one. + */ + private RetryDecision shouldRetry(final IOException ioe, final int retryCount) + throws IOException { + try { + final RetryPolicy.RetryAction a = + this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); + return a.action; + } catch (Exception ex) { + LOG.error("Re-throwing API exception, no more retries", ex); + throw toIOException(ex); + } + } + + /** + * Invokes a method against the ClientProtocol proxy server. If a standby + * exception is generated by the call to the client, retries using the + * alternate server. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param ugi User group information. + * @param namenodes A prioritized list of namenodes within the same + * nameservice. + * @param method Remote ClientProtcol method to invoke. + * @param params Variable list of parameters matching the method. + * @return The result of invoking the method. + * @throws IOException + */ + private Object invokeMethod( + final UserGroupInformation ugi, + final List<? extends FederationNamenodeContext> namenodes, + final Method method, final Object... params) throws IOException { + + if (namenodes == null || namenodes.isEmpty()) { + throw new IOException("No namenodes to invoke " + method.getName() + + " with params " + Arrays.toString(params) + " from " + this.routerId); + } + + Object ret = null; + boolean failover = false; + Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); + for (FederationNamenodeContext namenode : namenodes) { + ConnectionContext connection = null; + try { + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + connection = this.getConnection(ugi, nsId, rpcAddress); + ProxyAndInfo<ClientProtocol> client = connection.getClient(); + ClientProtocol proxy = client.getProxy(); + ret = invoke(0, method, proxy, params); + if (failover) { + // Success on alternate server, update + InetSocketAddress address = client.getAddress(); + namenodeResolver.updateActiveNamenode(nsId, address); + } + return ret; + } catch (IOException ioe) { + ioes.put(namenode, ioe); + if (ioe instanceof StandbyException) { + // Fail over indicated by retry policy and/or NN + failover = true; + } else if (ioe instanceof RemoteException) { + // RemoteException returned by NN + throw (RemoteException) ioe; + } else { + // Other communication error, this is a failure + // Communication retries are handled by the retry policy + throw ioe; + } + } finally { + if (connection != null) { + connection.release(); + } + } + } + + // All namenodes were unavailable or in standby + String msg = "No namenode available to invoke " + method.getName() + " " + + Arrays.toString(params); + LOG.error(msg); + for (Entry<FederationNamenodeContext, IOException> entry : + ioes.entrySet()) { + FederationNamenodeContext namenode = entry.getKey(); + String nsId = namenode.getNameserviceId(); + String nnId = namenode.getNamenodeId(); + String addr = namenode.getRpcAddress(); + IOException ioe = entry.getValue(); + if (ioe instanceof StandbyException) { + LOG.error("{} {} at {} is in Standby", nsId, nnId, addr); + } else { + LOG.error("{} {} at {} error: \"{}\"", + nsId, nnId, addr, ioe.getMessage()); + } + } + throw new StandbyException(msg); + } + + /** + * Invokes a method on the designated object. Catches exceptions specific to + * the invocation. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param method Method to invoke + * @param obj Target object for the method + * @param params Variable parameters + * @return Response from the remote server + * @throws IOException + * @throws InterruptedException + */ + private Object invoke(int retryCount, final Method method, final Object obj, + final Object... params) throws IOException { + try { + return method.invoke(obj, params); + } catch (IllegalAccessException e) { + LOG.error("Unexpected exception while proxying API", e); + return null; + } catch (IllegalArgumentException e) { + LOG.error("Unexpected exception while proxying API", e); + return null; + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + IOException ioe = (IOException) cause; + // Check if we should retry. + RetryDecision decision = shouldRetry(ioe, retryCount); + if (decision == RetryDecision.RETRY) { + // retry + return invoke(++retryCount, method, obj, params); + } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { + // failover, invoker looks for standby exceptions for failover. + if (ioe instanceof StandbyException) { + throw ioe; + } else { + throw new StandbyException(ioe.getMessage()); + } + } else { + if (ioe instanceof RemoteException) { + RemoteException re = (RemoteException) ioe; + ioe = re.unwrapRemoteException(); + ioe = getCleanException(ioe); + } + throw ioe; + } + } else { + throw new IOException(e); + } + } + } + + /** + * Get a clean copy of the exception. Sometimes the exceptions returned by the + * server contain the full stack trace in the message. + * + * @param ioe Exception to clean up. + * @return Copy of the original exception with a clean message. + */ + private static IOException getCleanException(IOException ioe) { + IOException ret = null; + + String msg = ioe.getMessage(); + Throwable cause = ioe.getCause(); + StackTraceElement[] stackTrace = ioe.getStackTrace(); + + // Clean the message by removing the stack trace + int index = msg.indexOf("\n"); + if (index > 0) { + String[] msgSplit = msg.split("\n"); + msg = msgSplit[0]; + + // Parse stack trace from the message + List<StackTraceElement> elements = new LinkedList<>(); + for (int i=1; i<msgSplit.length; i++) { + String line = msgSplit[i]; + Matcher matcher = STACK_TRACE_PATTERN.matcher(line); + if (matcher.find()) { + String declaringClass = matcher.group(1); + String methodName = matcher.group(2); + String fileName = matcher.group(3); + int lineNumber = Integer.parseInt(matcher.group(4)); + StackTraceElement element = new StackTraceElement( + declaringClass, methodName, fileName, lineNumber); + elements.add(element); + } + } + stackTrace = elements.toArray(new StackTraceElement[elements.size()]); + } + + // Create the new output exception + if (ioe instanceof RemoteException) { + RemoteException re = (RemoteException)ioe; + ret = new RemoteException(re.getClassName(), msg); + } else { + // Try the simple constructor and initialize the fields + Class<? extends IOException> ioeClass = ioe.getClass(); + try { + Constructor<? extends IOException> constructor = + ioeClass.getDeclaredConstructor(String.class); + ret = constructor.newInstance(msg); + } catch (ReflectiveOperationException e) { + // If there are errors, just use the input one + LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e); + ret = ioe; + } + } + if (ret != null) { + ret.initCause(cause); + ret.setStackTrace(stackTrace); + } + + return ret; + } + + /** + * Invokes a ClientProtocol method. Determines the target nameservice via a + * provided block. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param block Block used to determine appropriate nameservice. + * @param method The remote method and parameters to invoke. + * @return The result of invoking the method. + * @throws IOException + */ + public Object invokeSingle(final ExtendedBlock block, RemoteMethod method) + throws IOException { + String bpId = block.getBlockPoolId(); + return invokeSingleBlockPool(bpId, method); + } + + /** + * Invokes a ClientProtocol method. Determines the target nameservice using + * the block pool id. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param bpId Block pool identifier. + * @param method The remote method and parameters to invoke. + * @return The result of invoking the method. + * @throws IOException + */ + public Object invokeSingleBlockPool(final String bpId, RemoteMethod method) + throws IOException { + String nsId = getNameserviceForBlockPoolId(bpId); + return invokeSingle(nsId, method); + } + + /** + * Invokes a ClientProtocol method against the specified namespace. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param nsId Target namespace for the method. + * @param method The remote method and parameters to invoke. + * @return The result of invoking the method. + * @throws IOException + */ + public Object invokeSingle(final String nsId, RemoteMethod method) + throws IOException { + UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + List<? extends FederationNamenodeContext> nns = + getNamenodesForNameservice(nsId); + RemoteLocationContext loc = new RemoteLocation(nsId, "/"); + return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc)); + } + + /** + * Invokes a single proxy call for a single location. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param location RemoteLocation to invoke. + * @param remoteMethod The remote method and parameters to invoke. + * @return The result of invoking the method if successful. + * @throws IOException + */ + public Object invokeSingle(final RemoteLocationContext location, + RemoteMethod remoteMethod) throws IOException { + List<RemoteLocationContext> locations = Collections.singletonList(location); + return invokeSequential(locations, remoteMethod); + } + + /** + * Invokes sequential proxy calls to different locations. Continues to invoke + * calls until a call returns without throwing a remote exception. + * + * @param locations List of locations/nameservices to call concurrently. + * @param remoteMethod The remote method and parameters to invoke. + * @return The result of the first successful call, or if no calls are + * successful, the result of the last RPC call executed. + * @throws IOException if the success condition is not met and one of the RPC + * calls generated a remote exception. + */ + public Object invokeSequential( + final List<? extends RemoteLocationContext> locations, + final RemoteMethod remoteMethod) throws IOException { + return invokeSequential(locations, remoteMethod, null, null); + } + + /** + * Invokes sequential proxy calls to different locations. Continues to invoke + * calls until the success condition is met, or until all locations have been + * attempted. + * + * The success condition may be specified by: + * <ul> + * <li>An expected result class + * <li>An expected result value + * </ul> + * + * If no expected result class/values are specified, the success condition is + * a call that does not throw a remote exception. + * + * @param locations List of locations/nameservices to call concurrently. + * @param remoteMethod The remote method and parameters to invoke. + * @param expectedResultClass In order to be considered a positive result, the + * return type must be of this class. + * @param expectedResultValue In order to be considered a positive result, the + * return value must equal the value of this object. + * @return The result of the first successful call, or if no calls are + * successful, the result of the first RPC call executed. + * @throws IOException if the success condition is not met, return the first + * remote exception generated. + */ + public Object invokeSequential( + final List<? extends RemoteLocationContext> locations, + final RemoteMethod remoteMethod, Class<?> expectedResultClass, + Object expectedResultValue) throws IOException { + + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = remoteMethod.getMethod(); + IOException firstThrownException = null; + IOException lastThrownException = null; + Object firstResult = null; + // Invoke in priority order + for (final RemoteLocationContext loc : locations) { + String ns = loc.getNameserviceId(); + List<? extends FederationNamenodeContext> namenodes = + getNamenodesForNameservice(ns); + try { + Object[] params = remoteMethod.getParams(loc); + Object result = invokeMethod(ugi, namenodes, m, params); + // Check if the result is what we expected + if (isExpectedClass(expectedResultClass, result) && + isExpectedValue(expectedResultValue, result)) { + // Valid result, stop here + return result; + } + if (firstResult == null) { + firstResult = result; + } + } catch (IOException ioe) { + // Record it and move on + lastThrownException = (IOException) ioe; + if (firstThrownException == null) { + firstThrownException = lastThrownException; + } + } catch (Exception e) { + // Unusual error, ClientProtocol calls always use IOException (or + // RemoteException). Re-wrap in IOException for compatibility with + // ClientProtcol. + LOG.error("Unexpected exception {} proxying {} to {}", + e.getClass(), m.getName(), ns, e); + lastThrownException = new IOException( + "Unexpected exception proxying API " + e.getMessage(), e); + if (firstThrownException == null) { + firstThrownException = lastThrownException; + } + } + } + + if (firstThrownException != null) { + // re-throw the last exception thrown for compatibility + throw firstThrownException; + } + // Return the last result, whether it is the value we are looking for or a + return firstResult; + } + + /** + * Checks if a result matches the required result class. + * + * @param expectedResultClass Required result class, null to skip the check. + * @param result The result to check. + * @return True if the result is an instance of the required class or if the + * expected class is null. + */ + private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) { + if (expectedClass == null) { + return true; + } else if (clazz == null) { + return false; + } else { + return expectedClass.isInstance(clazz); + } + } + + /** + * Checks if a result matches the expected value. + * + * @param expectedResultValue The expected value, null to skip the check. + * @param result The result to check. + * @return True if the result is equals to the expected value or if the + * expected value is null. + */ + private static boolean isExpectedValue(Object expectedValue, Object value) { + if (expectedValue == null) { + return true; + } else if (value == null) { + return false; + } else { + return value.equals(expectedValue); + } + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param locations List of remote locations to call concurrently. + * @param remoteMethod The remote method and parameters to invoke. + * @param requireResponse If true an exception will be thrown if all calls do + * not complete. If false exceptions are ignored and all data results + * successfully received are returned. + * @param standby If the requests should go to the standby namenodes too. + * @return Result of invoking the method per subcluster: nsId -> result. + * @throws IOException If requiredResponse=true and any of the calls throw an + * exception. + */ + @SuppressWarnings("unchecked") + public <T extends RemoteLocationContext> Map<T, Object> invokeConcurrent( + final Collection<T> locations, final RemoteMethod method, + boolean requireResponse, boolean standby) throws IOException { + + final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + final Method m = method.getMethod(); + + if (locations.size() == 1) { + // Shortcut, just one call + T location = locations.iterator().next(); + String ns = location.getNameserviceId(); + final List<? extends FederationNamenodeContext> namenodes = + getNamenodesForNameservice(ns); + Object[] paramList = method.getParams(location); + Object result = invokeMethod(ugi, namenodes, m, paramList); + return Collections.singletonMap(location, result); + } + + List<T> orderedLocations = new LinkedList<>(); + Set<Callable<Object>> callables = new HashSet<>(); + for (final T location : locations) { + String nsId = location.getNameserviceId(); + final List<? extends FederationNamenodeContext> namenodes = + getNamenodesForNameservice(nsId); + final Object[] paramList = method.getParams(location); + if (standby) { + // Call the objectGetter to all NNs (including standby) + for (final FederationNamenodeContext nn : namenodes) { + String nnId = nn.getNamenodeId(); + final List<FederationNamenodeContext> nnList = + Collections.singletonList(nn); + T nnLocation = location; + if (location instanceof RemoteLocation) { + nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); + } + orderedLocations.add(nnLocation); + callables.add(new Callable<Object>() { + public Object call() throws Exception { + return invokeMethod(ugi, nnList, m, paramList); + } + }); + } + } else { + // Call the objectGetter in order of nameservices in the NS list + orderedLocations.add(location); + callables.add(new Callable<Object>() { + public Object call() throws Exception { + return invokeMethod(ugi, namenodes, m, paramList); + } + }); + } + } + + try { + List<Future<Object>> futures = executorService.invokeAll(callables); + Map<T, Object> results = new TreeMap<>(); + Map<T, IOException> exceptions = new TreeMap<>(); + for (int i=0; i<futures.size(); i++) { + T location = orderedLocations.get(i); + try { + Future<Object> future = futures.get(i); + Object result = future.get(); + results.put(location, result); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + LOG.debug("Canot execute {} in {}: {}", + m.getName(), location, cause.getMessage()); + + // Convert into IOException if needed + IOException ioe = null; + if (cause instanceof IOException) { + ioe = (IOException) cause; + } else { + ioe = new IOException("Unhandled exception while proxying API " + + m.getName() + ": " + cause.getMessage(), cause); + } + + // Response from all servers required, use this error. + if (requireResponse) { + throw ioe; + } + + // Store the exceptions + exceptions.put(location, ioe); + } + } + + // Throw the exception for the first location if there are no results + if (results.isEmpty()) { + T location = orderedLocations.get(0); + IOException ioe = exceptions.get(location); + if (ioe != null) { + throw ioe; + } + } + + return results; + } catch (InterruptedException ex) { + LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); + throw new IOException( + "Unexpected error while invoking API " + ex.getMessage(), ex); + } + } + + /** + * Get a prioritized list of NNs that share the same nameservice ID (in the + * same namespace). NNs that are reported as ACTIVE will be first in the list. + * + * @param nameserviceId The nameservice ID for the namespace. + * @return A prioritized list of NNs to use for communication. + * @throws IOException If a NN cannot be located for the nameservice ID. + */ + private List<? extends FederationNamenodeContext> getNamenodesForNameservice( + final String nsId) throws IOException { + + final List<? extends FederationNamenodeContext> namenodes = + namenodeResolver.getNamenodesForNameserviceId(nsId); + + if (namenodes == null || namenodes.isEmpty()) { + throw new IOException("Cannot locate a registered namenode for " + nsId + + " from " + this.routerId); + } + return namenodes; + } + + /** + * Get a prioritized list of NNs that share the same block pool ID (in the + * same namespace). NNs that are reported as ACTIVE will be first in the list. + * + * @param blockPoolId The blockpool ID for the namespace. + * @return A prioritized list of NNs to use for communication. + * @throws IOException If a NN cannot be located for the block pool ID. + */ + private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId( + final String bpId) throws IOException { + + List<? extends FederationNamenodeContext> namenodes = + namenodeResolver.getNamenodesForBlockPoolId(bpId); + + if (namenodes == null || namenodes.isEmpty()) { + throw new IOException("Cannot locate a registered namenode for " + bpId + + " from " + this.routerId); + } + return namenodes; + } + + /** + * Get the nameservice identifier for a block pool. + * + * @param bpId Identifier of the block pool. + * @return Nameservice identifier. + * @throws IOException If a NN cannot be located for the block pool ID. + */ + private String getNameserviceForBlockPoolId(final String bpId) + throws IOException { + List<? extends FederationNamenodeContext> namenodes = + getNamenodesForBlockPoolId(bpId); + FederationNamenodeContext namenode = namenodes.get(0); + return namenode.getNameserviceId(); + } +} \ No newline at end of file
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org