This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f840082144c8a340a1ce0a820d268510e2853a1 Author: Gary Yao <g...@apache.org> AuthorDate: Sun Mar 3 19:12:54 2019 +0100 [FLINK-11781][yarn] Remove "DISABLED" as possible value for yarn.per-job-cluster.include-user-jar Remove this feature because it has been broken since Flink 1.5. This closes #7883. --- .../generated/yarn_config_configuration.html | 2 +- .../apache/flink/configuration/Configuration.java | 30 ++++++++++++ .../configuration/DelegatingConfiguration.java | 5 ++ .../flink/configuration/ConfigurationTest.java | 54 ++++++++++++++++++++++ .../flink/yarn/AbstractYarnClusterDescriptor.java | 40 ++++++++-------- .../yarn/configuration/YarnConfigOptions.java | 4 +- .../flink/yarn/YarnClusterDescriptorTest.java | 23 +++++++++ 7 files changed, 132 insertions(+), 26 deletions(-) diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html index bbe2549..ab7e224 100644 --- a/docs/_includes/generated/yarn_config_configuration.html +++ b/docs/_includes/generated/yarn_config_configuration.html @@ -45,7 +45,7 @@ <tr> <td><h5>yarn.per-job-cluster.include-user-jar</h5></td> <td style="word-wrap: break-word;">"ORDER"</td> - <td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the jar to be included in the user class path instead.</td> + <td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. 
They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER").</td> </tr> <tr> <td><h5>yarn.properties-file.location</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index 07c0290..186cf10 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -33,10 +33,13 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Lightweight configuration object which stores key/value pairs. */ @@ -608,6 +611,33 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters return o == null ? null : o.toString(); } + /** + * Returns the value associated with the given config option as an enum. + * + * @param enumClass The return enum class + * @param configOption The configuration option + * @throws IllegalArgumentException If the string associated with the given config option cannot + * be parsed as a value of the provided enum class. 
+ */ + @PublicEvolving + public <T extends Enum<T>> T getEnum( + final Class<T> enumClass, + final ConfigOption<String> configOption) { + checkNotNull(enumClass, "enumClass must not be null"); + checkNotNull(configOption, "configOption must not be null"); + + final String configValue = getString(configOption); + try { + return Enum.valueOf(enumClass, configValue.toUpperCase(Locale.ROOT)); + } catch (final IllegalArgumentException | NullPointerException e) { + final String errorMessage = String.format("Value for config option %s must be one of %s (was %s)", + configOption.key(), + Arrays.toString(enumClass.getEnumConstants()), + configValue); + throw new IllegalArgumentException(errorMessage, e); + } + } + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java index 0a0a777..b0249a0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -245,6 +245,11 @@ public final class DelegatingConfiguration extends Configuration { } @Override + public <T extends Enum<T>> T getEnum(final Class<T> enumClass, final ConfigOption<String> configOption) { + return this.backingConfig.getEnum(enumClass, prefixOption(configOption, prefix)); + } + + @Override public void addAllToProperties(Properties props) { // only add keys with our prefix synchronized (backingConfig.confData) { diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index f02d59e..9727b44 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -23,10 +23,12 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -403,4 +405,56 @@ public class ConfigurationTest extends TestLogger { assertEquals("Wrong expectation about size", cfg.keySet().size(), 0); assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption)); } + + @Test + public void testShouldParseValidStringToEnum() { + final ConfigOption<String> configOption = createStringConfigOption(); + + final Configuration configuration = new Configuration(); + configuration.setString(configOption.key(), TestEnum.VALUE1.toString()); + + final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption); + assertEquals(TestEnum.VALUE1, parsedEnumValue); + } + + @Test + public void testShouldParseValidStringToEnumIgnoringCase() { + final ConfigOption<String> configOption = createStringConfigOption(); + + final Configuration configuration = new Configuration(); + configuration.setString(configOption.key(), TestEnum.VALUE1.toString().toLowerCase()); + + final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption); + assertEquals(TestEnum.VALUE1, parsedEnumValue); + } + + @Test + public void testThrowsExceptionIfTryingToParseInvalidStringForEnum() { + final ConfigOption<String> configOption = createStringConfigOption(); + + final Configuration configuration = new Configuration(); + final String invalidValueForTestEnum = "InvalidValueForTestEnum"; + configuration.setString(configOption.key(), invalidValueForTestEnum); + + try { + 
configuration.getEnum(TestEnum.class, configOption); + fail("Expected exception not thrown"); + } catch (IllegalArgumentException e) { + final String expectedMessage = "Value for config option " + + configOption.key() + " must be one of [VALUE1, VALUE2] (was " + + invalidValueForTestEnum + ")"; + assertThat(e.getMessage(), containsString(expectedMessage)); + } + } + + enum TestEnum { + VALUE1, + VALUE2 + } + + private static ConfigOption<String> createStringConfigOption() { + return ConfigOptions + .key("test-string-key") + .noDefaultValue(); + } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index a87772f..783a459 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -763,19 +763,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor localResources, envShipFileList); - List<String> userClassPaths; - if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) { - userClassPaths = uploadAndRegisterFiles( - userJarFiles, - fs, - homeDir, - appId, - paths, - localResources, - envShipFileList); - } else { - userClassPaths = Collections.emptyList(); - } + final List<String> userClassPaths = uploadAndRegisterFiles( + userJarFiles, + fs, + homeDir, + appId, + paths, + localResources, + envShipFileList); if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) { systemClassPaths.addAll(userClassPaths); @@ -1602,15 +1597,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) { - String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); - try { - return 
YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase()); - } catch (IllegalArgumentException e) { - LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).", - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(), - configuredUserJarInclusion, - YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); - return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue()); + throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(config); + + return config.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + } + + private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) { + final String userJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR); + if ("DISABLED".equalsIgnoreCase(userJarInclusion)) { + throw new IllegalArgumentException(String.format("Config option %s cannot be set to DISABLED anymore (see FLINK-11781)", + YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key())); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index b059475..6abeb0d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -57,8 +57,7 @@ public class YarnConfigOptions { .defaultValue("ORDER") .withDescription("Defines whether user-jars are included in the system class path for per-job-clusters as" + " well as their positioning in the path. They can be positioned at the beginning (\"FIRST\"), at the" + - " end (\"LAST\"), or be positioned based on their name (\"ORDER\"). 
Setting this parameter to" + - " \"DISABLED\" causes the jar to be included in the user class path instead."); + " end (\"LAST\"), or be positioned based on their name (\"ORDER\")."); /** * The vcores exposed by YARN. @@ -156,7 +155,6 @@ public class YarnConfigOptions { /** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */ public enum UserJarInclusion { - DISABLED, FIRST, LAST, ORDER diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 16b3586..6c2e42e 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -53,7 +53,9 @@ import java.util.Map; import java.util.Set; import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; /** @@ -89,6 +91,27 @@ public class YarnClusterDescriptorTest extends TestLogger { yarnClient.stop(); } + /** + * @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a> + */ + @Test + public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() { + final Configuration configuration = new Configuration(); + configuration.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED"); + + try { + new YarnClusterDescriptor( + configuration, + yarnConfiguration, + temporaryFolder.getRoot().getAbsolutePath(), + yarnClient, + true); + fail("Expected exception not thrown"); + } catch (final IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("cannot be set to DISABLED anymore")); + } + } + @Test public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException { final Configuration flinkConfiguration = new Configuration();