KYLIN-1726 apply kylin_job_conf.xml on KafkaFlatTableJob
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5ffc5fa0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5ffc5fa0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5ffc5fa0

Branch: refs/heads/KYLIN-1971
Commit: 5ffc5fa0aa67fa4caf30a58d1b08f64eaba5d3f7
Parents: effb57e
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Nov 1 15:27:52 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Nov 2 16:56:00 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/5ffc5fa0/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index e20b20a..5fe6e00 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source.kafka.hadoop;
 
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
@@ -97,6 +98,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
                 throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
             }
 
+            JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));