This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new eacd108 Fix connectors nested configs (#4067) eacd108 is described below commit eacd1082421c8e463b05f7f0b95e092b0fb5310b Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Apr 17 17:15:27 2019 -0700 Fix connectors nested configs (#4067) * fix update connectors with nested configs * clean up logging --- .../apache/pulsar/functions/utils/SinkConfigUtils.java | 14 ++++++++++++-- .../pulsar/functions/utils/SourceConfigUtils.java | 17 +++++++++++++++-- .../pulsar/functions/utils/SinkConfigUtilsTest.java | 10 +++++++++- .../pulsar/functions/utils/SourceConfigUtilsTest.java | 10 +++++++++- 4 files changed, 45 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index bbbe533..d0e71c8 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.utils; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.AllArgsConstructor; @@ -31,6 +32,7 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -239,8 +241,16 @@ public class SinkConfigUtils { sinkConfig.setArchive("builtin://" + functionDetails.getSink().getBuiltin()); } if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) { - Type type = new TypeToken<Map<String, String>>() {}.getType(); - sinkConfig.setConfigs(new Gson().fromJson(functionDetails.getSink().getConfigs(), type)); + TypeReference<HashMap<String,Object>> typeRef + = new TypeReference<HashMap<String,Object>>() {}; + Map<String, Object> configMap; + try { + configMap = ObjectMapperFactory.getThreadLocal().readValue(functionDetails.getSink().getConfigs(), typeRef); + } catch (IOException e) { + log.error("Failed to read configs for sink {}", FunctionCommon.getFullyQualifiedName(functionDetails), e); + throw new RuntimeException(e); + } + sinkConfig.setConfigs(configMap); } if (!isEmpty(functionDetails.getSecretsMap())) { Type type = new TypeToken<Map<String, Object>>() {}.getType(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index b4a812d..f223583 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -19,16 +19,19 @@ package org.apache.pulsar.functions.utils; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -38,12 +41,14 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Type; import java.nio.file.Path; +import java.util.HashMap; import java.util.Map; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType; +@Slf4j public class SourceConfigUtils { @Getter @@ -155,8 +160,16 @@ public class SourceConfigUtils { sourceConfig.setArchive("builtin://" + sourceSpec.getBuiltin()); } if (!StringUtils.isEmpty(sourceSpec.getConfigs())) { - Type type = new TypeToken<Map<String, String>>() {}.getType(); - sourceConfig.setConfigs(new Gson().fromJson(sourceSpec.getConfigs(), type)); + TypeReference<HashMap<String,Object>> typeRef + = new TypeReference<HashMap<String,Object>>() {}; + Map<String, Object> configMap; + try { + configMap = ObjectMapperFactory.getThreadLocal().readValue(sourceSpec.getConfigs(), typeRef); + } catch (IOException e) { + log.error("Failed to read configs for source {}", FunctionCommon.getFullyQualifiedName(functionDetails), e); + throw new RuntimeException(e); + } + sourceConfig.setConfigs(configMap); } if (!isEmpty(functionDetails.getSecretsMap())) { Type type = new TypeToken<Map<String, Object>>() {}.getType(); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 1bde64d..d72698d 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -53,7 +53,15 @@ public class SinkConfigUtilsTest { inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); sinkConfig.setInputSpecs(inputSpecs); sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - sinkConfig.setConfigs(new HashMap<>()); + + Map<String, String> producerConfigs = new HashMap<>(); + producerConfigs.put("security.protocal", "SASL_PLAINTEXT"); + Map<String, Object> configs = new HashMap<>(); + configs.put("topic", "kafka"); + configs.put("bootstrapServers", "server-1,server-2"); + configs.put("producerConfigProperties", producerConfigs); + + sinkConfig.setConfigs(configs); sinkConfig.setRetainOrdering(false); sinkConfig.setAutoAck(true); sinkConfig.setTimeoutMs(2000l); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index f8c138b..ccdbcdc 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -51,7 +51,15 @@ public class SourceConfigUtilsTest { sourceConfig.setParallelism(1); sourceConfig.setRuntimeFlags("-DKerberos"); sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - sourceConfig.setConfigs(new HashMap<>()); + + Map<String, String> consumerConfigs = new HashMap<>(); + consumerConfigs.put("security.protocal", "SASL_PLAINTEXT"); + Map<String, Object> configs = new HashMap<>(); + configs.put("topic", "kafka"); + configs.put("bootstrapServers", "server-1,server-2"); + configs.put("consumerConfigProperties", consumerConfigs); + + sourceConfig.setConfigs(configs); Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null)); SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);