Repository: kylin Updated Branches: refs/heads/v1.6.0-rc1 a8922e87b -> 8172d0ba3
KYLIN-1726 apply kylin_job_conf.xml on KafkaFlatTableJob Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8172d0ba Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8172d0ba Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8172d0ba Branch: refs/heads/v1.6.0-rc1 Commit: 8172d0ba3e6768c803528b457db6cc3a464c8e14 Parents: a8922e8 Author: shaofengshi <shaofeng...@apache.org> Authored: Tue Nov 1 15:27:52 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Nov 1 15:27:52 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java | 3 +++ 1 file changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8172d0ba/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index e20b20a..5fe6e00 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -18,6 +18,7 @@ package org.apache.kylin.source.kafka.hadoop; +import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.source.kafka.util.KafkaClient; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; @@ -97,6 +98,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic); } + JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));