This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ed20926  KYLIN-4115 Always load kafkaConsumerProperties
ed20926 is described below

commit ed2092616211e3306d35aeaa0296e6d41b55b273
Author: chenzhx <c...@apache.org>
AuthorDate: Fri Jul 26 18:18:13 2019 +0800

    KYLIN-4115 Always load kafkaConsumerProperties
---
 .../main/java/org/apache/kylin/source/kafka/KafkaSource.java   |  5 ++++-
 .../java/org/apache/kylin/source/kafka/util/KafkaClient.java   | 10 +++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index d261201..440c5b3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -45,6 +46,7 @@ import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.hive.HiveMetadataExplorer;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,7 +108,8 @@ public class KafkaSource implements ISource {
                 .getKafkaConfig(cube.getRootFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
         final String topic = kafkaConfig.getTopic();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+        Properties property = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), property)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             logger.info("Get {} partitions for topic {} ", partitionInfos.size(), topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a781f8a..e8ce87d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -57,16 +57,16 @@ public class KafkaClient {
 
     private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
         Properties props = new Properties();
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
         props.put("bootstrap.servers", brokers);
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("group.id", consumerGroup);
         props.put("enable.auto.commit", "false");
+        if (properties != null) {
+            for (Map.Entry entry : properties.entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        }
         return props;
     }
 

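For readers following the change: the KafkaSource hunk always loads the consumer properties via KafkaConsumerProperties.getInstanceFromEnv() instead of passing null, and the KafkaClient hunk moves the copy of the caller-supplied Properties so it runs after the hard-coded defaults, letting values from kafka-consumer.properties override those defaults rather than being overwritten by them. Below is a minimal standalone sketch (illustrative only, not Kylin code; the class name and property values are made up) of why the ordering matters.

import java.util.Map;
import java.util.Properties;

// Illustrative sketch only (not Kylin code): later put() calls on a Properties
// object overwrite earlier ones, so copying the user-supplied properties after
// the defaults lets the user-supplied values take precedence.
public class PropertyOrderSketch {
    public static void main(String[] args) {
        Properties userSupplied = new Properties();
        userSupplied.put("enable.auto.commit", "true");    // hypothetical user override

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");          // default, as set in KafkaClient
        for (Map.Entry entry : userSupplied.entrySet()) {  // copied last, mirroring the new order
            props.put(entry.getKey(), entry.getValue());
        }

        // Prints "true": the user-supplied value wins over the default.
        System.out.println(props.getProperty("enable.auto.commit"));
    }
}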