[FLINK-5153] Support YARN application tags

Adds a new config option `yarn.tags`, a comma-separated list of strings
passed to YARN as application tags.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c116e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c116e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c116e5

Branch: refs/heads/master
Commit: d9c116e542889b6ad00485529b97652dc2d59cad
Parents: e24a866
Author: Patrick Lucas <m...@patricklucas.com>
Authored: Fri Feb 3 19:17:55 2017 -0500
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Feb 8 18:39:47 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 114 ++++++++++++++-----
 3 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2872cfa..2accdc2 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -431,6 +431,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String
 
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
+- `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application.
+
 ### Mesos
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 14ba9dd..c608fde 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -442,6 +442,11 @@ public final class ConfigConstants {
         */
        public static final String YARN_APPLICATION_MASTER_PORT = 
"yarn.application-master.port";
 
+       /**
+        * A comma-separated list of strings to use as YARN application tags.
+        */
+       public static final String YARN_APPLICATION_TAGS = "yarn.tags";
+
 
        // ------------------------ Mesos Configuration ------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b537e09..21599c1 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -855,7 +855,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        name = customName;
                }
 
-               appContext.setApplicationName(name); // application name
+               appContext.setApplicationName(name);
                appContext.setApplicationType("Apache Flink");
                appContext.setAMContainerSpec(amContainer);
                appContext.setResource(capability);
@@ -863,6 +863,8 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        appContext.setQueue(yarnQueue);
                }
 
+               setApplicationTags(appContext);
+
                // add a hook to clean up in case deployment fails
                Thread deploymentFailureHook = new 
DeploymentFailureHook(yarnClient, yarnApplication);
                Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
@@ -1024,75 +1026,117 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                customName = name;
        }
 
-       private void 
activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws 
InvocationTargetException, IllegalAccessException {
+       private void 
activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
+               InvocationTargetException, IllegalAccessException {
+
                ApplicationSubmissionContextReflector reflector = 
ApplicationSubmissionContextReflector.getInstance();
 
                
reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
                reflector.setAttemptFailuresValidityInterval(appContext, 
AkkaUtils.getTimeout(flinkConfiguration).toMillis());
        }
 
+       private void setApplicationTags(final ApplicationSubmissionContext 
appContext) throws InvocationTargetException,
+               IllegalAccessException {
+
+               final ApplicationSubmissionContextReflector reflector = 
ApplicationSubmissionContextReflector.getInstance();
+               final String tagsString = 
flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+
+               final Set<String> applicationTags = new HashSet<>();
+
+               // Trim whitespace and cull empty tags
+               for (final String tag : tagsString.split(",")) {
+                       final String trimmedTag = tag.trim();
+                       if (!trimmedTag.isEmpty()) {
+                               applicationTags.add(trimmedTag);
+                       }
+               }
+
+               reflector.setApplicationTags(appContext, applicationTags);
+       }
+
        /**
         * Singleton object which uses reflection to determine whether the 
{@link ApplicationSubmissionContext}
-        * supports the setKeepContainersAcrossApplicationAttempts and the 
setAttemptFailuresValidityInterval
-        * methods. Depending on the Hadoop version these methods are supported 
or not. If the methods
-        * are not supported, then nothing happens when 
setKeepContainersAcrossApplicationAttempts or
-        * setAttemptFailuresValidityInterval are called.
+        * supports various methods which, depending on the Hadoop version, may or may not be supported.
+        *
+        * If an unsupported method is invoked, nothing happens.
+        *
+        * Currently three methods are proxied:
+        * - setApplicationTags (>= 2.4.0)
+        * - setAttemptFailuresValidityInterval (>= 2.6.0)
+        * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0)
         */
        private static class ApplicationSubmissionContextReflector {
                private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
 
-               private static final ApplicationSubmissionContextReflector 
instance = new 
ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+               private static final ApplicationSubmissionContextReflector 
instance =
+                       new 
ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
 
                public static ApplicationSubmissionContextReflector 
getInstance() {
                        return instance;
                }
 
-               private static final String keepContainersMethodName = 
"setKeepContainersAcrossApplicationAttempts";
-               private static final String 
attemptsFailuresValidityIntervalMethodName = 
"setAttemptFailuresValidityInterval";
+               private static final String APPLICATION_TAGS_METHOD_NAME = 
"setApplicationTags";
+               private static final String ATTEMPT_FAILURES_METHOD_NAME = 
"setAttemptFailuresValidityInterval";
+               private static final String KEEP_CONTAINERS_METHOD_NAME = 
"setKeepContainersAcrossApplicationAttempts";
 
-               private final Method keepContainersMethod;
+               private final Method applicationTagsMethod;
                private final Method attemptFailuresValidityIntervalMethod;
+               private final Method keepContainersMethod;
 
                private 
ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> 
clazz) {
-                       Method keepContainersMethod;
+                       Method applicationTagsMethod;
                        Method attemptFailuresValidityIntervalMethod;
+                       Method keepContainersMethod;
 
                        try {
                                // this method is only supported by Hadoop 
2.4.0 onwards
-                               keepContainersMethod = 
clazz.getMethod(keepContainersMethodName, boolean.class);
-                               LOG.debug("{} supports method {}.", 
clazz.getCanonicalName(), keepContainersMethodName);
+                               applicationTagsMethod = 
clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class);
+                               LOG.debug("{} supports method {}.", 
clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
                        } catch (NoSuchMethodException e) {
-                               LOG.debug("{} does not support method {}.", 
clazz.getCanonicalName(), keepContainersMethodName);
+                               LOG.debug("{} does not support method {}.", 
clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
                                // assign null because the Hadoop version 
apparently does not support this call.
-                               keepContainersMethod = null;
+                               applicationTagsMethod = null;
                        }
 
-                       this.keepContainersMethod = keepContainersMethod;
+                       this.applicationTagsMethod = applicationTagsMethod;
 
                        try {
                                // this method is only supported by Hadoop 
2.6.0 onwards
-                               attemptFailuresValidityIntervalMethod = 
clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
-                               LOG.debug("{} supports method {}.", 
clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+                               attemptFailuresValidityIntervalMethod = 
clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class);
+                               LOG.debug("{} supports method {}.", 
clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
                        } catch (NoSuchMethodException e) {
-                               LOG.debug("{} does not support method {}.", 
clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+                               LOG.debug("{} does not support method {}.", 
clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
                                // assign null because the Hadoop version 
apparently does not support this call.
                                attemptFailuresValidityIntervalMethod = null;
                        }
 
                        this.attemptFailuresValidityIntervalMethod = 
attemptFailuresValidityIntervalMethod;
-               }
 
-               public void setKeepContainersAcrossApplicationAttempts(
-                               ApplicationSubmissionContext appContext,
-                               boolean keepContainers) throws 
InvocationTargetException, IllegalAccessException {
+                       try {
+                               // this method is only supported by Hadoop 
2.4.0 onwards
+                               keepContainersMethod = 
clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class);
+                               LOG.debug("{} supports method {}.", 
clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+                       } catch (NoSuchMethodException e) {
+                               LOG.debug("{} does not support method {}.", 
clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+                               // assign null because the Hadoop version 
apparently does not support this call.
+                               keepContainersMethod = null;
+                       }
 
-                       if (keepContainersMethod != null) {
-                               LOG.debug("Calling method {} of {}.", 
keepContainersMethod.getName(),
+                       this.keepContainersMethod = keepContainersMethod;
+               }
+
+               public void setApplicationTags(
+                       ApplicationSubmissionContext appContext,
+                       Set<String> applicationTags) throws 
InvocationTargetException, IllegalAccessException {
+                       if (applicationTagsMethod != null) {
+                               LOG.debug("Calling method {} of {}.",
+                                       applicationTagsMethod.getName(),
                                        
appContext.getClass().getCanonicalName());
-                               keepContainersMethod.invoke(appContext, 
keepContainers);
+                               applicationTagsMethod.invoke(appContext, 
applicationTags);
                        } else {
                                LOG.debug("{} does not support method {}. Doing 
nothing.",
-                                       
appContext.getClass().getCanonicalName(), keepContainersMethodName);
+                                       
appContext.getClass().getCanonicalName(),
+                                       APPLICATION_TAGS_METHOD_NAME);
                        }
                }
 
@@ -1107,7 +1151,21 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        } else {
                                LOG.debug("{} does not support method {}. Doing 
nothing.",
                                        
appContext.getClass().getCanonicalName(),
-                                       
attemptsFailuresValidityIntervalMethodName);
+                                       ATTEMPT_FAILURES_METHOD_NAME);
+                       }
+               }
+
+               public void setKeepContainersAcrossApplicationAttempts(
+                       ApplicationSubmissionContext appContext,
+                       boolean keepContainers) throws 
InvocationTargetException, IllegalAccessException {
+
+                       if (keepContainersMethod != null) {
+                               LOG.debug("Calling method {} of {}.", 
keepContainersMethod.getName(),
+                                       
appContext.getClass().getCanonicalName());
+                               keepContainersMethod.invoke(appContext, 
keepContainers);
+                       } else {
+                               LOG.debug("{} does not support method {}. Doing 
nothing.",
+                                       
appContext.getClass().getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
                        }
                }
        }

Reply via email to