This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 0ec1c4d  [FLINK-20018] Allow escaping in 'pipeline.cached-files' and 'pipeline.default-kryo-serializers'
0ec1c4d is described below

commit 0ec1c4d6ccf7c811b0328cf00e0833ae588aa112
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Nov 6 11:07:31 2020 +0100

    [FLINK-20018] Allow escaping in 'pipeline.cached-files' and 'pipeline.default-kryo-serializers'
    
    This commit enables escaping in options that expect a map of
    string-to-string entries. It lets users pass option values such as:
    
    pipeline.cached-files=name:file1,path:'oss://bucket/file1'
---
 .../apache/flink/api/common/ExecutionConfig.java   | 12 +---
 .../flink/api/common/cache/DistributedCache.java   | 11 +---
 .../flink/configuration/ConfigurationUtils.java    | 24 ++++++++
 .../configuration/StructuredOptionsSplitter.java   |  2 -
 .../StructuredOptionsSplitterTest.java             |  5 +-
 ...ecutionEnvironmentComplexConfigurationTest.java | 66 +++++++++++++++++++++-
 6 files changed, 93 insertions(+), 27 deletions(-)
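
For context, the sketch below shows the end-to-end effect of this change, mirroring the new StreamExecutionEnvironment test further down in this diff. It is an illustration only: the wrapper class is scaffolding added for this note, and the bucket path is the placeholder example from the commit message rather than a real resource.

import java.util.List;

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative sketch of the behaviour enabled by FLINK-20018.
public class CachedFilesEscapingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration configuration = new Configuration();
        // Quoting the path keeps the colons of the URI from being read as key/value separators.
        configuration.setString("pipeline.cached-files", "name:file1,path:'oss://bucket/file1'");

        env.configure(configuration, Thread.currentThread().getContextClassLoader());

        // As in the new test below, this yields a single cached-file entry named "file1"
        // whose DistributedCacheEntry points at "oss://bucket/file1".
        List<Tuple2<String, DistributedCache.DistributedCacheEntry>> files = env.getCachedFiles();
        System.out.println(files.get(0).f0);
    }
}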

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 73ff232..8526997 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.MetricOptions;
@@ -33,7 +34,6 @@ import org.apache.flink.util.Preconditions;
 import com.esotericsoftware.kryo.Serializer;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -1234,15 +1234,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                        ClassLoader classLoader,
                        List<String> kryoSerializers) {
                return kryoSerializers.stream()
-                       .map(v -> Arrays.stream(v.split(","))
-                               .map(p -> p.split(":"))
-                               .collect(
-                                       Collectors.toMap(
-                                               arr -> arr[0], // entry key
-                                               arr -> arr[1] // entry value
-                                       )
-                               )
-                       )
+                       .map(ConfigurationUtils::parseMap)
                        .collect(Collectors.toMap(
                                m -> loadClass(m.get("class"), classLoader, "Could not load class for kryo serialization"),
                                m -> loadClass(m.get("serializer"), classLoader, "Could not load serializer's class"),
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 079cfab..8b58b97 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.cache;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.core.fs.Path;
 
 import java.io.File;
@@ -197,15 +198,7 @@ public class DistributedCache {
         */
        public static List<Tuple2<String, DistributedCacheEntry>> parseCachedFilesFromString(List<String> files) {
                return files.stream()
-                       .map(v -> Arrays.stream(v.split(","))
-                               .map(p -> p.split(":"))
-                               .collect(
-                                       Collectors.toMap(
-                                               arr -> arr[0], // key name
-                                               arr -> arr[1] // value
-                                       )
-                               )
-                       )
+                       .map(ConfigurationUtils::parseMap)
                        .map(m -> Tuple2.of(
                                m.get("name"),
                                new DistributedCacheEntry(
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index d4ff3b4..f2ca5c8 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
@@ -79,6 +80,29 @@ public class ConfigurationUtils {
                return splitPaths(configValue);
        }
 
+       /**
+        * Parses a string as a map of strings. The expected format of the map is:
+        * <pre>
+        * key1:value1,key2:value2
+        * </pre>
+        *
+        * <p>Parts of the string can be escaped by wrapping with single or double quotes.
+        *
+        * @param stringSerializedMap a string to parse
+        *
+        * @return parsed map
+        */
+       public static Map<String, String> parseMap(String stringSerializedMap) {
+               return StructuredOptionsSplitter.splitEscaped(stringSerializedMap, ',').stream()
+                       .map(p -> StructuredOptionsSplitter.splitEscaped(p, ':'))
+                       .collect(
+                               Collectors.toMap(
+                                       arr -> arr.get(0), // key name
+                                       arr -> arr.get(1) // value
+                               )
+                       );
+       }
+
        public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) {
                final Time timeout;
                long standaloneClusterStartupPeriodTime = configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
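
For reference, a minimal usage sketch of the parseMap method added above. The quoted class names below are hypothetical placeholders, not classes from this commit, and the wrapper class exists only for this illustration.

import java.util.Map;

import org.apache.flink.configuration.ConfigurationUtils;

// Illustrative sketch of ConfigurationUtils.parseMap with quoted values.
public class ParseMapSketch {
    public static void main(String[] args) {
        Map<String, String> parsed = ConfigurationUtils.parseMap(
                "class:'org.example.MyPojo',serializer:'org.example.MyPojoSerializer'");

        // The surrounding single quotes are stripped from the parsed values; quoting also keeps
        // any ':' inside a value (as in the commit message's 'oss://bucket/file1' example)
        // from being read as the key/value separator.
        System.out.println(parsed.get("class"));      // org.example.MyPojo
        System.out.println(parsed.get("serializer")); // org.example.MyPojoSerializer
    }
}
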
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java b/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
index 8be69c6..a237969 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
@@ -161,8 +161,6 @@ class StructuredOptionsSplitter {
                        char c = string.charAt(i);
                        if (c == delimiter) {
                                return i;
-                       } else if (c == '\'' || c == '"') {
-                               throw new IllegalArgumentException("Could not split string. Illegal quoting at position: " + i);
                        }
 
                        builder.append(c);
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java b/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
index 323e55d..ae73d6a 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
@@ -53,7 +53,7 @@ public class StructuredOptionsSplitterTest {
                        TestSpec.split("'A;B';'C'", ';').expect("A;B", "C"),
                        TestSpec.split("A;B;C", ';').expect("A", "B", "C"),
                        TestSpec.split("'AB''D;B';C", ';').expect("AB'D;B", "C"),
-                       TestSpec.split("A'BD;B';C", ';').expectException("Could not split string. Illegal quoting at position: 1"),
+                       TestSpec.split("A'BD;B';C", ';').expect("A'BD", "B'", "C"),
                        TestSpec.split("'AB'D;B;C", ';').expectException("Could not split string. Illegal quoting at position: 3"),
                        TestSpec.split("'A", ';').expectException("Could not split string. Quoting was not closed properly."),
                        TestSpec.split("C;'", ';').expectException("Could not split string. Quoting was not closed properly."),
@@ -63,8 +63,7 @@ public class StructuredOptionsSplitterTest {
                        TestSpec.split("\"A;B\";\"C\"", ';').expect("A;B", "C"),
                        TestSpec.split("A;B;C", ';').expect("A", "B", "C"),
                        TestSpec.split("\"AB\"\"D;B\";C", ';').expect("AB\"D;B", "C"),
-                       TestSpec.split("A\"BD;B\";C", ';')
-                               .expectException("Could not split string. Illegal quoting at position: 1"),
+                       TestSpec.split("A\"BD;B\";C", ';').expect("A\"BD", "B\"", "C"),
                        TestSpec.split("\"AB\"D;B;C", ';')
                                .expectException("Could not split string. Illegal quoting at position: 3"),
                        TestSpec.split("\"A", ';').expectException("Could not split string. Quoting was not closed properly."),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
index d966afc..c140875 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
@@ -30,11 +30,16 @@ import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -66,21 +71,50 @@ public class StreamExecutionEnvironmentComplexConfigurationTest {
        @Test
        public void testLoadingCachedFilesFromConfiguration() {
                StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
-               envFromConfiguration.registerCachedFile("/tmp3", "file3", true);
+               envFromConfiguration.registerCachedFile("/tmp4", "file4", true);
 
                Configuration configuration = new Configuration();
-               configuration.setString("pipeline.cached-files", "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2");
+               configuration.setString(
+                       "pipeline.cached-files",
+                       "name:file1,path:/tmp1,executable:true;"
+                               + "name:file2,path:/tmp2;"
+                               + "name:file3,path:'oss://bucket/file1'");
 
                // mutate config according to configuration
                envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());
 
                assertThat(envFromConfiguration.getCachedFiles(), equalTo(Arrays.asList(
                        Tuple2.of("file1", new DistributedCache.DistributedCacheEntry("/tmp1", true)),
-                       Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false))
+                       Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)),
+                       Tuple2.of(
+                               "file3",
+                               new DistributedCache.DistributedCacheEntry("oss://bucket/file1", false))
                )));
        }
 
        @Test
+       public void testLoadingKryoSerializersFromConfiguration() {
+               StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               Configuration configuration = new Configuration();
+               configuration.setString(
+                       "pipeline.default-kryo-serializers",
+                       "class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo'"
+                               + ",serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'");
+
+               // mutate config according to configuration
+               envFromConfiguration.configure(
+                       configuration,
+                       Thread.currentThread().getContextClassLoader());
+
+               LinkedHashMap<Object, Object> serializers = new LinkedHashMap<>();
+               serializers.put(CustomPojo.class, CustomPojoSerializer.class);
+               assertThat(
+                       envFromConfiguration.getConfig().getDefaultKryoSerializerClasses(),
+                       equalTo(serializers));
+       }
+
+       @Test
        public void testNotOverridingStateBackendWithDefaultsFromConfiguration() {
                StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
                envFromConfiguration.setStateBackend(new MemoryStateBackend());
@@ -156,4 +190,30 @@ public class StreamExecutionEnvironmentComplexConfigurationTest {
 
                }
        }
+
+       /**
+        * A dummy class to specify a Kryo serializer for.
+        */
+       public static class CustomPojo {
+       }
+
+       /**
+        * A dummy Kryo serializer which can be registered.
+        */
+       public static class CustomPojoSerializer extends Serializer<CustomPojo> {
+               @Override
+               public void write(
+                               Kryo kryo,
+                               Output output,
+                               CustomPojo object) {
+               }
+
+               @Override
+               public CustomPojo read(
+                               Kryo kryo,
+                               Input input,
+                               Class<CustomPojo> type) {
+                       return null;
+               }
+       }
 }
