Repository: kylin
Updated Branches:
  refs/heads/master fbf2b3814 -> dc1671ca5


KYLIN-1566 use a separate kylin_job_conf_inmem.xml for in-mem cubing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/209068b9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/209068b9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/209068b9

Branch: refs/heads/master
Commit: 209068b943bf4a90efe4df618e1aaf5cbfe49cde
Parents: 1b54a40
Author: shaofengshi <shaofeng...@apache.org>
Authored: Fri Apr 15 16:11:44 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Sat Apr 16 09:05:40 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin_job_conf_inmem.xml             | 98 ++++++++++++++++++++
 .../apache/kylin/common/KylinConfigBase.java    | 19 ----
 .../kylin/job/engine/JobEngineConfig.java       | 44 ++++++---
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  3 +-
 .../kylin/engine/mr/JobBuilderSupport.java      | 13 ++-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 11 ---
 .../cardinality/HiveColumnCardinalityJob.java   |  2 +-
 7 files changed, 140 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/build/conf/kylin_job_conf_inmem.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml
new file mode 100644
index 0000000..55bf9ed
--- /dev/null
+++ b/build/conf/kylin_job_conf_inmem.xml
@@ -0,0 +1,98 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>mapreduce.job.split.metainfo.maxsize</name>
+        <value>-1</value>
+        <description>The maximum permissible size of the split metainfo file.
+            The JobTracker won't attempt to read split metainfo files bigger 
than
+            the configured value. No limits if set to -1.
+        </description>
+    </property>
+
+       <property>
+               <name>mapred.compress.map.output</name>
+               <value>true</value>
+               <description>Compress map outputs</description>
+       </property>
+
+       <property>
+               <name>mapred.map.output.compression.codec</name>
+               <value>org.apache.hadoop.io.compress.SnappyCodec</value>
+               <description>The compression codec to use for map outputs
+               </description>
+       </property>
+
+       <property>
+               <name>mapred.output.compress</name>
+               <value>true</value>
+               <description>Compress the output of a MapReduce 
job</description>
+       </property>
+
+       <property>
+               <name>mapred.output.compression.codec</name>
+               <value>org.apache.hadoop.io.compress.SnappyCodec</value>
+               <description>The compression codec to use for job outputs
+               </description>
+       </property>
+
+       <property>
+               <name>mapred.output.compression.type</name>
+               <value>BLOCK</value>
+               <description>The compression type to use for job 
outputs</description>
+       </property>
+
+
+    <property>
+        <name>mapreduce.job.max.split.locations</name>
+        <value>2000</value>
+        <description>No description</description>
+    </property>
+
+    <property>
+        <name>dfs.replication</name>
+        <value>2</value>
+        <description>Block replication</description>
+    </property>
+
+    <property>
+        <name>mapred.task.timeout</name>
+        <value>3600000</value>
+        <description>Set task timeout to 1 hour</description>
+    </property>
+
+       <!--Additional config for in-mem cubing, giving mapper more memory -->
+       <property>
+               <name>mapreduce.map.memory.mb</name>
+               <value>3072</value>
+               <description></description>
+       </property>
+
+       <property>
+               <name>mapreduce.map.java.opts</name>
+               <value>-Xmx2700m</value>
+               <description></description>
+       </property>
+
+       <property>
+               <name>mapreduce.task.io.sort.mb</name>
+               <value>200</value>
+               <description></description>
+       </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 51aa8aa..4d65c1d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -531,25 +531,6 @@ abstract public class KylinConfigBase implements Serializable {
         return percent;
     }
 
