This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 0ec1c4d [FLINK-20018] Allow escaping in 'pipeline.cached-files' and 'pipeline.default-kryo-serializers' 0ec1c4d is described below commit 0ec1c4d6ccf7c811b0328cf00e0833ae588aa112 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Nov 6 11:07:31 2020 +0100 [FLINK-20018] Allow escaping in 'pipeline.cached-files' and 'pipeline.default-kryo-serializers' This commit enables escaping in options that expect a map of string-string entries. It lets users pass options such as pipeline.cached-files=name:file1,path:'oss://bucket/file1' --- .../apache/flink/api/common/ExecutionConfig.java | 12 +--- .../flink/api/common/cache/DistributedCache.java | 11 +--- .../flink/configuration/ConfigurationUtils.java | 24 ++++++++ .../configuration/StructuredOptionsSplitter.java | 2 - .../StructuredOptionsSplitterTest.java | 5 +- ...ecutionEnvironmentComplexConfigurationTest.java | 66 +++++++++++++++++++++- 6 files changed, 93 insertions(+), 27 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 73ff232..8526997 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.MetricOptions; @@ -33,7 +34,6 @@ import org.apache.flink.util.Preconditions; import com.esotericsoftware.kryo.Serializer; import java.io.Serializable; -import 
java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -1234,15 +1234,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut ClassLoader classLoader, List<String> kryoSerializers) { return kryoSerializers.stream() - .map(v -> Arrays.stream(v.split(",")) - .map(p -> p.split(":")) - .collect( - Collectors.toMap( - arr -> arr[0], // entry key - arr -> arr[1] // entry value - ) - ) - ) + .map(ConfigurationUtils::parseMap) .collect(Collectors.toMap( m -> loadClass(m.get("class"), classLoader, "Could not load class for kryo serialization"), m -> loadClass(m.get("serializer"), classLoader, "Could not load serializer's class"), diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java index 079cfab..8b58b97 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.cache; import org.apache.flink.annotation.Public; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.core.fs.Path; import java.io.File; @@ -197,15 +198,7 @@ public class DistributedCache { */ public static List<Tuple2<String, DistributedCacheEntry>> parseCachedFilesFromString(List<String> files) { return files.stream() - .map(v -> Arrays.stream(v.split(",")) - .map(p -> p.split(":")) - .collect( - Collectors.toMap( - arr -> arr[0], // key name - arr -> arr[1] // value - ) - ) - ) + .map(ConfigurationUtils::parseMap) .map(m -> Tuple2.of( m.get("name"), new DistributedCacheEntry( diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index d4ff3b4..f2ca5c8 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS; import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL; @@ -79,6 +80,29 @@ public class ConfigurationUtils { return splitPaths(configValue); } + /** + * Parses a string as a map of strings. The expected format of the map is: + * <pre> + * key1:value1,key2:value2 + * </pre> + * + * <p>Parts of the string can be escaped by wrapping with single or double quotes. + * + * @param stringSerializedMap a string to parse + * + * @return parsed map + */ + public static Map<String, String> parseMap(String stringSerializedMap) { + return StructuredOptionsSplitter.splitEscaped(stringSerializedMap, ',').stream() + .map(p -> StructuredOptionsSplitter.splitEscaped(p, ':')) + .collect( + Collectors.toMap( + arr -> arr.get(0), // key name + arr -> arr.get(1) // value + ) + ); + } + public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) { final Time timeout; long standaloneClusterStartupPeriodTime = configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java b/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java index 8be69c6..a237969 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java @@ -161,8 +161,6 
@@ class StructuredOptionsSplitter { char c = string.charAt(i); if (c == delimiter) { return i; - } else if (c == '\'' || c == '"') { - throw new IllegalArgumentException("Could not split string. Illegal quoting at position: " + i); } builder.append(c); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java b/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java index 323e55d..ae73d6a 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java @@ -53,7 +53,7 @@ public class StructuredOptionsSplitterTest { TestSpec.split("'A;B';'C'", ';').expect("A;B", "C"), TestSpec.split("A;B;C", ';').expect("A", "B", "C"), TestSpec.split("'AB''D;B';C", ';').expect("AB'D;B", "C"), - TestSpec.split("A'BD;B';C", ';').expectException("Could not split string. Illegal quoting at position: 1"), + TestSpec.split("A'BD;B';C", ';').expect("A'BD", "B'", "C"), TestSpec.split("'AB'D;B;C", ';').expectException("Could not split string. Illegal quoting at position: 3"), TestSpec.split("'A", ';').expectException("Could not split string. Quoting was not closed properly."), TestSpec.split("C;'", ';').expectException("Could not split string. Quoting was not closed properly."), @@ -63,8 +63,7 @@ public class StructuredOptionsSplitterTest { TestSpec.split("\"A;B\";\"C\"", ';').expect("A;B", "C"), TestSpec.split("A;B;C", ';').expect("A", "B", "C"), TestSpec.split("\"AB\"\"D;B\";C", ';').expect("AB\"D;B", "C"), - TestSpec.split("A\"BD;B\";C", ';') - .expectException("Could not split string. Illegal quoting at position: 1"), + TestSpec.split("A\"BD;B\";C", ';').expect("A\"BD", "B\"", "C"), TestSpec.split("\"AB\"D;B;C", ';') .expectException("Could not split string. Illegal quoting at position: 3"), TestSpec.split("\"A", ';').expectException("Could not split string. 
Quoting was not closed properly."), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java index d966afc..c140875 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java @@ -30,11 +30,16 @@ import org.apache.flink.core.execution.JobListener; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.junit.Test; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; import static org.hamcrest.CoreMatchers.equalTo; @@ -66,21 +71,50 @@ public class StreamExecutionEnvironmentComplexConfigurationTest { @Test public void testLoadingCachedFilesFromConfiguration() { StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment(); - envFromConfiguration.registerCachedFile("/tmp3", "file3", true); + envFromConfiguration.registerCachedFile("/tmp4", "file4", true); Configuration configuration = new Configuration(); - configuration.setString("pipeline.cached-files", "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2"); + configuration.setString( + "pipeline.cached-files", + "name:file1,path:/tmp1,executable:true;" + + "name:file2,path:/tmp2;" + + "name:file3,path:'oss://bucket/file1'"); // mutate config according to configuration envFromConfiguration.configure(configuration, 
Thread.currentThread().getContextClassLoader()); assertThat(envFromConfiguration.getCachedFiles(), equalTo(Arrays.asList( Tuple2.of("file1", new DistributedCache.DistributedCacheEntry("/tmp1", true)), - Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)) + Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)), + Tuple2.of( + "file3", + new DistributedCache.DistributedCacheEntry("oss://bucket/file1", false)) ))); } @Test + public void testLoadingKryoSerializersFromConfiguration() { + StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment(); + + Configuration configuration = new Configuration(); + configuration.setString( + "pipeline.default-kryo-serializers", + "class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo'" + + ",serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'"); + + // mutate config according to configuration + envFromConfiguration.configure( + configuration, + Thread.currentThread().getContextClassLoader()); + + LinkedHashMap<Object, Object> serializers = new LinkedHashMap<>(); + serializers.put(CustomPojo.class, CustomPojoSerializer.class); + assertThat( + envFromConfiguration.getConfig().getDefaultKryoSerializerClasses(), + equalTo(serializers)); + } + + @Test public void testNotOverridingStateBackendWithDefaultsFromConfiguration() { StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment(); envFromConfiguration.setStateBackend(new MemoryStateBackend()); @@ -156,4 +190,30 @@ public class StreamExecutionEnvironmentComplexConfigurationTest { } } + + /** + * A dummy class to specify a Kryo serializer for. + */ + public static class CustomPojo { + } + + /** + * A dummy Kryo serializer which can be registered. 
+ */ + public static class CustomPojoSerializer extends Serializer<CustomPojo> { + @Override + public void write( + Kryo kryo, + Output output, + CustomPojo object) { + } + + @Override + public CustomPojo read( + Kryo kryo, + Input input, + Class<CustomPojo> type) { + return null; + } + } }