This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new ed20926 KYLIN-4115 Always load kafkaConsumerProperties ed20926 is described below commit ed2092616211e3306d35aeaa0296e6d41b55b273 Author: chenzhx <c...@apache.org> AuthorDate: Fri Jul 26 18:18:13 2019 +0800 KYLIN-4115 Always load kafkaConsumerProperties --- .../main/java/org/apache/kylin/source/kafka/KafkaSource.java | 5 ++++- .../java/org/apache/kylin/source/kafka/util/KafkaClient.java | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index d261201..440c5b3 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -45,6 +46,7 @@ import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.hive.HiveMetadataExplorer; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.apache.kylin.source.kafka.util.KafkaClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +108,8 @@ public class KafkaSource implements ISource { .getKafkaConfig(cube.getRootFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); final String topic = kafkaConfig.getTopic(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { + Properties property = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), property)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); logger.info("Get {} partitions for topic {} ", partitionInfos.size(), topic); for (PartitionInfo partitionInfo : partitionInfos) { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index a781f8a..e8ce87d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -57,16 +57,16 @@ public class KafkaClient { private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) { Properties props = new Properties(); - if (properties != null) { - for (Map.Entry entry : properties.entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } - } props.put("bootstrap.servers", brokers); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("group.id", consumerGroup); props.put("enable.auto.commit", "false"); + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } return props; }