This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eacd108  Fix connectors nested configs (#4067)
eacd108 is described below

commit eacd1082421c8e463b05f7f0b95e092b0fb5310b
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Apr 17 17:15:27 2019 -0700

    Fix connectors nested configs (#4067)
    
    * fix update connectors with nested configs
    
    * clean up logging
---
 .../apache/pulsar/functions/utils/SinkConfigUtils.java  | 14 ++++++++++++--
 .../pulsar/functions/utils/SourceConfigUtils.java       | 17 +++++++++++++++--
 .../pulsar/functions/utils/SinkConfigUtilsTest.java     | 10 +++++++++-
 .../pulsar/functions/utils/SourceConfigUtilsTest.java   | 10 +++++++++-
 4 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index bbbe533..d0e71c8 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.utils;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -239,8 +241,16 @@ public class SinkConfigUtils {
             sinkConfig.setArchive("builtin://" + functionDetails.getSink().getBuiltin());
         }
         if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) {
-            Type type = new TypeToken<Map<String, String>>() {}.getType();
-            sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type));
+            TypeReference<HashMap<String,Object>> typeRef
+                    = new TypeReference<HashMap<String,Object>>() {};
+            Map<String, Object> configMap;
+            try {
+               configMap = ObjectMapperFactory.getThreadLocal().readValue(functionDetails.getSink().getConfigs(), typeRef);
+            } catch (IOException e) {
+                log.error("Failed to read configs for sink {}", FunctionCommon.getFullyQualifiedName(functionDetails), e);
+                throw new RuntimeException(e);
+            }
+            sinkConfig.setConfigs(configMap);
         }
         if (!isEmpty(functionDetails.getSecretsMap())) {
             Type type = new TypeToken<Map<String, Object>>() {}.getType();
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index b4a812d..f223583 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -19,16 +19,19 @@
 
 package org.apache.pulsar.functions.utils;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -38,12 +41,14 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Path;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 
+@Slf4j
 public class SourceConfigUtils {
 
     @Getter
@@ -155,8 +160,16 @@ public class SourceConfigUtils {
             sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin());
         }
         if (!StringUtils.isEmpty(sourceSpec.getConfigs())) {
-            Type type = new TypeToken<Map<String, String>>() {}.getType();
-            sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type));
+            TypeReference<HashMap<String,Object>> typeRef
+                    = new TypeReference<HashMap<String,Object>>() {};
+            Map<String, Object> configMap;
+            try {
+                configMap = ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(), typeRef);
+            } catch (IOException e) {
+                log.error("Failed to read configs for source {}", FunctionCommon.getFullyQualifiedName(functionDetails), e);
+                throw new RuntimeException(e);
+            }
+            sourceConfig.setConfigs(configMap);
         }
         if (!isEmpty(functionDetails.getSecretsMap())) {
             Type type = new TypeToken<Map<String, Object>>() {}.getType();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 1bde64d..d72698d 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -53,7 +53,15 @@ public class SinkConfigUtilsTest {
         inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
         sinkConfig.setInputSpecs(inputSpecs);
         sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        sinkConfig.setConfigs(new HashMap<>());
+
+        Map<String, String> producerConfigs = new HashMap<>();
+        producerConfigs.put("security.protocal", "SASL_PLAINTEXT");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("topic", "kafka");
+        configs.put("bootstrapServers", "server-1,server-2");
+        configs.put("producerConfigProperties", producerConfigs);
+
+        sinkConfig.setConfigs(configs);
         sinkConfig.setRetainOrdering(false);
         sinkConfig.setAutoAck(true);
         sinkConfig.setTimeoutMs(2000l);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index f8c138b..ccdbcdc 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -51,7 +51,15 @@ public class SourceConfigUtilsTest {
         sourceConfig.setParallelism(1);
         sourceConfig.setRuntimeFlags("-DKerberos");
         sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        sourceConfig.setConfigs(new HashMap<>());
+
+        Map<String, String> consumerConfigs = new HashMap<>();
+        consumerConfigs.put("security.protocal", "SASL_PLAINTEXT");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("topic", "kafka");
+        configs.put("bootstrapServers", "server-1,server-2");
+        configs.put("consumerConfigProperties", consumerConfigs);
+
+        sourceConfig.setConfigs(configs);
         Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
         SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);
 

Reply via email to