This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f840082144c8a340a1ce0a820d268510e2853a1
Author: Gary Yao <g...@apache.org>
AuthorDate: Sun Mar 3 19:12:54 2019 +0100

    [FLINK-11781][yarn] Remove "DISABLED" as possible value for yarn.per-job-cluster.include-user-jar
    
    Remove this feature because it is broken since Flink 1.5
    
    This closes #7883.
---
 .../generated/yarn_config_configuration.html       |  2 +-
 .../apache/flink/configuration/Configuration.java  | 30 ++++++++++++
 .../configuration/DelegatingConfiguration.java     |  5 ++
 .../flink/configuration/ConfigurationTest.java     | 54 ++++++++++++++++++++++
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 40 ++++++++--------
 .../yarn/configuration/YarnConfigOptions.java      |  4 +-
 .../flink/yarn/YarnClusterDescriptorTest.java      | 23 +++++++++
 7 files changed, 132 insertions(+), 26 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html 
b/docs/_includes/generated/yarn_config_configuration.html
index bbe2549..ab7e224 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -45,7 +45,7 @@
         <tr>
             <td><h5>yarn.per-job-cluster.include-user-jar</h5></td>
             <td style="word-wrap: break-word;">"ORDER"</td>
-            <td>Defines whether user-jars are included in the system class 
path for per-job-clusters as well as their positioning in the path. They can be 
positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned 
based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the 
jar to be included in the user class path instead.</td>
+            <td>Defines whether user-jars are included in the system class 
path for per-job-clusters as well as their positioning in the path. They can be 
positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned 
based on their name ("ORDER").</td>
         </tr>
         <tr>
             <td><h5>yarn.properties-file.location</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 07c0290..186cf10 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -33,10 +33,13 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Lightweight configuration object which stores key/value pairs.
  */
@@ -608,6 +611,33 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                return o == null ? null : o.toString();
        }
 
