This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6ac09fb42a1d8cda99500b23f7247f19a272516
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Wed Nov 13 14:33:54 2019 +0100

    [hotfix] Change the signature of the ConfigUtils.encodeCollectionToConfig()
---
 .../flink/client/cli/ExecutionConfigAccessor.java  |   6 +-
 .../apache/flink/configuration/ConfigUtils.java    |  38 ++++---
 .../flink/configuration/ConfigUtilsTest.java       | 121 +++++++++++++++++++++
 3 files changed, 148 insertions(+), 17 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 72a338a..77c0440 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -65,9 +65,7 @@ public class ExecutionConfigAccessor {
                configuration.setBoolean(DeploymentOptions.ATTACHED, 
!options.getDetachedMode());
                
configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, 
options.isShutdownOnAttachedExit());
 
-               if (options.getClasspaths() != null) {
-                       ConfigUtils.encodeStreamToConfig(configuration, 
PipelineOptions.CLASSPATHS, options.getClasspaths().stream(), URL::toString);
-               }
+               ConfigUtils.encodeCollectionToConfig(configuration, 
PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString);
 
                parseJarURLToConfig(options.getJarFilePath(), configuration);
 
@@ -84,7 +82,7 @@ public class ExecutionConfigAccessor {
                try {
                        final URL jarUrl = new 
File(jarFile).getAbsoluteFile().toURI().toURL();
                        final List<URL> jarUrlSingleton = 
Collections.singletonList(jarUrl);
-                       ConfigUtils.encodeStreamToConfig(configuration, 
PipelineOptions.JARS, jarUrlSingleton.stream(), URL::toString);
+                       ConfigUtils.encodeCollectionToConfig(configuration, 
PipelineOptions.JARS, jarUrlSingleton, URL::toString);
                } catch (MalformedURLException e) {
                        throw new IllegalArgumentException("JAR file path 
invalid", e);
                }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
index be95041..4b41389 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
@@ -20,13 +20,16 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -38,37 +41,45 @@ public class ConfigUtils {
 
        /**
         * Puts an array of values of type {@code IN} in a {@link 
WritableConfig}
-        * as a {@link ConfigOption} of type {@link List} of type {@code OUT}.
+        * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. 
If the {@code values}
+        * is {@code null} or empty, then nothing is put in the configuration.
         *
         * @param configuration the configuration object to put the list in
         * @param key the {@link ConfigOption option} to serve as the key for 
the list in the configuration
-        * @param value the array of values to put as value for the {@code key}
+        * @param values the array of values to put as value for the {@code key}
         * @param mapper the transformation function from {@code IN} to {@code 
OUT}.
         */
        public static <IN, OUT> void encodeArrayToConfig(
                        final WritableConfig configuration,
                        final ConfigOption<List<OUT>> key,
-                       final IN[] value,
+                       @Nullable final IN[] values,
                        final Function<IN, OUT> mapper) {
-               if (value == null) {
+
+               checkNotNull(configuration);
+               checkNotNull(key);
+               checkNotNull(mapper);
+
+               if (values == null) {
                        return;
                }
-               encodeStreamToConfig(configuration, key, Arrays.stream(value), 
mapper);
+
+               encodeCollectionToConfig(configuration, key, 
Arrays.asList(values), mapper);
        }
 
        /**
-        * Puts a {@link Stream} of values of type {@code IN} in a {@link 
WritableConfig}
-        * as a {@link ConfigOption} of type {@link List} of type {@code OUT}.
+        * Puts a {@link Collection} of values of type {@code IN} in a {@link 
WritableConfig}
+        * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. 
If the {@code values}
+        * is {@code null} or empty, then nothing is put in the configuration.
         *
         * @param configuration the configuration object to put the list in
         * @param key the {@link ConfigOption option} to serve as the key for 
the list in the configuration
-        * @param values the stream of values to put as value for the {@code 
key}
+        * @param values the collection of values to put as value for the 
{@code key}
         * @param mapper the transformation function from {@code IN} to {@code 
OUT}.
         */
-       public static <IN, OUT> void encodeStreamToConfig(
+       public static <IN, OUT> void encodeCollectionToConfig(
                        final WritableConfig configuration,
                        final ConfigOption<List<OUT>> key,
-                       final Stream<IN> values,
+                       @Nullable final Collection<IN> values,
                        final Function<IN, OUT> mapper) {
 
                checkNotNull(configuration);
@@ -79,10 +90,11 @@ public class ConfigUtils {
                        return;
                }
 
-               final List<OUT> encodedOption = values
+               final List<OUT> encodedOption = values.stream()
+                               .filter(Objects::nonNull)
                                .map(mapper)
                                .filter(Objects::nonNull)
-                               .collect(Collectors.toList());
+                               
.collect(Collectors.toCollection(ArrayList::new));
 
                if (!encodedOption.isEmpty()) {
                        configuration.set(key, encodedOption);
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigUtilsTest.java
new file mode 100644
index 0000000..2edbd9f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigUtilsTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the {@link ConfigUtils} methods.
+ */
+public class ConfigUtilsTest {
+
+       private static final ConfigOption<List<String>> TEST_OPTION =
+                       key("test.option.key")
+                                       .stringType()
+                                       .asList()
+                                       .noDefaultValue();
+
+       private static final Integer[] intArray = {1, 3, 2, 4};
+       private static final List<Integer> intList = Arrays.asList(intArray);
+
+       @Test
+       public void collectionIsCorrectlyPutAndFetched() {
+               final Configuration configurationUnderTest = new 
Configuration();
+               ConfigUtils.encodeCollectionToConfig(configurationUnderTest, 
TEST_OPTION, intList, Object::toString);
+
+               final List<Integer> recovered = 
ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, 
Integer::valueOf);
+               assertThat(recovered, equalTo(intList));
+       }
+
+       @Test
+       public void arrayIsCorrectlyPutAndFetched() {
+               final Configuration configurationUnderTest = new 
Configuration();
+               ConfigUtils.encodeArrayToConfig(configurationUnderTest, 
TEST_OPTION, intArray, Object::toString);
+
+               final List<Integer> recovered = 
ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, 
Integer::valueOf);
+               assertThat(recovered, equalTo(intList));
+       }
+
+       @Test
+       public void nullCollectionPutsNothingInConfig() {
+               final Configuration configurationUnderTest = new 
Configuration();
+               ConfigUtils.encodeCollectionToConfig(configurationUnderTest, 
TEST_OPTION, null, Object::toString);
+
+               assertThat(configurationUnderTest.keySet(), is(empty()));
+
+               final Object recovered = 
configurationUnderTest.get(TEST_OPTION);
+               assertThat(recovered, is(nullValue()));
+
+               final List<Integer> recoveredList = 
ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, 
Integer::valueOf);
+               assertThat(recoveredList, is(empty()));
+       }
+
+       @Test
+       public void nullArrayPutsNothingInConfig() {
+               final Configuration configurationUnderTest = new 
Configuration();
+               ConfigUtils.encodeArrayToConfig(configurationUnderTest, 
TEST_OPTION, null, Object::toString);
+
+               assertThat(configurationUnderTest.keySet(), is(empty()));
+
+               final Object recovered = 
configurationUnderTest.get(TEST_OPTION);
+               assertThat(recovered, is(nullValue()));
+
+               final List<Integer> recoveredList = 
ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, 
Integer::valueOf);
+               assertThat(recoveredList, is(empty()));
+       }
+
+       @Test
+       public void emptyCollectionPutsNothingInConfig() {
+               final Configuration configurationUnderTest = new 
Configuration();
+               ConfigUtils.encodeCollectionToConfig(configurationUnderTest, 
TEST_OPTION, Collections.emptyList(), Object::toString);
+
+               assertThat(configurationUnderTest.keySet(), is(empty()));
+
+               final Object recovered = 
configurationUnderTest.get(TEST_OPTION);
+               assertThat(recovered, is(nullValue()));
+
+               final List<Integer> recoveredList = 
ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, 
Integer::valueOf);
+               assertThat(recoveredList, is(empty()));
+       }
+
+       @Test
+       public void emptyArrayPutsNothingInConfig() {
+               final Configuration configurationUnderTest = new 
Configuration();
+               ConfigUtils.encodeArrayToConfig(configurationUnderTest, 
TEST_OPTION, new Integer[5], Object::toString);
+
+               assertThat(configurationUnderTest.keySet(), is(empty()));
+
+               final Object recovered = 
configurationUnderTest.get(TEST_OPTION);
+               assertThat(recovered, is(nullValue()));
+
+               final List<Integer> recoveredList = 
ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, 
Integer::valueOf);
+               assertThat(recoveredList, is(empty()));
+       }
+}

Reply via email to