This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new f45d21d KYLIN-4320 number of replicas of Cuboid files cannot be configured for Spark engine f45d21d is described below commit f45d21d4591a771a899d41c5a748c09b8ab90293 Author: yaqian.zhang <598593...@qq.com> AuthorDate: Tue Apr 28 16:38:40 2020 +0800 KYLIN-4320 number of replicas of Cuboid files cannot be configured for Spark engine --- .../src/main/java/org/apache/kylin/common/KylinConfigBase.java | 4 ++++ .../main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java | 2 +- .../src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java | 4 ++-- .../src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java | 3 ++- .../src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java | 3 +++ .../main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java | 3 +++ .../main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java | 2 +- .../src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java | 2 +- .../src/main/java/org/apache/kylin/engine/spark/SparkUtil.java | 4 ++-- 9 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3490b7a..16c07c1 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1432,6 +1432,10 @@ public abstract class KylinConfigBase implements Serializable { return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override."); } + public String getCuboidDfsReplication() { + return getOptional("kylin.engine.cuboid.dfs.replication", "2"); + } + public Map<String, String> getSparkConfigOverride() { return getPropertiesByPrefix("kylin.engine.spark-conf."); } diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java index 3c532a1..bf0b686 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java @@ -223,7 +223,7 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa final KylinConfig kylinConfig) throws Exception { final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration()); - FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress + FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl); HadoopOutputFormat<Text, Text> hadoopOF = diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java index 938939f..3d8aac3 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java @@ -121,7 +121,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl } Job job = Job.getInstance(); - FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress + FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath)); final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration()); @@ -206,7 +206,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl final String cuboidOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level); Job jobInstanceForEachOutputFormat = Job.getInstance(); - FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compress + FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication and enable compress FlinkUtil.setHadoopConfForCuboid(jobInstanceForEachOutputFormat, cubeSegment, metaUrl); FileOutputFormat.setOutputPath(jobInstanceForEachOutputFormat, new Path(cuboidOutputPath)); diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java index 73f532a..188ddef 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeSegment; @@ -102,7 +103,7 @@ public class FlinkUtil { } public static void modifyFlinkHadoopConfiguration(Job job) throws Exception { - job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2 + job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication()); // cuboid intermediate files job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true"); job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK"); job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index d35f6b6..f12c731 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -144,6 +144,9 @@ public class CuboidJob extends AbstractHadoopJob { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); + //set dfs.replication + job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication()); + // set input configureMapperInputFormat(segment); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java index 311ec4f..da7c25e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java @@ -76,6 +76,9 @@ public class MergeCuboidJob extends CuboidJob { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); + //set dfs.replication + job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication()); + // set inputs IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getOutputFormat(); outputFormat.configureJobInput(job, input); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 1c0c18f..381ef13 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -130,7 +130,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa JavaSparkContext sc = new JavaSparkContext(conf); sc.sc().addSparkListener(jobListener); HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); - SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress + SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration()); KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java index a3b13a8..c329f3c 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java @@ -112,7 +112,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray); try (JavaSparkContext sc = new JavaSparkContext(conf)) { - SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress + SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress KylinSparkJobListener jobListener = new KylinSparkJobListener(); sc.sc().addSparkListener(jobListener); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java index fcd24f1..aabc767 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java @@ -137,8 +137,8 @@ public class SparkUtil { job.setOutputFormatClass(SequenceFileOutputFormat.class); } - public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception { - sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2 + public static void modifySparkHadoopConfiguration(SparkContext sc, KylinConfig kylinConfig) throws Exception { + sc.hadoopConfiguration().set("dfs.replication", kylinConfig.getCuboidDfsReplication()); // cuboid intermediate files sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true"); sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK"); sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec