This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1147676c605d3a91a9b3ad7e91cb1bd070a3e025 Author: Bonan Hou <[email protected]> AuthorDate: Tue Jul 26 10:02:13 2022 +0800 fix lafla source config when consumerConfigProperties='' (#16731) (cherry picked from commit 1206a37246317cdafd356dc7bd0caf9c2cc9cbc7) --- .../org/apache/pulsar/io/kafka/KafkaSourceConfig.java | 2 ++ .../io/kafka/source/KafkaAbstractSourceTest.java | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java index fd2d130840b..28b5944ff30 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.kafka; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import java.io.File; @@ -154,6 +155,7 @@ public class KafkaSourceConfig implements Serializable { public static KafkaSourceConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); + mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT); return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSourceConfig.class); } } \ No newline at end of file diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 89797636de9..a9a5c22eb41 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.source; +import com.google.common.collect.ImmutableMap; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.security.auth.SecurityProtocol; @@ -41,6 +42,7 @@ import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; @@ -100,6 +102,23 @@ public class KafkaAbstractSourceTest { source.close(); } + @Test + public void loadConsumerConfigPropertiesFromMapTest() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put("consumerConfigProperties", ""); + KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config); + assertNotNull(kafkaSourceConfig); + assertNull(kafkaSourceConfig.getConsumerConfigProperties()); + + config.put("consumerConfigProperties", null); + kafkaSourceConfig = KafkaSourceConfig.load(config); + assertNull(kafkaSourceConfig.getConsumerConfigProperties()); + + config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar")); + kafkaSourceConfig = KafkaSourceConfig.load(config); + assertEquals(kafkaSourceConfig.getConsumerConfigProperties(), ImmutableMap.of("foo", "bar")); + } + @Test public final void loadFromYamlFileTest() throws IOException { File yamlFile = getFile("kafkaSourceConfig.yaml");
