This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6ac09fb42a1d8cda99500b23f7247f19a272516 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Wed Nov 13 14:33:54 2019 +0100 [hotfix] Change the signature of the ConfigUtils.encodeCollectionToConfig() --- .../flink/client/cli/ExecutionConfigAccessor.java | 6 +- .../apache/flink/configuration/ConfigUtils.java | 38 ++++--- .../flink/configuration/ConfigUtilsTest.java | 121 +++++++++++++++++++++ 3 files changed, 148 insertions(+), 17 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index 72a338a..77c0440 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -65,9 +65,7 @@ public class ExecutionConfigAccessor { configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode()); configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit()); - if (options.getClasspaths() != null) { - ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths().stream(), URL::toString); - } + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString); parseJarURLToConfig(options.getJarFilePath(), configuration); @@ -84,7 +82,7 @@ public class ExecutionConfigAccessor { try { final URL jarUrl = new File(jarFile).getAbsoluteFile().toURI().toURL(); final List<URL> jarUrlSingleton = Collections.singletonList(jarUrl); - ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.JARS, jarUrlSingleton.stream(), URL::toString); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jarUrlSingleton, URL::toString); } catch (MalformedURLException e) { throw new IllegalArgumentException("JAR file path invalid", e); } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java index be95041..4b41389 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java @@ -20,13 +20,16 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.Internal; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -38,37 +41,45 @@ public class ConfigUtils { /** * Puts an array of values of type {@code IN} in a {@link WritableConfig} - * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. + * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. If the {@code values} + * is {@code null} or empty, then nothing is put in the configuration. * * @param configuration the configuration object to put the list in * @param key the {@link ConfigOption option} to serve as the key for the list in the configuration - * @param value the array of values to put as value for the {@code key} + * @param values the array of values to put as value for the {@code key} * @param mapper the transformation function from {@code IN} to {@code OUT}. */ public static <IN, OUT> void encodeArrayToConfig( final WritableConfig configuration, final ConfigOption<List<OUT>> key, - final IN[] value, + @Nullable final IN[] values, final Function<IN, OUT> mapper) { - if (value == null) { + + checkNotNull(configuration); + checkNotNull(key); + checkNotNull(mapper); + + if (values == null) { return; } - encodeStreamToConfig(configuration, key, Arrays.stream(value), mapper); + + encodeCollectionToConfig(configuration, key, Arrays.asList(values), mapper); } /** - * Puts a {@link Stream} of values of type {@code IN} in a {@link WritableConfig} - * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. + * Puts a {@link Collection} of values of type {@code IN} in a {@link WritableConfig} + * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. If the {@code values} + * is {@code null} or empty, then nothing is put in the configuration. * * @param configuration the configuration object to put the list in * @param key the {@link ConfigOption option} to serve as the key for the list in the configuration - * @param values the stream of values to put as value for the {@code key} + * @param values the collection of values to put as value for the {@code key} * @param mapper the transformation function from {@code IN} to {@code OUT}. */ - public static <IN, OUT> void encodeStreamToConfig( + public static <IN, OUT> void encodeCollectionToConfig( final WritableConfig configuration, final ConfigOption<List<OUT>> key, - final Stream<IN> values, + @Nullable final Collection<IN> values, final Function<IN, OUT> mapper) { checkNotNull(configuration); @@ -79,10 +90,11 @@ public class ConfigUtils { return; } - final List<OUT> encodedOption = values + final List<OUT> encodedOption = values.stream() + .filter(Objects::nonNull) .map(mapper) .filter(Objects::nonNull) - .collect(Collectors.toList()); + .collect(Collectors.toCollection(ArrayList::new)); if (!encodedOption.isEmpty()) { configuration.set(key, encodedOption); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigUtilsTest.java new file mode 100644 index 0000000..2edbd9f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigUtilsTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests the {@link ConfigUtils} methods. + */ +public class ConfigUtilsTest { + + private static final ConfigOption<List<String>> TEST_OPTION = + key("test.option.key") + .stringType() + .asList() + .noDefaultValue(); + + private static final Integer[] intArray = {1, 3, 2, 4}; + private static final List<Integer> intList = Arrays.asList(intArray); + + @Test + public void collectionIsCorrectlyPutAndFetched() { + final Configuration configurationUnderTest = new Configuration(); + ConfigUtils.encodeCollectionToConfig(configurationUnderTest, TEST_OPTION, intList, Object::toString); + + final List<Integer> recovered = ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, Integer::valueOf); + assertThat(recovered, equalTo(intList)); + } + + @Test + public void arrayIsCorrectlyPutAndFetched() { + final Configuration configurationUnderTest = new Configuration(); + ConfigUtils.encodeArrayToConfig(configurationUnderTest, TEST_OPTION, intArray, Object::toString); + + final List<Integer> recovered = ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, Integer::valueOf); + assertThat(recovered, equalTo(intList)); + } + + @Test + public void nullCollectionPutsNothingInConfig() { + final Configuration configurationUnderTest = new Configuration(); + ConfigUtils.encodeCollectionToConfig(configurationUnderTest, TEST_OPTION, null, Object::toString); + + assertThat(configurationUnderTest.keySet(), is(empty())); + + final Object recovered = configurationUnderTest.get(TEST_OPTION); + assertThat(recovered, is(nullValue())); + + final List<Integer> recoveredList = ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, Integer::valueOf); + assertThat(recoveredList, is(empty())); + } + + @Test + public void nullArrayPutsNothingInConfig() { + final Configuration configurationUnderTest = new Configuration(); + ConfigUtils.encodeArrayToConfig(configurationUnderTest, TEST_OPTION, null, Object::toString); + + assertThat(configurationUnderTest.keySet(), is(empty())); + + final Object recovered = configurationUnderTest.get(TEST_OPTION); + assertThat(recovered, is(nullValue())); + + final List<Integer> recoveredList = ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, Integer::valueOf); + assertThat(recoveredList, is(empty())); + } + + @Test + public void emptyCollectionPutsNothingInConfig() { + final Configuration configurationUnderTest = new Configuration(); + ConfigUtils.encodeCollectionToConfig(configurationUnderTest, TEST_OPTION, Collections.emptyList(), Object::toString); + + assertThat(configurationUnderTest.keySet(), is(empty())); + + final Object recovered = configurationUnderTest.get(TEST_OPTION); + assertThat(recovered, is(nullValue())); + + final List<Integer> recoveredList = ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, Integer::valueOf); + assertThat(recoveredList, is(empty())); + } + + @Test + public void emptyArrayPutsNothingInConfig() { + final Configuration configurationUnderTest = new Configuration(); + ConfigUtils.encodeArrayToConfig(configurationUnderTest, TEST_OPTION, new Integer[5], Object::toString); + + assertThat(configurationUnderTest.keySet(), is(empty())); + + final Object recovered = configurationUnderTest.get(TEST_OPTION); + assertThat(recovered, is(nullValue())); + + final List<Integer> recoveredList = ConfigUtils.decodeListFromConfig(configurationUnderTest, TEST_OPTION, Integer::valueOf); + assertThat(recoveredList, is(empty())); + } +}