Repository: twill Updated Branches: refs/heads/master a91ecd403 -> 6d1490ba2
TWILL-262 YarnUtils#cloneHaNnCredentials uses DFSUtil#getHaNnRpcAddresses, which is removed from DFSUtils from hadoop-2.8 This closes #71 on Github Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d1490ba Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d1490ba Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d1490ba Branch: refs/heads/master Commit: 6d1490ba221a7b2bd6b56f64ef50c9539e0e4096 Parents: a91ecd4 Author: lihongyuan <[email protected]> Authored: Sat Sep 29 14:49:47 2018 +0800 Committer: Terence Yim <[email protected]> Committed: Wed Dec 12 10:39:09 2018 -0800 ---------------------------------------------------------------------- .../apache/twill/internal/yarn/YarnUtils.java | 77 +++++++++++++++++++- 1 file changed, 74 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/6d1490ba/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java index d7e6eb0..3a2f4a5 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java @@ -51,16 +51,21 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; + /** * Collection of helper methods to simplify YARN calls. */ public class YarnUtils { + private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class); + + /** * Defines different versions of Hadoop. */ @@ -72,7 +77,32 @@ public class YarnUtils { HADOOP_26 } - private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class); + private static boolean hasDFSUtilClient = false; // use this to judge if the hadoop version is above 2.8 + + private static boolean hasHAUtilsClient = false; + + private static Method getHaNnRpcAddressesMethod; + + private static Method cloneDelegationTokenForLogicalUriMethod; + + static { + try { + Class dfsUtilsClientClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtilClient"); + getHaNnRpcAddressesMethod = dfsUtilsClientClazz.getMethod("getHaNnRpcAddresses", + Configuration.class); + hasDFSUtilClient = true; + Class haUtilClientClazz = Class.forName("org.apache.hadoop.hdfs.HAUtilClient"); + cloneDelegationTokenForLogicalUriMethod = haUtilClientClazz.getMethod( + "cloneDelegationTokenForLogicalUri", UserGroupInformation.class, + URI.class, Collection.class); + hasHAUtilsClient = true; + } catch (ClassNotFoundException e) { + LOG.debug("No such class", e); + } catch (NoSuchMethodException e) { + LOG.debug("No such method", e); + } + } + private static final AtomicReference<HadoopVersions> HADOOP_VERSION = new AtomicReference<>(); public static YarnLocalResource createLocalResource(LocalFile localFile) { @@ -185,7 +215,7 @@ public class YarnUtils { CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme(); // Loop through all name services. Each name service could have multiple name node associated with it. - for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) { + for (Map.Entry<String, Map<String, InetSocketAddress>> entry : getHaNnRpcAddresses(config).entrySet()) { String nsId = entry.getKey(); Map<String, InetSocketAddress> addressesInNN = entry.getValue(); if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) { @@ -198,7 +228,47 @@ public class YarnUtils { URI uri = URI.create(scheme + "://" + nsId); LOG.info("Cloning delegation token for uri {}", uri); - HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values()); + cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values()); + } + } + + /** + * When hadoop_version > 2.8.0, class HAUtil has no method cloneDelegationTokenForLogicalUri(Configuration config) + * + */ + private static void cloneDelegationTokenForLogicalUri(UserGroupInformation ugi, URI haUri, + Collection<InetSocketAddress> nnAddrs) { + if (hasHAUtilsClient) { + invokeStaticMethodWithExceptionHandled(cloneDelegationTokenForLogicalUriMethod, ugi, haUri, nnAddrs); + } else { + HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); + } + } + + + /** + * When hadoop_version > 2.8.0, class DFSUtils has no method getHaNnRpcAddresses(Configuration config) + * @param config + * @return + */ + private static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(Configuration config) { + return hasDFSUtilClient ? getHaNnRpcAddressesUseDFSUtilClient(config) : + DFSUtil.getHaNnRpcAddresses(config); + } + + private static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddressesUseDFSUtilClient(Configuration config) { + return (Map) invokeStaticMethodWithExceptionHandled(getHaNnRpcAddressesMethod, config); + } + + private static Object invokeStaticMethodWithExceptionHandled(Method method, Object ... args) { + Preconditions.checkNotNull(method); + try { + return method.invoke(null, args); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug(e.getMessage(), e); + } + throw Throwables.propagate(e); } } @@ -345,6 +415,7 @@ public class YarnUtils { return null; } + private YarnUtils() { } }
