KYLIN-1726 apply kylin_job_conf.xml on KafkaFlatTableJob

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5ffc5fa0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5ffc5fa0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5ffc5fa0

Branch: refs/heads/KYLIN-1971
Commit: 5ffc5fa0aa67fa4caf30a58d1b08f64eaba5d3f7
Parents: effb57e
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Nov 1 15:27:52 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Nov 2 16:56:00 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java   | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5ffc5fa0/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index e20b20a..5fe6e00 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source.kafka.hadoop;
 
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
@@ -97,6 +98,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
                 throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
             }
 
+            JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));

Reply via email to