reswqa commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464593996


##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -255,6 +256,51 @@ public class PipelineOptions {
                                                     + " sure that only tags are written.")
                                     .build());
 
+    public static final ConfigOption<Map<String, String>> SERIALIZATION_CONFIG =

Review Comment:
   Could you please post a screenshot of the rendered config option?



##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
 
     private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
 
-    private LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> registeredTypeFactories =
-            new LinkedHashMap<>();
+    // Order is not required as we will traverse the type hierarchy up to find the closest type
+    // information factory
+    // when extracting the type information.

Review Comment:
   This line break seems unnecessary; the last two comment lines could be joined.



##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -408,4 +431,104 @@ private <T extends Class> T loadClass(
             throw new IllegalArgumentException(errorMessage, e);
         }
     }
+
+    private void parseSerializationConfigWithExceptionHandling(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) {
+        try {
+            parseSerializationConfig(classLoader, serializationConfigs);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    String.format("Could not configure serializers from %s.", serializationConfigs),
+                    e);
+        }
+    }
+
+    private void parseSerializationConfig(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) {
+        final Map<Class<?>, Map<String, String>> serializationConfigByClass =
+                serializationConfigs.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        e ->
+                                                loadClass(
+                                                        e.getKey(),
+                                                        classLoader,
+                                                        "Could not load class for serialization config"),
+                                        e -> ConfigurationUtils.parseStringToMap(e.getValue())));
+        for (Map.Entry<Class<?>, Map<String, String>> entry :
+                serializationConfigByClass.entrySet()) {
+            Class<?> type = entry.getKey();
+            Map<String, String> config = entry.getValue();
+            String configType = config.get("type");

Review Comment:
   We may need to throw a more explicit exception for the null case, i.e. when the config has no `type` entry and `config.get("type")` returns null.
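
   For illustration, a minimal sketch of such a check. The helper class, the method name `requireConfigType`, and the message wording are hypothetical and not part of this PR; the only thing taken from the diff is the `config.get("type")` lookup.

```java
import java.util.Map;

// Hypothetical helper for illustration only; names and message text are assumptions.
final class SerializationConfigTypeCheck {

    private SerializationConfigTypeCheck() {}

    /**
     * Returns the value of the "type" entry, or fails with a message that names
     * the affected class and the offending config instead of surfacing a
     * NullPointerException later on.
     */
    static String requireConfigType(Class<?> type, Map<String, String> config) {
        String configType = config.get("type");
        if (configType == null) {
            throw new IllegalArgumentException(
                    String.format(
                            "Serialization config for class %s must specify a 'type', but was: %s",
                            type.getName(), config));
        }
        return configType;
    }
}
```

   In `parseSerializationConfig`, the `String configType = config.get("type");` line could then call a check like this, so a missing key is reported together with the class it belongs to.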



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.