[FLINK-4764] [core] Introduce Config Options. This is a more concise and maintainable way to define configuration keys, default values, deprecated keys, etc.
This closes #2605 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d71a09cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d71a09cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d71a09cc Branch: refs/heads/flip-6 Commit: d71a09cc2a36a877e8287db8d9fe84134a4901ba Parents: 05436f4 Author: Stephan Ewen <se...@apache.org> Authored: Fri Oct 7 15:24:44 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Oct 13 16:25:49 2016 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigOption.java | 171 ++++++++ .../flink/configuration/ConfigOptions.java | 116 ++++++ .../flink/configuration/Configuration.java | 407 +++++++++++++++---- .../configuration/DelegatingConfiguration.java | 118 +++++- .../flink/configuration/ConfigurationTest.java | 95 ++++- .../DelegatingConfigurationTest.java | 55 +-- .../UnmodifiableConfigurationTest.java | 16 +- 7 files changed, 844 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java new file mode 100644 index 0000000..3531f6d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@code ConfigOption} describes a configuration parameter. It encapsulates + * the configuration key, deprecated older versions of the key, and an optional + * default value for the configuration parameter. + * + * <p>{@code ConfigOptions} are built via the {@link ConfigOptions} class. + * Once created, a config option is immutable. + * + * @param <T> The type of value associated with the configuration option. + */ +@PublicEvolving +public class ConfigOption<T> { + + private static final String[] EMPTY = new String[0]; + + // ------------------------------------------------------------------------ + + /** The current key for that config option */ + private final String key; + + /** The list of deprecated keys, in the order to be checked */ + private final String[] deprecatedKeys; + + /** The default value for this option */ + private final T defaultValue; + + // ------------------------------------------------------------------------ + + /** + * Creates a new config option with no deprecated keys. 
+ * + * @param key The current key for that config option + * @param defaultValue The default value for this option + */ + ConfigOption(String key, T defaultValue) { + this.key = checkNotNull(key); + this.defaultValue = defaultValue; + this.deprecatedKeys = EMPTY; + } + + /** + * Creates a new config option with deprecated keys. + * + * @param key The current key for that config option + * @param defaultValue The default value for this option + * @param deprecatedKeys The list of deprecated keys, in the order to be checked + */ + ConfigOption(String key, T defaultValue, String... deprecatedKeys) { + this.key = checkNotNull(key); + this.defaultValue = defaultValue; + this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys; + } + + // ------------------------------------------------------------------------ + + /** + * Creates a new config option, using this option's key and default value, and + * adding the given deprecated keys. + * + * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)}, + * the deprecated keys will be checked in the order provided to this method. The first key for which + * a value is found will be used - that value will be returned. + * + * @param deprecatedKeys The deprecated keys, in the order in which they should be checked. + * @return A new config option with the given deprecated keys. + */ + public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) { + return new ConfigOption<>(key, defaultValue, deprecatedKeys); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the configuration key. + * @return The configuration key + */ + public String key() { + return key; + } + + /** + * Checks if this option has a default value. + * @return True if it has a default value, false if not. 
+ */ + public boolean hasDefaultValue() { + return defaultValue != null; + } + + /** + * Returns the default value, or null, if there is no default value. + * @return The default value, or null. + */ + public T defaultValue() { + return defaultValue; + } + + /** + * Checks whether this option has deprecated keys. + * @return True if the option has deprecated keys, false if not. + */ + public boolean hasDeprecatedKeys() { + return deprecatedKeys != EMPTY; + } + + /** + * Gets the deprecated keys, in the order to be checked. + * @return The option's deprecated keys. + */ + public Iterable<String> deprecatedKeys() { + return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys); + } + + // ------------------------------------------------------------------------ + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + else if (o != null && o.getClass() == ConfigOption.class) { + ConfigOption<?> that = (ConfigOption<?>) o; + return this.key.equals(that.key) && + Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) && + (this.defaultValue == null ? that.defaultValue == null : + (that.defaultValue != null && this.defaultValue.equals(that.defaultValue))); + } + else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * key.hashCode() + + 17 * Arrays.hashCode(deprecatedKeys) + + (defaultValue != null ? 
defaultValue.hashCode() : 0); + } + + @Override + public String toString() { + return String.format("Key: '%s' , default: %s (deprecated keys: %s)", + key, defaultValue, Arrays.toString(deprecatedKeys)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java new file mode 100644 index 0000000..f87da0a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code ConfigOptions} are used to build a {@link ConfigOption}. 
+ * The option is typically built in one of the following patterns: + * + * <pre>{@code + * // simple string-valued option with a default value + * ConfigOption<String> tempDirs = ConfigOptions + * .key("tmp.dir") + * .defaultValue("/tmp"); + * + * // simple integer-valued option with a default value + * ConfigOption<Integer> parallelism = ConfigOptions + * .key("application.parallelism") + * .defaultValue(100); + * + * // option with no default value + * ConfigOption<String> userName = ConfigOptions + * .key("user.name") + * .noDefaultValue(); + * + * // option with deprecated keys to check + * ConfigOption<Double> threshold = ConfigOptions + * .key("cpu.utilization.threshold") + * .defaultValue(0.9) + * .withDeprecatedKeys("cpu.threshold"); + * }</pre> + */ +@PublicEvolving +public class ConfigOptions { + + /** + * Starts building a new {@link ConfigOption}. + * + * @param key The key for the config option. + * @return The builder for the config option with the given key. + */ + public static OptionBuilder key(String key) { + checkNotNull(key); + return new OptionBuilder(key); + } + + // ------------------------------------------------------------------------ + + /** + * The option builder is used to create a {@link ConfigOption}. + * It is instantiated via {@link ConfigOptions#key(String)}. + */ + public static final class OptionBuilder { + + /** The key for the config option */ + private final String key; + + /** + * Creates a new OptionBuilder. + * @param key The key for the config option + */ + OptionBuilder(String key) { + this.key = key; + } + + /** + * Creates a ConfigOption with the given default value. + * + * <p>This method does not accept "null". For options with no default value, choose + * one of the {@code noDefaultValue} methods. + * + * @param value The default value for the config option + * @param <T> The type of the default value. + * @return The config option with the default value. 
+ */ + public <T> ConfigOption<T> defaultValue(T value) { + checkNotNull(value); + return new ConfigOption<T>(key, value); + } + + /** + * Creates a string-valued option with no default value. + * String-valued options are the only ones that can have no + * default value. + * + * @return The created ConfigOption. + */ + public ConfigOption<String> noDefaultValue() { + return new ConfigOption<>(key, null); + } + } + + // ------------------------------------------------------------------------ + + /** Not intended to be instantiated */ + private ConfigOptions() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index 8ca5d07..f15c669 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -27,6 +27,7 @@ import java.util.Properties; import java.util.Set; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; @@ -134,7 +135,33 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters return o.toString(); } } - + + /** + * Returns the value associated with the given config option as a string. + * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public String getString(ConfigOption<String> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return o == null ? 
null : o.toString(); + } + + /** + * Returns the value associated with the given config option as a string. + * If no value is mapped under any key of the option, it returns the specified + * default instead of the option's default value. + * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public String getString(ConfigOption<String> configOption, String overrideDefault) { + Object o = getRawValueFromOption(configOption); + return o == null ? overrideDefault : o.toString(); + } + /** * Adds the given key/value pair to the configuration object. * @@ -148,6 +175,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } /** + * Adds the given value to the configuration object. + * The main key of the config option will be used to map the value. + * + * @param key + * the option specifying the key to be added + * @param value + * the value of the key/value pair to be added + */ + @PublicEvolving + public void setString(ConfigOption<String> key, String value) { + setValueInternal(key.key(), value); + } + + /** * Returns the value associated with the given key as an integer. 
* * @param key @@ -161,28 +202,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters if (o == null) { return defaultValue; } - - if (o.getClass() == Integer.class) { - return (Integer) o; - } - else if (o.getClass() == Long.class) { - long value = (Long) o; - if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) { - return (int) value; - } else { - LOG.warn("Configuration value {} overflows/underflows the integer type.", value); - return defaultValue; - } - } - else { - try { - return Integer.parseInt(o.toString()); - } - catch (NumberFormatException e) { - LOG.warn("Configuration cannot evaluate value {} as an integer number", o); - return defaultValue; - } - } + + return convertToInt(o, defaultValue); + } + + /** + * Returns the value associated with the given config option as an integer. + * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public int getInteger(ConfigOption<Integer> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return convertToInt(o, configOption.defaultValue()); } /** @@ -198,6 +231,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } /** + * Adds the given value to the configuration object. + * The main key of the config option will be used to map the value. + * + * @param key + * the option specifying the key to be added + * @param value + * the value of the key/value pair to be added + */ + @PublicEvolving + public void setInteger(ConfigOption<Integer> key, int value) { + setValueInternal(key.key(), value); + } + + /** * Returns the value associated with the given key as a long. 
* * @param key @@ -211,22 +258,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters if (o == null) { return defaultValue; } - - if (o.getClass() == Long.class) { - return (Long) o; - } - else if (o.getClass() == Integer.class) { - return ((Integer) o).longValue(); - } - else { - try { - return Long.parseLong(o.toString()); - } - catch (NumberFormatException e) { - LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number"); - return defaultValue; - } - } + + return convertToLong(o, defaultValue); + } + + /** + * Returns the value associated with the given config option as a long integer. + * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public long getLong(ConfigOption<Long> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return convertToLong(o, configOption.defaultValue()); } /** @@ -242,6 +287,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } /** + * Adds the given value to the configuration object. + * The main key of the config option will be used to map the value. + * + * @param key + * the option specifying the key to be added + * @param value + * the value of the key/value pair to be added + */ + @PublicEvolving + public void setLong(ConfigOption<Long> key, long value) { + setValueInternal(key.key(), value); + } + + /** * Returns the value associated with the given key as a boolean. * * @param key @@ -255,13 +314,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters if (o == null) { return defaultValue; } - - if (o.getClass() == Boolean.class) { - return (Boolean) o; - } - else { - return Boolean.parseBoolean(o.toString()); - } + + return convertToBoolean(o); + } + + /** + * Returns the value associated with the given config option as a boolean. 
+ * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public boolean getBoolean(ConfigOption<Boolean> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return convertToBoolean(o); } /** @@ -277,6 +343,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters } /** + * Adds the given value to the configuration object. + * The main key of the config option will be used to map the value. + * + * @param key + * the option specifying the key to be added + * @param value + * the value of the key/value pair to be added + */ + @PublicEvolving + public void setBoolean(ConfigOption<Boolean> key, boolean value) { + setValueInternal(key.key(), value); + } + + /** * Returns the value associated with the given key as a float. * * @param key @@ -290,28 +370,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters if (o == null) { return defaultValue; } - - if (o.getClass() == Float.class) { - return (Float) o; - } - else if (o.getClass() == Double.class) { - double value = ((Double) o); - if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) { - return (float) value; - } else { - LOG.warn("Configuration value {} overflows/underflows the float type.", value); - return defaultValue; - } - } - else { - try { - return Float.parseFloat(o.toString()); - } - catch (NumberFormatException e) { - LOG.warn("Configuration cannot evaluate value {} as a float value", o); - return defaultValue; - } - } + + return convertToFloat(o, defaultValue); + } + + /** + * Returns the value associated with the given config option as a float. 
+ * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public float getFloat(ConfigOption<Float> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return convertToFloat(o, configOption.defaultValue()); } /** @@ -325,7 +397,21 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters public void setFloat(String key, float value) { setValueInternal(key, value); } - + + /** + * Adds the given value to the configuration object. + * The main key of the config option will be used to map the value. + * + * @param key + * the option specifying the key to be added + * @param value + * the value of the key/value pair to be added + */ + @PublicEvolving + public void setFloat(ConfigOption<Float> key, float value) { + setValueInternal(key.key(), value); + } + /** * Returns the value associated with the given key as a double. * @@ -340,22 +426,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters if (o == null) { return defaultValue; } - - if (o.getClass() == Double.class) { - return (Double) o; - } - else if (o.getClass() == Float.class) { - return ((Float) o).doubleValue(); - } - else { - try { - return Double.parseDouble(o.toString()); - } - catch (NumberFormatException e) { - LOG.warn("Configuration cannot evaluate value {} as a double value", o); - return defaultValue; - } - } + + return convertToDouble(o, defaultValue); + } + + /** + * Returns the value associated with the given config option as a {@code double}. 
+ * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public double getDouble(ConfigOption<Double> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return convertToDouble(o, configOption.defaultValue()); } /** @@ -369,7 +453,21 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters public void setDouble(String key, double value) { setValueInternal(key, value); } - + + /** + * Adds the given value to the configuration object. + * The main key of the config option will be used to map the value. + * + * @param key + * the option specifying the key to be added + * @param value + * the value of the key/value pair to be added + */ + @PublicEvolving + public void setDouble(ConfigOption<Double> key, double value) { + setValueInternal(key.key(), value); + } + /** * Returns the value associated with the given key as a byte array. * @@ -407,6 +505,18 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters setValueInternal(key, bytes); } + /** + * Returns the value associated with the given config option as a string. + * + * @param configOption The configuration option + * @return the (default) value associated with the given config option + */ + @PublicEvolving + public String getValue(ConfigOption<?> configOption) { + Object o = getValueOrDefaultFromOption(configOption); + return o == null ? 
null : o.toString(); + } + // -------------------------------------------------------------------------------------------- /** @@ -523,7 +633,130 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters return this.confData.get(key); } } - + + private Object getRawValueFromOption(ConfigOption<?> configOption) { + // first try the current key + Object o = getRawValue(configOption.key()); + + if (o != null) { + return o; + } + else if (configOption.hasDeprecatedKeys()) { + for (String deprecatedKey : configOption.deprecatedKeys()) { + Object oo = getRawValue(deprecatedKey); + if (oo != null) { + return oo; + } + } + } + + return null; + } + + private Object getValueOrDefaultFromOption(ConfigOption<?> configOption) { + Object o = getRawValueFromOption(configOption); + return o != null ? o : configOption.defaultValue(); + } + + // -------------------------------------------------------------------------------------------- + // Type conversion + // -------------------------------------------------------------------------------------------- + + private int convertToInt(Object o, int defaultValue) { + if (o.getClass() == Integer.class) { + return (Integer) o; + } + else if (o.getClass() == Long.class) { + long value = (Long) o; + if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) { + return (int) value; + } else { + LOG.warn("Configuration value {} overflows/underflows the integer type.", value); + return defaultValue; + } + } + else { + try { + return Integer.parseInt(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value {} as an integer number", o); + return defaultValue; + } + } + } + + private long convertToLong(Object o, long defaultValue) { + if (o.getClass() == Long.class) { + return (Long) o; + } + else if (o.getClass() == Integer.class) { + return ((Integer) o).longValue(); + } + else { + try { + return Long.parseLong(o.toString()); + } + catch (NumberFormatException e) { + 
LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number"); + return defaultValue; + } + } + } + + private boolean convertToBoolean(Object o) { + if (o.getClass() == Boolean.class) { + return (Boolean) o; + } + else { + return Boolean.parseBoolean(o.toString()); + } + } + + private float convertToFloat(Object o, float defaultValue) { + if (o.getClass() == Float.class) { + return (Float) o; + } + else if (o.getClass() == Double.class) { + double value = ((Double) o); + if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) { + return (float) value; + } else { + LOG.warn("Configuration value {} overflows/underflows the float type.", value); + return defaultValue; + } + } + else { + try { + return Float.parseFloat(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value {} as a float value", o); + return defaultValue; + } + } + } + + private double convertToDouble(Object o, double defaultValue) { + if (o.getClass() == Double.class) { + return (Double) o; + } + else if (o.getClass() == Float.class) { + return ((Float) o).doubleValue(); + } + else { + try { + return Double.parseDouble(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value {} as a double value", o); + return defaultValue; + } + } + } + + // -------------------------------------------------------------------------------------------- + // Serialization // -------------------------------------------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java index dba77f3..bd9a962 100644 --- 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.configuration; import org.apache.flink.core.memory.DataInputView; @@ -22,7 +23,11 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -55,8 +60,7 @@ public final class DelegatingConfiguration extends Configuration { * @param backingConfig The configuration holding the actual config data. * @param prefix The prefix prepended to all config keys. */ - public DelegatingConfiguration(Configuration backingConfig, String prefix) - { + public DelegatingConfiguration(Configuration backingConfig, String prefix) { this.backingConfig = Preconditions.checkNotNull(backingConfig); this.prefix = prefix; } @@ -69,11 +73,26 @@ public final class DelegatingConfiguration extends Configuration { } @Override + public String getString(ConfigOption<String> configOption) { + return this.backingConfig.getString(prefixOption(configOption, prefix)); + } + + @Override + public String getString(ConfigOption<String> configOption, String overrideDefault) { + return this.backingConfig.getString(prefixOption(configOption, prefix), overrideDefault); + } + + @Override public void setString(String key, String value) { this.backingConfig.setString(this.prefix + key, value); } @Override + public void setString(ConfigOption<String> key, String value) { + this.backingConfig.setString(prefix + key.key(), value); + } + + @Override public <T> Class<T> getClass(String key, Class<? 
extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException { return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader); } @@ -89,51 +108,101 @@ public final class DelegatingConfiguration extends Configuration { } @Override + public int getInteger(ConfigOption<Integer> configOption) { + return this.backingConfig.getInteger(prefixOption(configOption, prefix)); + } + + @Override public void setInteger(String key, int value) { this.backingConfig.setInteger(this.prefix + key, value); } @Override + public void setInteger(ConfigOption<Integer> key, int value) { + this.backingConfig.setInteger(prefix + key.key(), value); + } + + @Override public long getLong(String key, long defaultValue) { return this.backingConfig.getLong(this.prefix + key, defaultValue); } @Override + public long getLong(ConfigOption<Long> configOption) { + return this.backingConfig.getLong(prefixOption(configOption, prefix)); + } + + @Override public void setLong(String key, long value) { this.backingConfig.setLong(this.prefix + key, value); } @Override + public void setLong(ConfigOption<Long> key, long value) { + this.backingConfig.setLong(prefix + key.key(), value); + } + + @Override public boolean getBoolean(String key, boolean defaultValue) { return this.backingConfig.getBoolean(this.prefix + key, defaultValue); } @Override + public boolean getBoolean(ConfigOption<Boolean> configOption) { + return this.backingConfig.getBoolean(prefixOption(configOption, prefix)); + } + + @Override public void setBoolean(String key, boolean value) { this.backingConfig.setBoolean(this.prefix + key, value); } @Override + public void setBoolean(ConfigOption<Boolean> key, boolean value) { + this.backingConfig.setBoolean(prefix + key.key(), value); + } + + @Override public float getFloat(String key, float defaultValue) { return this.backingConfig.getFloat(this.prefix + key, defaultValue); } @Override + public float getFloat(ConfigOption<Float> configOption) { + return 
this.backingConfig.getFloat(prefixOption(configOption, prefix)); + } + + @Override public void setFloat(String key, float value) { this.backingConfig.setFloat(this.prefix + key, value); } @Override + public void setFloat(ConfigOption<Float> key, float value) { + this.backingConfig.setFloat(prefix + key.key(), value); + } + + @Override public double getDouble(String key, double defaultValue) { return this.backingConfig.getDouble(this.prefix + key, defaultValue); } @Override + public double getDouble(ConfigOption<Double> configOption) { + return this.backingConfig.getDouble(prefixOption(configOption, prefix)); + } + + @Override public void setDouble(String key, double value) { this.backingConfig.setDouble(this.prefix + key, value); } @Override + public void setDouble(ConfigOption<Double> key, double value) { + this.backingConfig.setDouble(prefix + key.key(), value); + } + + @Override public byte[] getBytes(final String key, final byte[] defaultValue) { return this.backingConfig.getBytes(this.prefix + key, defaultValue); } @@ -144,6 +213,11 @@ public final class DelegatingConfiguration extends Configuration { } @Override + public String getValue(ConfigOption<?> configOption) { + return this.backingConfig.getValue(prefixOption(configOption, prefix)); + } + + @Override public void addAllToProperties(Properties props) { // only add keys with our prefix synchronized (backingConfig.confData) { @@ -195,6 +269,27 @@ public final class DelegatingConfiguration extends Configuration { return set; } + @Override + public Configuration clone() { + return new DelegatingConfiguration(backingConfig.clone(), prefix); + } + + @Override + public Map<String, String> toMap() { + Map<String, String> map = backingConfig.toMap(); + Map<String, String> prefixed = new HashMap<>(map.size()); + for (Map.Entry<String, String> entry : map.entrySet()) { + prefixed.put(prefix + entry.getKey(), entry.getValue()); + } + + return prefixed; + } + + @Override + public boolean containsKey(String key) { + 
return backingConfig.containsKey(prefix + key); + } + // -------------------------------------------------------------------------------------------- @Override @@ -225,4 +320,23 @@ public final class DelegatingConfiguration extends Configuration { return false; } } + + // -------------------------------------------------------------------------------------------- + + private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) { + String key = prefix + option.key(); + + List<String> deprecatedKeys; + if (option.hasDeprecatedKeys()) { + deprecatedKeys = new ArrayList<>(); + for (String dk : option.deprecatedKeys()) { + deprecatedKeys.add(prefix + dk); + } + } else { + deprecatedKeys = Collections.emptyList(); + } + + String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]); + return new ConfigOption<T>(key, option.defaultValue(), deprecated); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index cf3c908..91c5f65 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.configuration; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,7 +34,7 @@ import org.junit.Test; * objects is tested. 
*/ public class ConfigurationTest extends TestLogger { - + private static final byte[] EMPTY_BYTES = new byte[0]; private static final long TOO_LONG = Integer.MAX_VALUE + 10L; private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE; @@ -73,7 +74,7 @@ public class ConfigurationTest extends TestLogger { fail(e.getMessage()); } } - + @Test public void testConversions() { try { @@ -175,7 +176,7 @@ public class ConfigurationTest extends TestLogger { fail(e.getMessage()); } } - + @Test public void testCopyConstructor() { try { @@ -194,4 +195,92 @@ public class ConfigurationTest extends TestLogger { fail(e.getMessage()); } } + + @Test + public void testOptionWithDefault() { + Configuration cfg = new Configuration(); + cfg.setInteger("int-key", 11); + cfg.setString("string-key", "abc"); + + ConfigOption<String> presentStringOption = ConfigOptions.key("string-key").defaultValue("my-beautiful-default"); + ConfigOption<Integer> presentIntOption = ConfigOptions.key("int-key").defaultValue(87); + + assertEquals("abc", cfg.getString(presentStringOption)); + assertEquals("abc", cfg.getValue(presentStringOption)); + + assertEquals(11, cfg.getInteger(presentIntOption)); + assertEquals("11", cfg.getValue(presentIntOption)); + + // test getting default when no value is present + + ConfigOption<String> stringOption = ConfigOptions.key("test").defaultValue("my-beautiful-default"); + ConfigOption<Integer> intOption = ConfigOptions.key("test2").defaultValue(87); + + // getting strings with default value should work + assertEquals("my-beautiful-default", cfg.getValue(stringOption)); + assertEquals("my-beautiful-default", cfg.getString(stringOption)); + + // overriding the default should work + assertEquals("override", cfg.getString(stringOption, "override")); + + // getting a primitive with a default value should work + assertEquals(87, cfg.getInteger(intOption)); + assertEquals("87", cfg.getValue(intOption)); + } + + @Test + public void testOptionWithNoDefault() { + Configuration 
cfg = new Configuration(); + cfg.setInteger("int-key", 11); + cfg.setString("string-key", "abc"); + + ConfigOption<String> presentStringOption = ConfigOptions.key("string-key").noDefaultValue(); + + assertEquals("abc", cfg.getString(presentStringOption)); + assertEquals("abc", cfg.getValue(presentStringOption)); + + // test getting default when no value is present + + ConfigOption<String> stringOption = ConfigOptions.key("test").noDefaultValue(); + + // getting strings for null should work + assertNull(cfg.getValue(stringOption)); + assertNull(cfg.getString(stringOption)); + + // overriding the null default should work + assertEquals("override", cfg.getString(stringOption, "override")); + } + + @Test + public void testDeprecatedKeys() { + Configuration cfg = new Configuration(); + cfg.setInteger("the-key", 11); + cfg.setInteger("old-key", 12); + cfg.setInteger("older-key", 13); + + ConfigOption<Integer> matchesFirst = ConfigOptions + .key("the-key") + .defaultValue(-1) + .withDeprecatedKeys("old-key", "older-key"); + + ConfigOption<Integer> matchesSecond = ConfigOptions + .key("does-not-exist") + .defaultValue(-1) + .withDeprecatedKeys("old-key", "older-key"); + + ConfigOption<Integer> matchesThird = ConfigOptions + .key("does-not-exist") + .defaultValue(-1) + .withDeprecatedKeys("foo", "older-key"); + + ConfigOption<Integer> notContained = ConfigOptions + .key("does-not-exist") + .defaultValue(-1) + .withDeprecatedKeys("not-there", "also-not-there"); + + assertEquals(11, cfg.getInteger(matchesFirst)); + assertEquals(12, cfg.getInteger(matchesSecond)); + assertEquals(13, cfg.getInteger(matchesThird)); + assertEquals(-1, cfg.getInteger(notContained)); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java index d8b782d..9298a14 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java @@ -24,8 +24,6 @@ import org.junit.Test; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.Comparator; import java.util.Set; import static org.junit.Assert.assertTrue; @@ -34,60 +32,43 @@ import static org.junit.Assert.assertEquals; public class DelegatingConfigurationTest { - /** - * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated - */ @Test public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { - Comparator<Method> methodComparator = new Comparator<Method>() { - @Override - public int compare(Method o1, Method o2) { - String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes()); - String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes()); - return o1Str.compareTo( o2Str ); - } - - private String typeParamToString(Class<?>[] classes) { - String str = ""; - for(Object t : classes) { - str += t.toString(); - } - return str; - } - }; - // For each method in the Configuration class... 
Method[] confMethods = Configuration.class.getDeclaredMethods(); Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods(); - Arrays.sort(confMethods, methodComparator); - Arrays.sort(delegateMethods, methodComparator); - match : for (Method configurationMethod : confMethods) { - boolean hasMethod = false; - if(!Modifier.isPublic(configurationMethod.getModifiers()) ) { + + for (Method configurationMethod : confMethods) { + if (!Modifier.isPublic(configurationMethod.getModifiers()) ) { continue; } + + boolean hasMethod = false; + // Find matching method in wrapper class and call it - mismatch: for (Method wrapperMethod : delegateMethods) { + lookForWrapper: for (Method wrapperMethod : delegateMethods) { if (configurationMethod.getName().equals(wrapperMethod.getName())) { - + // Get parameters for method Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes(); Class<?>[] configMethodParams = configurationMethod.getParameterTypes(); - if(wrapperMethodParams.length != configMethodParams.length) { - System.err.println("Length"); - break mismatch; + if (wrapperMethodParams.length != configMethodParams.length) { + continue; } - for(int i = 0; i < wrapperMethodParams.length; i++) { - if(wrapperMethodParams[i] != configMethodParams[i]) { - break mismatch; + + for (int i = 0; i < wrapperMethodParams.length; i++) { + if (wrapperMethodParams[i] != configMethodParams[i]) { + continue lookForWrapper; } } hasMethod = true; - break match; + break; } } - assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod); + + assertTrue("Configuration method '" + configurationMethod.getName() + + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java index 386d03b..26e3d7a 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java @@ -56,6 +56,9 @@ public class UnmodifiableConfigurationTest extends TestLogger { @Test public void testExceptionOnSet() { try { + @SuppressWarnings("rawtypes") + final ConfigOption rawOption = ConfigOptions.key("testkey").defaultValue("value"); + Map<Class<?>, Object> parameters = new HashMap<Class<?>, Object>(); parameters.put(byte[].class, new byte[0]); parameters.put(Class.class, Object.class); @@ -65,19 +68,22 @@ public class UnmodifiableConfigurationTest extends TestLogger { parameters.put(double.class, 0.0); parameters.put(String.class, ""); parameters.put(boolean.class, false); - + Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class; UnmodifiableConfiguration config = new UnmodifiableConfiguration(new Configuration()); - + for (Method m : clazz.getMethods()) { if (m.getName().startsWith("set")) { - + + Class<?> keyClass = m.getParameterTypes()[0]; Class<?> parameterClass = m.getParameterTypes()[1]; + Object key = keyClass == String.class ? "key" : rawOption; + Object parameter = parameters.get(parameterClass); assertNotNull("method " + m + " not covered by test", parameter); - + try { - m.invoke(config, "key", parameter); + m.invoke(config, key, parameter); fail("should fail with an exception"); } catch (InvocationTargetException e) {