shaofengshi closed pull request #213: KYLIN-3509 Allocate more memory for merge-dictionary step URL: https://github.com/apache/kylin/pull/213
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 58d9caacbc..4895bf0745 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1140,6 +1140,11 @@ public String getKylinJobMRLibDir() { return getPropertiesByPrefix("kylin.engine.spark-conf."); } + public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) { + return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + "."); + } + + public double getDefaultHadoopJobReducerInputMB() { return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500")); } diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 23c0730afa..e505def0a3 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -312,6 +312,10 @@ kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false +### Spark conf for specific job +kylin.engine.spark-conf-mergedict.spark.executor.memory=6G +kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2 + # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime #kylin.engine.spark-conf.spark.yarn.archive=hdfs://namenode:8020/kylin/spark/spark-libs.jar 
#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 560293cf44..5735a80975 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -71,4 +71,6 @@ private ExecutableConstants() { public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE = "Update Lookup Snapshot Cache to Query Engine"; public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE = "Take Snapshot to Metadata Store"; public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info"; + + public static final String SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict"; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index d9027082b6..88a58ae5a2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -26,6 +26,7 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.MergeDictionaryJob; import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,8 +79,8 @@ public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, String job MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable(); mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY); StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX); - 
appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID)); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java index 4487610885..eb67fefaa3 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java @@ -89,6 +89,7 @@ public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, sparkExecutable.setJobId(jobID); sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY); + sparkExecutable.setSparkConfigName(ExecutableConstants.SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY); StringBuilder jars = new StringBuilder(); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 612239741f..dea820698e 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -63,6 +63,7 @@ private static final String JARS = "jars"; private static final String JOB_ID = "jobId"; private static final String COUNTER_SAVE_AS = "CounterSaveAs"; + private static final String CONFIG_NAME = "configName"; public void setClassName(String className) { this.setParam(CLASS_NAME, className); @@ -84,6 +85,17 @@ public String getCounterSaveAs() { return getParam(COUNTER_SAVE_AS); } + /** + * set spark override conf for specific job + */ + public void setSparkConfigName(String configName) { + this.setParam(CONFIG_NAME, 
configName); + } + + public String getSparkConfigName() { + return getParam(CONFIG_NAME); + } + private String formatArgs() { StringBuilder stringBuilder = new StringBuilder(); for (Map.Entry<String, String> entry : getParams().entrySet()) { @@ -92,7 +104,7 @@ private String formatArgs() { if (entry.getKey().equals(CLASS_NAME)) { stringBuilder.insert(0, tmp); } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) - || entry.getKey().equals(COUNTER_SAVE_AS)) { + || entry.getKey().equals(COUNTER_SAVE_AS) || entry.getKey().equals(CONFIG_NAME)) { // JARS is for spark-submit, not for app continue; } else { @@ -221,6 +233,13 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio } Map<String, String> sparkConfs = config.getSparkConfigOverride(); + + String sparkConfigName = getSparkConfigName(); + if (sparkConfigName != null) { + Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName); + sparkConfs.putAll(sparkSpecificConfs); + } + for (Map.Entry<String, String> entry : sparkConfs.entrySet()) { stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()) .append(" "); diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index edced8f1f3..e6a6bd6c9b 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -202,6 +202,10 @@ kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false +### Spark conf for specific job +kylin.engine.spark-conf-mergedict.spark.executor.memory=1G +kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2 + ### QUERY PUSH DOWN ### #kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services