This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new f45d21d  KYLIN-4320 number of replicas of Cuboid files cannot be configured for Spark engine
f45d21d is described below

commit f45d21d4591a771a899d41c5a748c09b8ab90293
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Apr 28 16:38:40 2020 +0800

    KYLIN-4320 number of replicas of Cuboid files cannot be configured for Spark engine
---
 .../src/main/java/org/apache/kylin/common/KylinConfigBase.java        | 4 ++++
 .../main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java   | 2 +-
 .../src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java | 4 ++--
 .../src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java        | 3 ++-
 .../src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java     | 3 +++
 .../main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java    | 3 +++
 .../main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java   | 2 +-
 .../src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java | 2 +-
 .../src/main/java/org/apache/kylin/engine/spark/SparkUtil.java        | 4 ++--
 9 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3490b7a..16c07c1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1432,6 +1432,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
     }
 
+    public String getCuboidDfsReplication() {
+        return getOptional("kylin.engine.cuboid.dfs.replication", "2");
+    }
+
     public Map<String, String> getSparkConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
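
A minimal configuration sketch (the value 3 is only an example): with this getter in place, the replication factor of cuboid intermediate files can be set in kylin.properties:

    kylin.engine.cuboid.dfs.replication=3

When the key is absent, getOptional(...) falls back to "2", preserving the previously hard-coded behavior.
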
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 3c532a1..bf0b686 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -223,7 +223,7 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
             final KylinConfig kylinConfig) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
         final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
-        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
+        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress
         FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl);
 
         HadoopOutputFormat<Text, Text> hadoopOF =
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 938939f..3d8aac3 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -121,7 +121,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         }
 
         Job job = Job.getInstance();
-        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
+        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress
 
         HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
         final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
@@ -206,7 +206,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
                 final String cuboidOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
 
                 Job jobInstanceForEachOutputFormat = Job.getInstance();
-                FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compress
+                FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication and enable compress
                 FlinkUtil.setHadoopConfForCuboid(jobInstanceForEachOutputFormat, cubeSegment, metaUrl);
 
                 FileOutputFormat.setOutputPath(jobInstanceForEachOutputFormat, new Path(cuboidOutputPath));
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index 73f532a..188ddef 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
@@ -102,7 +103,7 @@ public class FlinkUtil {
     }
 
     public static void modifyFlinkHadoopConfiguration(Job job) throws Exception {
-        job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication()); // cuboid intermediate files
         job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
         job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
         job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d35f6b6..f12c731 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -144,6 +144,9 @@ public class CuboidJob extends AbstractHadoopJob {
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+            //set dfs.replication
+            job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
+
             // set input
             configureMapperInputFormat(segment);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 311ec4f..da7c25e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -76,6 +76,9 @@ public class MergeCuboidJob extends CuboidJob {
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+            //set dfs.replication
+            job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
+
             // set inputs
             IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getOutputFormat();
             outputFormat.configureJobInput(job, input);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 1c0c18f..381ef13 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -130,7 +130,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+        SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
         KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index a3b13a8..c329f3c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -112,7 +112,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         try (JavaSparkContext sc = new JavaSparkContext(conf)) {
-            SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+            SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
             KylinSparkJobListener jobListener = new KylinSparkJobListener();
             sc.sc().addSparkListener(jobListener);
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index fcd24f1..aabc767 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -137,8 +137,8 @@ public class SparkUtil {
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
     }
 
-    public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception {
-        sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+    public static void modifySparkHadoopConfiguration(SparkContext sc, KylinConfig kylinConfig) throws Exception {
+        sc.hadoopConfiguration().set("dfs.replication", kylinConfig.getCuboidDfsReplication()); // cuboid intermediate files
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
