[FLINK-5153] Support YARN application tags

Adds a new config option `yarn.tags`, a comma-separated list of strings passed to YARN as application tags.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c116e5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c116e5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c116e5 Branch: refs/heads/master Commit: d9c116e542889b6ad00485529b97652dc2d59cad Parents: e24a866 Author: Patrick Lucas <m...@patricklucas.com> Authored: Fri Feb 3 19:17:55 2017 -0500 Committer: Robert Metzger <rmetz...@apache.org> Committed: Wed Feb 8 18:39:47 2017 +0100 ---------------------------------------------------------------------- docs/setup/config.md | 2 + .../flink/configuration/ConfigConstants.java | 5 + .../yarn/AbstractYarnClusterDescriptor.java | 114 ++++++++++++++----- 3 files changed, 93 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 2872cfa..2accdc2 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -431,6 +431,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports. +- `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application. 
+ ### Mesos http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 14ba9dd..c608fde 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -442,6 +442,11 @@ public final class ConfigConstants { */ public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port"; + /** + * A comma-separated list of strings to use as YARN application tags. + */ + public static final String YARN_APPLICATION_TAGS = "yarn.tags"; + // ------------------------ Mesos Configuration ------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index b537e09..21599c1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -855,7 +855,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor name = customName; } - appContext.setApplicationName(name); // application name + appContext.setApplicationName(name); appContext.setApplicationType("Apache Flink"); appContext.setAMContainerSpec(amContainer); appContext.setResource(capability); @@ -863,6 +863,8 @@ public abstract class AbstractYarnClusterDescriptor implements 
ClusterDescriptor appContext.setQueue(yarnQueue); } + setApplicationTags(appContext); + // add a hook to clean up in case deployment fails Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication); Runtime.getRuntime().addShutdownHook(deploymentFailureHook); @@ -1024,75 +1026,117 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor customName = name; } - private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException { + private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws + InvocationTargetException, IllegalAccessException { + ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); reflector.setKeepContainersAcrossApplicationAttempts(appContext, true); reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis()); } + private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException, + IllegalAccessException { + + final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); + final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, ""); + + final Set<String> applicationTags = new HashSet<>(); + + // Trim whitespace and cull empty tags + for (final String tag : tagsString.split(",")) { + final String trimmedTag = tag.trim(); + if (!trimmedTag.isEmpty()) { + applicationTags.add(trimmedTag); + } + } + + reflector.setApplicationTags(appContext, applicationTags); + } + /** * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext} - * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval - * methods. Depending on the Hadoop version these methods are supported or not. 
If the methods - * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or - * setAttemptFailuresValidityInterval are called. + * supports various methods which, depending on the Hadoop version, may or may not be supported. + * + * If an unsupported method is invoked, nothing happens. + * + * Currently three methods are proxied: + * - setApplicationTags (>= 2.4.0) + * - setAttemptFailuresValidityInterval (>= 2.6.0) + * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0) */ private static class ApplicationSubmissionContextReflector { private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class); - private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class); + private static final ApplicationSubmissionContextReflector instance = + new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class); public static ApplicationSubmissionContextReflector getInstance() { return instance; } - private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts"; - private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval"; + private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags"; + private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval"; + private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts"; - private final Method keepContainersMethod; + private final Method applicationTagsMethod; private final Method attemptFailuresValidityIntervalMethod; + private final Method keepContainersMethod; private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) { - Method keepContainersMethod; + Method applicationTagsMethod; Method attemptFailuresValidityIntervalMethod; + Method 
keepContainersMethod; try { // this method is only supported by Hadoop 2.4.0 onwards - keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class); - LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName); + applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME); } catch (NoSuchMethodException e) { - LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName); + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME); // assign null because the Hadoop version apparently does not support this call. - keepContainersMethod = null; + applicationTagsMethod = null; } - this.keepContainersMethod = keepContainersMethod; + this.applicationTagsMethod = applicationTagsMethod; try { // this method is only supported by Hadoop 2.6.0 onwards - attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class); - LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName); + attemptFailuresValidityIntervalMethod = clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME); } catch (NoSuchMethodException e) { - LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName); + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME); // assign null because the Hadoop version apparently does not support this call. 
attemptFailuresValidityIntervalMethod = null; } this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod; - } - public void setKeepContainersAcrossApplicationAttempts( - ApplicationSubmissionContext appContext, - boolean keepContainers) throws InvocationTargetException, IllegalAccessException { + try { + // this method is only supported by Hadoop 2.4.0 onwards + keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class); + LOG.debug("{} supports method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME); + } catch (NoSuchMethodException e) { + LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME); + // assign null because the Hadoop version apparently does not support this call. + keepContainersMethod = null; + } - if (keepContainersMethod != null) { - LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(), + this.keepContainersMethod = keepContainersMethod; + } + + public void setApplicationTags( + ApplicationSubmissionContext appContext, + Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException { + if (applicationTagsMethod != null) { + LOG.debug("Calling method {} of {}.", + applicationTagsMethod.getName(), appContext.getClass().getCanonicalName()); - keepContainersMethod.invoke(appContext, keepContainers); + applicationTagsMethod.invoke(appContext, applicationTags); } else { LOG.debug("{} does not support method {}. Doing nothing.", - appContext.getClass().getCanonicalName(), keepContainersMethodName); + appContext.getClass().getCanonicalName(), + APPLICATION_TAGS_METHOD_NAME); } } @@ -1107,7 +1151,21 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } else { LOG.debug("{} does not support method {}. 
Doing nothing.", appContext.getClass().getCanonicalName(), - attemptsFailuresValidityIntervalMethodName); + ATTEMPT_FAILURES_METHOD_NAME); + } + } + + public void setKeepContainersAcrossApplicationAttempts( + ApplicationSubmissionContext appContext, + boolean keepContainers) throws InvocationTargetException, IllegalAccessException { + + if (keepContainersMethod != null) { + LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(), + appContext.getClass().getCanonicalName()); + keepContainersMethod.invoke(appContext, keepContainers); + } else { + LOG.debug("{} does not support method {}. Doing nothing.", + appContext.getClass().getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME); } } }