KYLIN-1726 apply kylin_job_conf.xml on KafkaFlatTableJob

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8172d0ba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8172d0ba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8172d0ba

Branch: refs/heads/v1.6.0-rc1-cdh5.7
Commit: 8172d0ba3e6768c803528b457db6cc3a464c8e14
Parents: a8922e8
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Nov 1 15:27:52 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Nov 1 15:27:52 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java   | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8172d0ba/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index e20b20a..5fe6e00 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source.kafka.hadoop;
 
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
@@ -97,6 +98,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
                 throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
             }
 
+            JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));

Reply via email to