+       /**
+        * Returns the value associated with the given config option as an enum.
+        *
+        * @param enumClass    The return enum class
+        * @param configOption The configuration option
+        * @throws IllegalArgumentException If the string associated with the 
given config option cannot
+        *                                  be parsed as a value of the 
provided enum class.
+        */
+       @PublicEvolving
+       public <T extends Enum<T>> T getEnum(
+                       final Class<T> enumClass,
+                       final ConfigOption<String> configOption) {
+               checkNotNull(enumClass, "enumClass must not be null");
+               checkNotNull(configOption, "configOption must not be null");
+
+               final String configValue = getString(configOption);
+               try {
+                       return Enum.valueOf(enumClass, 
configValue.toUpperCase(Locale.ROOT));
+               } catch (final IllegalArgumentException | NullPointerException 
e) {
+                       final String errorMessage = String.format("Value for 
config option %s must be one of %s (was %s)",
+                               configOption.key(),
+                               Arrays.toString(enumClass.getEnumConstants()),
+                               configValue);
+                       throw new IllegalArgumentException(errorMessage, e);
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 0a0a777..b0249a0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -245,6 +245,11 @@ public final class DelegatingConfiguration extends 
Configuration {
        }
 
        @Override
+       public <T extends Enum<T>> T getEnum(final Class<T> enumClass, final 
ConfigOption<String> configOption) {
+               return this.backingConfig.getEnum(enumClass, 
prefixOption(configOption, prefix));
+       }
+
+       @Override
        public void addAllToProperties(Properties props) {
                // only add keys with our prefix
                synchronized (backingConfig.confData) {
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index f02d59e..9727b44 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -23,10 +23,12 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -403,4 +405,56 @@ public class ConfigurationTest extends TestLogger {
                assertEquals("Wrong expectation about size", 
cfg.keySet().size(), 0);
                assertFalse("Expected 'unexistedOption' is not removed", 
cfg.removeConfig(unexistedOption));
        }
+
+       @Test
+       public void testShouldParseValidStringToEnum() {
+               final ConfigOption<String> configOption = 
createStringConfigOption();
+
+               final Configuration configuration = new Configuration();
+               configuration.setString(configOption.key(), 
TestEnum.VALUE1.toString());
+
+               final TestEnum parsedEnumValue = 
configuration.getEnum(TestEnum.class, configOption);
+               assertEquals(TestEnum.VALUE1, parsedEnumValue);
+       }
+
+       @Test
+       public void testShouldParseValidStringToEnumIgnoringCase() {
+               final ConfigOption<String> configOption = 
createStringConfigOption();
+
+               final Configuration configuration = new Configuration();
+               configuration.setString(configOption.key(), 
TestEnum.VALUE1.toString().toLowerCase());
+
+               final TestEnum parsedEnumValue = 
configuration.getEnum(TestEnum.class, configOption);
+               assertEquals(TestEnum.VALUE1, parsedEnumValue);
+       }
+
+       @Test
+       public void testThrowsExceptionIfTryingToParseInvalidStringForEnum() {
+               final ConfigOption<String> configOption = 
createStringConfigOption();
+
+               final Configuration configuration = new Configuration();
+               final String invalidValueForTestEnum = 
"InvalidValueForTestEnum";
+               configuration.setString(configOption.key(), 
invalidValueForTestEnum);
+
+               try {
+                       configuration.getEnum(TestEnum.class, configOption);
+                       fail("Expected exception not thrown");
+               } catch (IllegalArgumentException e) {
+                       final String expectedMessage = "Value for config option 
" +
+                               configOption.key() + " must be one of [VALUE1, 
VALUE2] (was " +
+                               invalidValueForTestEnum + ")";
+                       assertThat(e.getMessage(), 
containsString(expectedMessage));
+               }
+       }
+
+       enum TestEnum {
+               VALUE1,
+               VALUE2
+       }
+
+       private static ConfigOption<String> createStringConfigOption() {
+               return ConfigOptions
+                       .key("test-string-key")
+                       .noDefaultValue();
+       }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index a87772f..783a459 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -763,19 +763,14 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        localResources,
                        envShipFileList);
 
-               List<String> userClassPaths;
-               if (userJarInclusion != 
YarnConfigOptions.UserJarInclusion.DISABLED) {
-                       userClassPaths = uploadAndRegisterFiles(
-                               userJarFiles,
-                               fs,
-                               homeDir,
-                               appId,
-                               paths,
-                               localResources,
-                               envShipFileList);
-               } else {
-                       userClassPaths = Collections.emptyList();
-               }
+               final List<String> userClassPaths = uploadAndRegisterFiles(
+                       userJarFiles,
+                       fs,
+                       homeDir,
+                       appId,
+                       paths,
+                       localResources,
+                       envShipFileList);
 
                if (userJarInclusion == 
YarnConfigOptions.UserJarInclusion.ORDER) {
                        systemClassPaths.addAll(userClassPaths);
@@ -1602,15 +1597,16 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
        }
 
        private static YarnConfigOptions.UserJarInclusion 
getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) {
-               String configuredUserJarInclusion = 
config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
-               try {
-                       return 
YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
-               } catch (IllegalArgumentException e) {
-                       LOG.warn("Configuration parameter {} was configured 
with an invalid value {}. Falling back to default ({}).",
-                               
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
-                               configuredUserJarInclusion,
-                               
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
-                       return 
YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
+               
throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(config);
+
+               return config.getEnum(YarnConfigOptions.UserJarInclusion.class, 
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
+       }
+
+       private static void 
throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration 
config) {
+               final String userJarInclusion = 
config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
+               if ("DISABLED".equalsIgnoreCase(userJarInclusion)) {
+                       throw new 
IllegalArgumentException(String.format("Config option %s cannot be set to 
DISABLED anymore (see FLINK-11781)",
+                               
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key()));
                }
        }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index b059475..6abeb0d 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -57,8 +57,7 @@ public class YarnConfigOptions {
                        .defaultValue("ORDER")
                        .withDescription("Defines whether user-jars are 
included in the system class path for per-job-clusters as" +
                                " well as their positioning in the path. They 
can be positioned at the beginning (\"FIRST\"), at the" +
-                               " end (\"LAST\"), or be positioned based on 
their name (\"ORDER\"). Setting this parameter to" +
-                               " \"DISABLED\" causes the jar to be included in 
the user class path instead.");
+                               " end (\"LAST\"), or be positioned based on 
their name (\"ORDER\").");
 
        /**
         * The vcores exposed by YARN.
@@ -156,7 +155,6 @@ public class YarnConfigOptions {
 
        /** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */
        public enum UserJarInclusion {
-               DISABLED,
                FIRST,
                LAST,
                ORDER
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 16b3586..6c2e42e 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -53,7 +53,9 @@ import java.util.Map;
 import java.util.Set;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
@@ -89,6 +91,27 @@ public class YarnClusterDescriptorTest extends TestLogger {
                yarnClient.stop();
        }
 
+       /**
+        * @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
+        */
+       @Test
+       public void 
testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
+               final Configuration configuration = new Configuration();
+               
configuration.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, 
"DISABLED");
+
+               try {
+                       new YarnClusterDescriptor(
+                               configuration,
+                               yarnConfiguration,
+                               temporaryFolder.getRoot().getAbsolutePath(),
+                               yarnClient,
+                               true);
+                       fail("Expected exception not thrown");
+               } catch (final IllegalArgumentException e) {
+                       assertThat(e.getMessage(), containsString("cannot be 
set to DISABLED anymore"));
+               }
+       }
+
        @Test
        public void testFailIfTaskSlotsHigherThanMaxVcores() throws 
ClusterDeploymentException {
                final Configuration flinkConfiguration = new Configuration();

Reply via email to