[FLINK-4764] [core] Introduce Config Options

This is a more concise and maintainable way to define configuration keys, 
default values,
deprecated keys, etc.

This closes #2605


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d71a09cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d71a09cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d71a09cc

Branch: refs/heads/flip-6
Commit: d71a09cc2a36a877e8287db8d9fe84134a4901ba
Parents: 05436f4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 7 15:24:44 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigOption.java       | 171 ++++++++
 .../flink/configuration/ConfigOptions.java      | 116 ++++++
 .../flink/configuration/Configuration.java      | 407 +++++++++++++++----
 .../configuration/DelegatingConfiguration.java  | 118 +++++-
 .../flink/configuration/ConfigurationTest.java  |  95 ++++-
 .../DelegatingConfigurationTest.java            |  55 +--
 .../UnmodifiableConfigurationTest.java          |  16 +-
 7 files changed, 844 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
new file mode 100644
index 0000000..3531f6d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code ConfigOption} describes a configuration parameter. It encapsulates
+ * the configuration key, deprecated older versions of the key, and an optional
+ * default value for the configuration parameter.
+ * 
+ * <p>{@code ConfigOptions} are built via the {@link ConfigOptions} class.
+ * Once created, a config option is immutable.
+ * 
+ * @param <T> The type of value associated with the configuration option.
+ */
+@PublicEvolving
+public class ConfigOption<T> {
+
+       private static final String[] EMPTY = new String[0];
+
+       // 
------------------------------------------------------------------------
+
+       /** The current key for that config option */
+       private final String key;
+
+       /** The list of deprecated keys, in the order to be checked */
+       private final String[] deprecatedKeys;
+
+       /** The default value for this option */
+       private final T defaultValue;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new config option with no deprecated keys.
+        *
+        * @param key             The current key for that config option
+        * @param defaultValue    The default value for this option
+        */
+       ConfigOption(String key, T defaultValue) {
+               this.key = checkNotNull(key);
+               this.defaultValue = defaultValue;
+               this.deprecatedKeys = EMPTY;
+       }
+
+       /**
+        * Creates a new config option with deprecated keys.
+        *
+        * @param key             The current key for that config option
+        * @param defaultValue    The default value for this option
+        * @param deprecatedKeys  The list of deprecated keys, in the order to 
be checked
+        */
+       ConfigOption(String key, T defaultValue, String... deprecatedKeys) {
+               this.key = checkNotNull(key);
+               this.defaultValue = defaultValue;
+               this.deprecatedKeys = deprecatedKeys == null || 
deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new config option, using this option's key and default 
value, and
+        * adding the given deprecated keys.
+        * 
+        * <p>When obtaining a value from the configuration via {@link 
Configuration#getValue(ConfigOption)},
+        * the deprecated keys will be checked in the order provided to this 
method. The first key for which
+        * a value is found will be used - that value will be returned.
+        * 
+        * @param deprecatedKeys The deprecated keys, in the order in which 
they should be checked.
+        * @return A new config option, with the given deprecated keys.
+        */
+       public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
+               return new ConfigOption<>(key, defaultValue, deprecatedKeys);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the configuration key.
+        * @return The configuration key
+        */
+       public String key() {
+               return key;
+       }
+
+       /**
+        * Checks if this option has a default value.
+        * @return True if it has a default value, false if not.
+        */
+       public boolean hasDefaultValue() {
+               return defaultValue != null;
+       }
+
+       /**
+        * Returns the default value, or null, if there is no default value.
+        * @return The default value, or null.
+        */
+       public T defaultValue() {
+               return defaultValue;
+       }
+
+       /**
+        * Checks whether this option has deprecated keys.
+        * @return True if the option has deprecated keys, false if not.
+        */
+       public boolean hasDeprecatedKeys() {
+               return deprecatedKeys != EMPTY;
+       }
+
+       /**
+        * Gets the deprecated keys, in the order to be checked.
+        * @return The option's deprecated keys.
+        */
+       public Iterable<String> deprecatedKeys() {
+               return deprecatedKeys == EMPTY ? 
Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               else if (o != null && o.getClass() == ConfigOption.class) {
+                       ConfigOption<?> that = (ConfigOption<?>) o;
+                       return this.key.equals(that.key) &&
+                                       Arrays.equals(this.deprecatedKeys, 
that.deprecatedKeys) &&
+                                       (this.defaultValue == null ? 
that.defaultValue == null :
+                                                       (that.defaultValue != 
null && this.defaultValue.equals(that.defaultValue)));
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * key.hashCode() +
+                               17 * Arrays.hashCode(deprecatedKeys) +
+                               (defaultValue != null ? defaultValue.hashCode() 
: 0);
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Key: '%s' , default: %s (deprecated keys: 
%s)",
+                               key, defaultValue, 
Arrays.toString(deprecatedKeys));
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
new file mode 100644
index 0000000..f87da0a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code ConfigOptions} are used to build a {@link ConfigOption}.
+ * The option is typically built in one of the following patterns:
+ * 
+ * <pre>{@code
+ * // simple string-valued option with a default value
+ * ConfigOption<String> tempDirs = ConfigOptions
+ *     .key("tmp.dir")
+ *     .defaultValue("/tmp");
+ * 
+ * // simple integer-valued option with a default value
+ * ConfigOption<Integer> parallelism = ConfigOptions
+ *     .key("application.parallelism")
+ *     .defaultValue(100);
+ * 
+ * // option with no default value
+ * ConfigOption<String> userName = ConfigOptions
+ *     .key("user.name")
+ *     .noDefaultValue();
+ * 
+ * // option with deprecated keys to check
+ * ConfigOption<Double> threshold = ConfigOptions
+ *     .key("cpu.utilization.threshold")
+ *     .defaultValue(0.9)
+ *     .withDeprecatedKeys("cpu.threshold");
+ * }</pre>
+ */
+@PublicEvolving
+public class ConfigOptions {
+
+       /**
+        * Starts building a new {@link ConfigOption}.
+        * 
+        * @param key The key for the config option.
+        * @return The builder for the config option with the given key.
+        */
+       public static OptionBuilder key(String key) {
+               checkNotNull(key);
+               return new OptionBuilder(key);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The option builder is used to create a {@link ConfigOption}.
+        * It is instantiated via {@link ConfigOptions#key(String)}.
+        */
+       public static final class OptionBuilder {
+
+               /** The key for the config option */
+               private final String key;
+
+               /**
+                * Creates a new OptionBuilder.
+                * @param key The key for the config option
+                */
+               OptionBuilder(String key) {
+                       this.key = key;
+               }
+
+               /**
+                * Creates a ConfigOption with the given default value.
+                * 
+                * <p>This method does not accept {@code null}. For options with no
+                * default value, use {@link #noDefaultValue()} instead.
+                * 
+                * @param value The default value for the config option
+                * @param <T> The type of the default value.
+                * @return The config option with the default value.
+                */
+               public <T> ConfigOption<T> defaultValue(T value) {
+                       checkNotNull(value);
+                       return new ConfigOption<T>(key, value);
+               }
+
+               /**
+                * Creates a string-valued option with no default value.
+                * String-valued options are the only ones that can have no
+                * default value.
+                * 
+                * @return The created ConfigOption.
+                */
+               public ConfigOption<String> noDefaultValue() {
+                       return new ConfigOption<>(key, null);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /** Not intended to be instantiated */
+       private ConfigOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 8ca5d07..f15c669 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -134,7 +135,33 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                        return o.toString();
                }
        }
-       
+
+       /**
+        * Returns the value associated with the given config option as a 
string.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public String getString(ConfigOption<String> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return o == null ? null : o.toString();
+       }
+
+       /**
+        * Returns the value associated with the given config option as a 
string.
+        * If no value is mapped under any key of the option, it returns the 
specified
+        * default instead of the option's default value.
+        *
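+        * @param configOption The configuration option
+        * @param overrideDefault The value to return if no value is mapped under any key of the option
+        * @return the value associated with the given config option, or {@code overrideDefault} if there is none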
+        */
+       @PublicEvolving
+       public String getString(ConfigOption<String> configOption, String 
overrideDefault) {
+               Object o = getRawValueFromOption(configOption);
+               return o == null ? overrideDefault : o.toString();
+       }
+
        /**
         * Adds the given key/value pair to the configuration object.
         * 
@@ -148,6 +175,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        }
 
        /**
+        * Adds the given value to the configuration object.
+        * The main key of the config option will be used to map the value.
+        *
+        * @param key
+        *        the option specifying the key to be added
+        * @param value
+        *        the value of the key/value pair to be added
+        */
+       @PublicEvolving
+       public void setString(ConfigOption<String> key, String value) {
+               setValueInternal(key.key(), value);
+       }
+
+       /**
         * Returns the value associated with the given key as an integer.
         * 
         * @param key
@@ -161,28 +202,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                if (o == null) {
                        return defaultValue;
                }
-               
-               if (o.getClass() == Integer.class) {
-                       return (Integer) o;
-               }
-               else if (o.getClass() == Long.class) {
-                       long value = (Long) o;
-                       if (value <= Integer.MAX_VALUE && value >= 
Integer.MIN_VALUE) {
-                               return (int) value;
-                       } else {
-                               LOG.warn("Configuration value {} 
overflows/underflows the integer type.", value);
-                               return defaultValue;
-                       }
-               }
-               else {
-                       try {
-                               return Integer.parseInt(o.toString());
-                       }
-                       catch (NumberFormatException e) {
-                               LOG.warn("Configuration cannot evaluate value 
{} as an integer number", o);
-                               return defaultValue;
-                       }
-               }
+
+               return convertToInt(o, defaultValue);
+       }
+
+       /**
+        * Returns the value associated with the given config option as an 
integer.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public int getInteger(ConfigOption<Integer> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return convertToInt(o, configOption.defaultValue());
        }
 
        /**
@@ -198,6 +231,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        }
 
        /**
+        * Adds the given value to the configuration object.
+        * The main key of the config option will be used to map the value.
+        *
+        * @param key
+        *        the option specifying the key to be added
+        * @param value
+        *        the value of the key/value pair to be added
+        */
+       @PublicEvolving
+       public void setInteger(ConfigOption<Integer> key, int value) {
+               setValueInternal(key.key(), value);
+       }
+
+       /**
         * Returns the value associated with the given key as a long.
         * 
         * @param key
@@ -211,22 +258,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                if (o == null) {
                        return defaultValue;
                }
-               
-               if (o.getClass() == Long.class) {
-                       return (Long) o;
-               }
-               else if (o.getClass() == Integer.class) {
-                       return ((Integer) o).longValue();
-               }
-               else {
-                       try {
-                               return Long.parseLong(o.toString());
-                       }
-                       catch (NumberFormatException e) {
-                               LOG.warn("Configuration cannot evaluate value " 
+ o + " as a long integer number");
-                               return defaultValue;
-                       }
-               }
+
+               return convertToLong(o, defaultValue);
+       }
+
+       /**
+        * Returns the value associated with the given config option as a long 
integer.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public long getLong(ConfigOption<Long> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return convertToLong(o, configOption.defaultValue());
        }
 
        /**
@@ -242,6 +287,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        }
 
        /**
+        * Adds the given value to the configuration object.
+        * The main key of the config option will be used to map the value.
+        *
+        * @param key
+        *        the option specifying the key to be added
+        * @param value
+        *        the value of the key/value pair to be added
+        */
+       @PublicEvolving
+       public void setLong(ConfigOption<Long> key, long value) {
+               setValueInternal(key.key(), value);
+       }
+
+       /**
         * Returns the value associated with the given key as a boolean.
         * 
         * @param key
@@ -255,13 +314,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                if (o == null) {
                        return defaultValue;
                }
-               
-               if (o.getClass() == Boolean.class) {
-                       return (Boolean) o;
-               }
-               else {
-                       return Boolean.parseBoolean(o.toString());
-               }
+
+               return convertToBoolean(o);
+       }
+
+       /**
+        * Returns the value associated with the given config option as a 
boolean.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public boolean getBoolean(ConfigOption<Boolean> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return convertToBoolean(o);
        }
 
        /**
@@ -277,6 +343,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        }
 
        /**
+        * Adds the given value to the configuration object.
+        * The main key of the config option will be used to map the value.
+        *
+        * @param key
+        *        the option specifying the key to be added
+        * @param value
+        *        the value of the key/value pair to be added
+        */
+       @PublicEvolving
+       public void setBoolean(ConfigOption<Boolean> key, boolean value) {
+               setValueInternal(key.key(), value);
+       }
+
+       /**
         * Returns the value associated with the given key as a float.
         * 
         * @param key
@@ -290,28 +370,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                if (o == null) {
                        return defaultValue;
                }
-               
-               if (o.getClass() == Float.class) {
-                       return (Float) o;
-               }
-               else if (o.getClass() == Double.class) {
-                       double value = ((Double) o);
-                       if (value <= Float.MAX_VALUE && value >= 
Float.MIN_VALUE) {
-                               return (float) value;
-                       } else {
-                               LOG.warn("Configuration value {} 
overflows/underflows the float type.", value);
-                               return defaultValue;
-                       }
-               }
-               else {
-                       try {
-                               return Float.parseFloat(o.toString());
-                       }
-                       catch (NumberFormatException e) {
-                               LOG.warn("Configuration cannot evaluate value 
{} as a float value", o);
-                               return defaultValue;
-                       }
-               }
+
+               return convertToFloat(o, defaultValue);
+       }
+
+       /**
+        * Returns the value associated with the given config option as a float.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public float getFloat(ConfigOption<Float> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return convertToFloat(o, configOption.defaultValue());
        }
 
        /**
@@ -325,7 +397,21 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        public void setFloat(String key, float value) {
                setValueInternal(key, value);
        }
-       
+
+       /**
+        * Adds the given value to the configuration object.
+        * The main key of the config option will be used to map the value.
+        *
+        * @param key
+        *        the option specifying the key to be added
+        * @param value
+        *        the value of the key/value pair to be added
+        */
+       @PublicEvolving
+       public void setFloat(ConfigOption<Float> key, float value) {
+               setValueInternal(key.key(), value);
+       }
+
        /**
         * Returns the value associated with the given key as a double.
         * 
@@ -340,22 +426,20 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                if (o == null) {
                        return defaultValue;
                }
-               
-               if (o.getClass() == Double.class) {
-                       return (Double) o;
-               }
-               else if (o.getClass() == Float.class) {
-                       return ((Float) o).doubleValue();
-               }
-               else {
-                       try {
-                               return Double.parseDouble(o.toString());
-                       }
-                       catch (NumberFormatException e) {
-                               LOG.warn("Configuration cannot evaluate value 
{} as a double value", o);
-                               return defaultValue;
-                       }
-               }
+
+               return convertToDouble(o, defaultValue);
+       }
+
+       /**
+        * Returns the value associated with the given config option as a 
{@code double}.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public double getDouble(ConfigOption<Double> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return convertToDouble(o, configOption.defaultValue());
        }
 
        /**
@@ -369,7 +453,21 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        public void setDouble(String key, double value) {
                setValueInternal(key, value);
        }
-       
+
+       /**
+        * Adds the given value to the configuration object.
+        * The main key of the config option will be used to map the value.
+        *
+        * @param key
+        *        the option specifying the key to be added
+        * @param value
+        *        the value of the key/value pair to be added
+        */
+       @PublicEvolving
+       public void setDouble(ConfigOption<Double> key, double value) {
+               setValueInternal(key.key(), value);
+       }
+
        /**
         * Returns the value associated with the given key as a byte array.
         * 
@@ -407,6 +505,18 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                setValueInternal(key, bytes);
        }
 
+       /**
+        * Returns the value associated with the given config option as a 
string.
+        *
+        * @param configOption The configuration option
+        * @return the (default) value associated with the given config option
+        */
+       @PublicEvolving
+       public String getValue(ConfigOption<?> configOption) {
+               Object o = getValueOrDefaultFromOption(configOption);
+               return o == null ? null : o.toString();
+       }
+
        // 
--------------------------------------------------------------------------------------------
        
        /**
@@ -523,7 +633,130 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                        return this.confData.get(key);
                }
        }
-       
+
+       private Object getRawValueFromOption(ConfigOption<?> configOption) {
+               // first try the current key
+               Object o = getRawValue(configOption.key());
+
+               if (o != null) {
+                       return o;
+               }
+               else if (configOption.hasDeprecatedKeys()) {
+                       for (String deprecatedKey : 
configOption.deprecatedKeys()) {
+                               Object oo = getRawValue(deprecatedKey);
+                               if (oo != null) {
+                                       return oo;
+                               }
+                       }
+               }
+
+               return null;
+       }
+
+       private Object getValueOrDefaultFromOption(ConfigOption<?> 
configOption) {
+               Object o = getRawValueFromOption(configOption);
+               return o != null ? o : configOption.defaultValue();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Type conversion
+       // 
--------------------------------------------------------------------------------------------
+
+       private int convertToInt(Object o, int defaultValue) {
+               if (o.getClass() == Integer.class) {
+                       return (Integer) o;
+               }
+               else if (o.getClass() == Long.class) {
+                       long value = (Long) o;
+                       if (value <= Integer.MAX_VALUE && value >= 
Integer.MIN_VALUE) {
+                               return (int) value;
+                       } else {
+                               LOG.warn("Configuration value {} 
overflows/underflows the integer type.", value);
+                               return defaultValue;
+                       }
+               }
+               else {
+                       try {
+                               return Integer.parseInt(o.toString());
+                       }
+                       catch (NumberFormatException e) {
+                               LOG.warn("Configuration cannot evaluate value 
{} as an integer number", o);
+                               return defaultValue;
+                       }
+               }
+       }
+
+       private long convertToLong(Object o, long defaultValue) {
+               if (o.getClass() == Long.class) {
+                       return (Long) o;
+               }
+               else if (o.getClass() == Integer.class) {
+                       return ((Integer) o).longValue();
+               }
+               else {
+                       try {
+                               return Long.parseLong(o.toString());
+                       }
+                       catch (NumberFormatException e) {
+                               LOG.warn("Configuration cannot evaluate value " 
+ o + " as a long integer number");
+                               return defaultValue;
+                       }
+               }
+       }
+
+       private boolean convertToBoolean(Object o) {
+               if (o.getClass() == Boolean.class) {
+                       return (Boolean) o;
+               }
+               else {
+                       return Boolean.parseBoolean(o.toString());
+               }
+       }
+
+       /**
+        * Converts a raw configuration value to a {@code float}.
+        *
+        * <p>A {@code Float} is returned as is. A {@code Double} is narrowed only if its
+        * magnitude is representable as a float (or it is zero); otherwise a warning is
+        * logged and the default is returned. Any other object is parsed from its string
+        * representation, falling back to the default (with a warning) if parsing fails.
+        *
+        * @param o            The raw value; must not be null, since its class is inspected
+        * @param defaultValue The value to return if the raw value cannot be converted
+        * @return The converted value, or the given default value
+        */
+       private float convertToFloat(Object o, float defaultValue) {
+               if (o.getClass() == Float.class) {
+                       return (Float) o;
+               }
+               else if (o.getClass() == Double.class) {
+                       double value = ((Double) o);
+                       // Float.MIN_VALUE is the smallest positive float, not the most negative one,
+                       // so the representable range must be checked separately for each sign.
+                       // Zero is always representable.
+                       if (value == 0.0
+                                       || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
+                                       || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
+                               return (float) value;
+                       } else {
+                               LOG.warn("Configuration value {} overflows/underflows the float type.", value);
+                               return defaultValue;
+                       }
+               }
+               else {
+                       try {
+                               return Float.parseFloat(o.toString());
+                       }
+                       catch (NumberFormatException e) {
+                               LOG.warn("Configuration cannot evaluate value {} as a float value", o);
+                               return defaultValue;
+                       }
+               }
+       }
+
+       private double convertToDouble(Object o, double defaultValue) {
+               if (o.getClass() == Double.class) {
+                       return (Double) o;
+               }
+               else if (o.getClass() == Float.class) {
+                       return ((Float) o).doubleValue();
+               }
+               else {
+                       try {
+                               return Double.parseDouble(o.toString());
+                       }
+                       catch (NumberFormatException e) {
+                               LOG.warn("Configuration cannot evaluate value 
{} as a double value", o);
+                               return defaultValue;
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Serialization
        // 
--------------------------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index dba77f3..bd9a962 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
 import org.apache.flink.core.memory.DataInputView;
@@ -22,7 +23,11 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -55,8 +60,7 @@ public final class DelegatingConfiguration extends Configuration {
         * @param backingConfig The configuration holding the actual config 
data.
         * @param prefix The prefix prepended to all config keys.
         */
-       public DelegatingConfiguration(Configuration backingConfig, String 
prefix)
-       {
+       public DelegatingConfiguration(Configuration backingConfig, String 
prefix) {
                this.backingConfig = Preconditions.checkNotNull(backingConfig);
                this.prefix = prefix;
        }
@@ -69,11 +73,26 @@ public final class DelegatingConfiguration extends Configuration {
        }
 
        @Override
+       public String getString(ConfigOption<String> configOption) {
+               return  this.backingConfig.getString(prefixOption(configOption, 
prefix));
+       }
+
+       @Override
+       public String getString(ConfigOption<String> configOption, String 
overrideDefault) {
+               return  this.backingConfig.getString(prefixOption(configOption, 
prefix), overrideDefault);
+       }
+
+       @Override
        public void setString(String key, String value) {
                this.backingConfig.setString(this.prefix + key, value);
        }
 
        @Override
+       public void setString(ConfigOption<String> key, String value) {
+               this.backingConfig.setString(prefix + key.key(), value);
+       }
+
+       @Override
        public <T> Class<T> getClass(String key, Class<? extends T> 
defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
                return this.backingConfig.getClass(this.prefix + key, 
defaultValue, classLoader);
        }
@@ -89,51 +108,101 @@ public final class DelegatingConfiguration extends Configuration {
        }
 
        @Override
+       public int getInteger(ConfigOption<Integer> configOption) {
+               return  
this.backingConfig.getInteger(prefixOption(configOption, prefix));
+       }
+
+       @Override
        public void setInteger(String key, int value) {
                this.backingConfig.setInteger(this.prefix + key, value);
        }
 
        @Override
+       public void setInteger(ConfigOption<Integer> key, int value) {
+               this.backingConfig.setInteger(prefix + key.key(), value);
+       }
+
+       @Override
        public long getLong(String key, long defaultValue) {
                return this.backingConfig.getLong(this.prefix + key, 
defaultValue);
        }
 
        @Override
+       public long getLong(ConfigOption<Long> configOption) {
+               return  this.backingConfig.getLong(prefixOption(configOption, 
prefix));
+       }
+
+       @Override
        public void setLong(String key, long value) {
                this.backingConfig.setLong(this.prefix + key, value);
        }
 
        @Override
+       public void setLong(ConfigOption<Long> key, long value) {
+               this.backingConfig.setLong(prefix + key.key(), value);
+       }
+
+       @Override
        public boolean getBoolean(String key, boolean defaultValue) {
                return this.backingConfig.getBoolean(this.prefix + key, 
defaultValue);
        }
 
        @Override
+       public boolean getBoolean(ConfigOption<Boolean> configOption) {
+               return  
this.backingConfig.getBoolean(prefixOption(configOption, prefix));
+       }
+
+       @Override
        public void setBoolean(String key, boolean value) {
                this.backingConfig.setBoolean(this.prefix + key, value);
        }
 
        @Override
+       public void setBoolean(ConfigOption<Boolean> key, boolean value) {
+               this.backingConfig.setBoolean(prefix + key.key(), value);
+       }
+
+       @Override
        public float getFloat(String key, float defaultValue) {
                return this.backingConfig.getFloat(this.prefix + key, 
defaultValue);
        }
 
        @Override
+       public float getFloat(ConfigOption<Float> configOption) {
+               return this.backingConfig.getFloat(prefixOption(configOption, 
prefix));
+       }
+
+       @Override
        public void setFloat(String key, float value) {
                this.backingConfig.setFloat(this.prefix + key, value);
        }
 
        @Override
+       public void setFloat(ConfigOption<Float> key, float value) {
+               this.backingConfig.setFloat(prefix + key.key(), value);
+       }
+
+       @Override
        public double getDouble(String key, double defaultValue) {
                return this.backingConfig.getDouble(this.prefix + key, 
defaultValue);
        }
 
        @Override
+       public double getDouble(ConfigOption<Double> configOption) {
+               return this.backingConfig.getDouble(prefixOption(configOption, 
prefix));
+       }
+
+       @Override
        public void setDouble(String key, double value) {
                this.backingConfig.setDouble(this.prefix + key, value);
        }
 
        @Override
+       public void setDouble(ConfigOption<Double> key, double value) {
+               this.backingConfig.setDouble(prefix + key.key(), value);
+       }
+
+       @Override
        public byte[] getBytes(final String key, final byte[] defaultValue) {
                return this.backingConfig.getBytes(this.prefix + key, 
defaultValue);
        }
@@ -144,6 +213,11 @@ public final class DelegatingConfiguration extends Configuration {
        }
 
        @Override
+       public String getValue(ConfigOption<?> configOption) {
+               return this.backingConfig.getValue(prefixOption(configOption, 
prefix));
+       }
+
+       @Override
        public void addAllToProperties(Properties props) {
                // only add keys with our prefix
                synchronized (backingConfig.confData) {
@@ -195,6 +269,27 @@ public final class DelegatingConfiguration extends Configuration {
                return set;
        }
 
+       @Override
+       public Configuration clone() {
+               return new DelegatingConfiguration(backingConfig.clone(), 
prefix);
+       }
+
+       @Override
+       public Map<String, String> toMap() {
+               Map<String, String> map = backingConfig.toMap();
+               Map<String, String> prefixed = new HashMap<>(map.size());
+               for (Map.Entry<String, String> entry : map.entrySet()) {
+                       prefixed.put(prefix + entry.getKey(), entry.getValue());
+               }
+
+               return prefixed; 
+       }
+
+       @Override
+       public boolean containsKey(String key) {
+               return backingConfig.containsKey(prefix + key);
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        @Override
@@ -225,4 +320,23 @@ public final class DelegatingConfiguration extends Configuration {
                        return false;
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, 
String prefix) {
+               String key = prefix + option.key();
+
+               List<String> deprecatedKeys;
+               if (option.hasDeprecatedKeys()) {
+                       deprecatedKeys = new ArrayList<>();
+                       for (String dk : option.deprecatedKeys()) {
+                               deprecatedKeys.add(prefix + dk);
+                       }
+               } else {
+                       deprecatedKeys = Collections.emptyList();
+               }
+
+               String[] deprecated = deprecatedKeys.toArray(new 
String[deprecatedKeys.size()]);
+               return new ConfigOption<T>(key, option.defaultValue(), 
deprecated);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index cf3c908..91c5f65 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.configuration;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,7 +34,7 @@ import org.junit.Test;
  * objects is tested.
  */
 public class ConfigurationTest extends TestLogger {
-       
+
        private static final byte[] EMPTY_BYTES = new byte[0];
        private static final long TOO_LONG = Integer.MAX_VALUE + 10L;
        private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE;
@@ -73,7 +74,7 @@ public class ConfigurationTest extends TestLogger {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testConversions() {
                try {
@@ -175,7 +176,7 @@ public class ConfigurationTest extends TestLogger {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testCopyConstructor() {
                try {
@@ -194,4 +195,92 @@ public class ConfigurationTest extends TestLogger {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testOptionWithDefault() {
+               Configuration cfg = new Configuration();
+               cfg.setInteger("int-key", 11);
+               cfg.setString("string-key", "abc");
+
+               ConfigOption<String> presentStringOption = 
ConfigOptions.key("string-key").defaultValue("my-beautiful-default");
+               ConfigOption<Integer> presentIntOption = 
ConfigOptions.key("int-key").defaultValue(87);
+
+               assertEquals("abc", cfg.getString(presentStringOption));
+               assertEquals("abc", cfg.getValue(presentStringOption));
+
+               assertEquals(11, cfg.getInteger(presentIntOption));
+               assertEquals("11", cfg.getValue(presentIntOption));
+
+               // test getting default when no value is present
+
+               ConfigOption<String> stringOption = 
ConfigOptions.key("test").defaultValue("my-beautiful-default");
+               ConfigOption<Integer> intOption = 
ConfigOptions.key("test2").defaultValue(87);
+
+               // getting strings with default value should work
+               assertEquals("my-beautiful-default", 
cfg.getValue(stringOption));
+               assertEquals("my-beautiful-default", 
cfg.getString(stringOption));
+
+               // overriding the default should work
+               assertEquals("override", cfg.getString(stringOption, 
"override"));
+
+               // getting a primitive with a default value should work
+               assertEquals(87, cfg.getInteger(intOption));
+               assertEquals("87", cfg.getValue(intOption));
+       }
+
+       @Test
+       public void testOptionWithNoDefault() {
+               Configuration cfg = new Configuration();
+               cfg.setInteger("int-key", 11);
+               cfg.setString("string-key", "abc");
+
+               ConfigOption<String> presentStringOption = 
ConfigOptions.key("string-key").noDefaultValue();
+
+               assertEquals("abc", cfg.getString(presentStringOption));
+               assertEquals("abc", cfg.getValue(presentStringOption));
+
+               // test getting default when no value is present
+
+               ConfigOption<String> stringOption = 
ConfigOptions.key("test").noDefaultValue();
+
+               // getting strings for null should work
+               assertNull(cfg.getValue(stringOption));
+               assertNull(cfg.getString(stringOption));
+
+               // overriding the null default should work
+               assertEquals("override", cfg.getString(stringOption, 
"override"));
+       }
+
+       @Test
+       public void testDeprecatedKeys() {
+               Configuration cfg = new Configuration();
+               cfg.setInteger("the-key", 11);
+               cfg.setInteger("old-key", 12);
+               cfg.setInteger("older-key", 13);
+
+               ConfigOption<Integer> matchesFirst = ConfigOptions
+                               .key("the-key")
+                               .defaultValue(-1)
+                               .withDeprecatedKeys("old-key", "older-key");
+
+               ConfigOption<Integer> matchesSecond = ConfigOptions
+                               .key("does-not-exist")
+                               .defaultValue(-1)
+                               .withDeprecatedKeys("old-key", "older-key");
+
+               ConfigOption<Integer> matchesThird = ConfigOptions
+                               .key("does-not-exist")
+                               .defaultValue(-1)
+                               .withDeprecatedKeys("foo", "older-key");
+
+               ConfigOption<Integer> notContained = ConfigOptions
+                               .key("does-not-exist")
+                               .defaultValue(-1)
+                               .withDeprecatedKeys("not-there", 
"also-not-there");
+
+               assertEquals(11, cfg.getInteger(matchesFirst));
+               assertEquals(12, cfg.getInteger(matchesSecond));
+               assertEquals(13, cfg.getInteger(matchesThird));
+               assertEquals(-1, cfg.getInteger(notContained));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index d8b782d..9298a14 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -24,8 +24,6 @@ import org.junit.Test;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Set;
 
 import static org.junit.Assert.assertTrue;
@@ -34,60 +32,43 @@ import static org.junit.Assert.assertEquals;
 
 public class DelegatingConfigurationTest {
 
-       /**
-        * 
http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
-        */
        @Test
        public void testIfDelegatesImplementAllMethods() throws 
IllegalAccessException, IllegalArgumentException, InvocationTargetException {
 
-               Comparator<Method> methodComparator = new Comparator<Method>() {
-                       @Override
-                       public int compare(Method o1, Method o2) {
-                               String o1Str = o1.getName() + 
typeParamToString(o1.getParameterTypes());
-                               String o2Str = o2.getName() + 
typeParamToString(o2.getParameterTypes());
-                               return o1Str.compareTo( o2Str ); 
-                       }
-
-                       private String typeParamToString(Class<?>[] classes) {
-                               String str = "";
-                               for(Object t : classes) {
-                                       str += t.toString();
-                               }
-                               return str;
-                       }
-               };
-               
                // For each method in the Configuration class...
                Method[] confMethods = Configuration.class.getDeclaredMethods();
                Method[] delegateMethods = 
DelegatingConfiguration.class.getDeclaredMethods();
-               Arrays.sort(confMethods, methodComparator);
-               Arrays.sort(delegateMethods, methodComparator);
-               match : for (Method configurationMethod : confMethods) {
-                       boolean hasMethod = false;
-                       
if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
+
+               for (Method configurationMethod : confMethods) {
+                       if 
(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
                                continue;
                        }
+
+                       boolean hasMethod = false;
+
                        // Find matching method in wrapper class and call it
-                       mismatch: for (Method wrapperMethod : delegateMethods) {
+                       lookForWrapper: for (Method wrapperMethod : 
delegateMethods) {
                                if 
(configurationMethod.getName().equals(wrapperMethod.getName())) {
-                                       
+
                                        // Get parameters for method
                                        Class<?>[] wrapperMethodParams = 
wrapperMethod.getParameterTypes();
                                        Class<?>[] configMethodParams = 
configurationMethod.getParameterTypes();
-                                       if(wrapperMethodParams.length != 
configMethodParams.length) {
-                                               System.err.println("Length");
-                                               break mismatch;
+                                       if (wrapperMethodParams.length != 
configMethodParams.length) {
+                                               continue;
                                        }
-                                       for(int i = 0; i < 
wrapperMethodParams.length; i++) {
-                                               if(wrapperMethodParams[i] != 
configMethodParams[i]) {
-                                                       break mismatch;
+
+                                       for (int i = 0; i < 
wrapperMethodParams.length; i++) {
+                                               if (wrapperMethodParams[i] != 
configMethodParams[i]) {
+                                                       continue lookForWrapper;
                                                }
                                        }
                                        hasMethod = true;
-                                       break match;
+                                       break;
                                }
                        }
-                       assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
+
+                       assertTrue("Configuration method '" + 
configurationMethod.getName() + 
+                                       "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
index 386d03b..26e3d7a 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -56,6 +56,9 @@ public class UnmodifiableConfigurationTest extends TestLogger {
        @Test
        public void testExceptionOnSet() {
                try {
+                       @SuppressWarnings("rawtypes")
+                       final ConfigOption rawOption = 
ConfigOptions.key("testkey").defaultValue("value");
+
                        Map<Class<?>, Object> parameters = new 
HashMap<Class<?>, Object>();
                        parameters.put(byte[].class, new byte[0]);
                        parameters.put(Class.class, Object.class);
@@ -65,19 +68,22 @@ public class UnmodifiableConfigurationTest extends TestLogger {
                        parameters.put(double.class, 0.0);
                        parameters.put(String.class, "");
                        parameters.put(boolean.class, false);
-                                       
+
                        Class<UnmodifiableConfiguration> clazz = 
UnmodifiableConfiguration.class;
                        UnmodifiableConfiguration config = new 
UnmodifiableConfiguration(new Configuration());
-                       
+
                        for (Method m : clazz.getMethods()) {
                                if (m.getName().startsWith("set")) {
-                                       
+
+                                       Class<?> keyClass = 
m.getParameterTypes()[0];
                                        Class<?> parameterClass = 
m.getParameterTypes()[1];
+                                       Object key = keyClass == String.class ? 
"key" : rawOption;
+
                                        Object parameter = 
parameters.get(parameterClass);
                                        assertNotNull("method " + m + " not 
covered by test", parameter);
-                                       
+
                                        try {
-                                               m.invoke(config, "key", 
parameter);
+                                               m.invoke(config, key, 
parameter);
                                                fail("should fail with an 
exception");
                                        }
                                        catch (InvocationTargetException e) {

Reply via email to