-    public Map<String, String> getCubingInMemMRJobConfOverride() {
-        // in-mem cubing requires big memory, however dev env (sandbox) may 
not have that much
-        String defaultOverride = isDevEnv() ? "" : 
"mapreduce.map.java.opts=-Xmx2700m;  mapreduce.map.memory.mb=3072;  
mapreduce.task.io.sort.mb=200";
-        String override = 
getOptional("kylin.job.cubing.inmem.mrjob_conf_override", defaultOverride);
-
-        Map<String, String> result = Maps.newHashMap();
-        for (String pair : override.split(";")) {
-            int cut = pair.indexOf('=');
-            if (cut < 0)
-                continue;
-            String k = pair.substring(0, cut).trim();
-            String v = pair.substring(cut + 1).trim();
-            if (k.isEmpty() || v.isEmpty())
-                continue;
-            result.put(k, v);
-        }
-        return result;
-    }
-
     public String getHbaseDefaultCompressionCodec() {
         return getOptional("kylin.hbase.default.compression.codec", "");
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 546c033..fb4ce68 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -33,8 +33,10 @@ import org.slf4j.LoggerFactory;
  */
 public class JobEngineConfig {
     private static final Logger logger = 
LoggerFactory.getLogger(JobEngineConfig.class);
-    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
-    public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
+    public static final String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
+    public static final String HIVE_CONF_FILENAME = "kylin_hive_conf";
+    public static final String DEFAUL_JOB_CONF_SUFFIX = "";
+    public static final String IN_MEM_JOB_CONF_SUFFIX = "inmem";
 
     private static File getJobConfig(String fileName) {
         String path = System.getProperty(KylinConfig.KYLIN_CONF);
@@ -49,10 +51,10 @@ public class JobEngineConfig {
         return null;
     }
 
-    private String getHadoopJobConfFilePath(RealizationCapacity capaticy, 
boolean appendSuffix) throws IOException {
+    private String getHadoopJobConfFilePath(String suffix, boolean 
appendSuffix) throws IOException {
         String hadoopJobConfFile;
-        if (capaticy != null && appendSuffix) {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + 
capaticy.toString().toLowerCase() + ".xml");
+        if (suffix != null && appendSuffix) {
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + 
suffix.toLowerCase() + ".xml");
         } else {
             hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
         }
@@ -69,19 +71,31 @@ public class JobEngineConfig {
         return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
     }
 
-    public String getHadoopJobConfFilePath(RealizationCapacity capaticy) 
throws IOException {
-        String path = getHadoopJobConfFilePath(capaticy, true);
-        if (!StringUtils.isEmpty(path)) {
-            logger.info("Chosen job conf is : " + path);
-            return path;
+    /**
+     *
+     * @param suffix job config file suffix name; if be null, will use the 
default job conf
+     * @return the job config file path
+     * @throws IOException
+     */
+    public String getHadoopJobConfFilePath(String jobType, String capacity) 
throws IOException {
+        String suffix;
+        if(!StringUtils.isEmpty(jobType)) {
+            suffix = jobType + "_" + capacity;
         } else {
-            path = getHadoopJobConfFilePath(capaticy, false);
-            if (!StringUtils.isEmpty(path)) {
-                logger.info("Chosen job conf is : " + path);
-                return path;
+            suffix = capacity;
+        }
+        String path = getHadoopJobConfFilePath(suffix, true);
+        if (StringUtils.isEmpty(path)) {
+            path = getHadoopJobConfFilePath(jobType, true);
+            if (StringUtils.isEmpty(path)) {
+                path = getHadoopJobConfFilePath(jobType, false);
+                if (StringUtils.isEmpty(path)) {
+                    path = "";
+                }
             }
         }
-        return "";
+        logger.info("Chosen job conf is : " + path);
+        return path;
     }
 
     public String getHiveConfFilePath() throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 0b1bd90..a1c9cd9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -30,6 +30,7 @@ import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +109,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, ((CubeSegment) 
seg).getCubeDesc().getModel());
+        appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX, 
((CubeSegment) seg).getCubeDesc().getModel());
 
         cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c4fc6b9..841c402 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
@@ -162,9 +161,17 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/secondary_index/";
     }
 
-    public void appendMapReduceParameters(StringBuilder buf, DataModelDesc 
modelDesc) {
+    public void appendMapReduceParameters(StringBuilder buf, DataModelDesc 
dataModelDesc) {
+        appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX, 
dataModelDesc.getCapacity().toString());
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf, String jobType, 
DataModelDesc dataModelDesc) {
+        appendMapReduceParameters(buf, jobType, 
dataModelDesc.getCapacity().toString());
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf, String jobType, 
String capacity) {
         try {
-            String jobConf = 
config.getHadoopJobConfFilePath(modelDesc.getCapacity());
+            String jobConf = config.getHadoopJobConfFilePath(jobType, 
capacity);
             if (jobConf != null && jobConf.length() > 0) {
                 buf.append(" -conf ").append(jobConf);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index f440b22..e7bbdf1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -101,9 +101,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             logger.info("Starting: " + job.getJobName());
             
-            // some special tuning for in-mem MR job
-            overrideJobConf(job.getConfiguration(), config);
-
             setJobClasspath(job);
 
             // add metadata to distributed cache
@@ -112,8 +109,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, 
segmentName);
-            long timeout = 1000 * 60 * 60L; // 1 hour
-            job.getConfiguration().set("mapred.task.timeout", 
String.valueOf(timeout));
 
             // set input
             IMRTableInputFormat flatTableInputFormat = 
MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
@@ -149,12 +144,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         }
     }
 
-    private void overrideJobConf(Configuration jobConf, KylinConfig 
kylinConfig) {
-        for (Entry<String, String> entry : 
kylinConfig.getCubingInMemMRJobConfOverride().entrySet()) {
-            jobConf.set(entry.getKey(), entry.getValue());
-        }
-    }
-
     private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
         KylinConfig kylinConfig = cubeSeg.getConfig();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 9162208..3ce0ab2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -71,7 +71,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
             Configuration conf = getConf();
 
             JobEngineConfig jobEngineConfig = new 
JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null));
+            conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null, 
null));
 
             job = Job.getInstance(conf, jobName);
 

Reply via email to