http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
----------------------------------------------------------------------
diff --cc 
core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
index 0000000,904c4bd..dd18c91
mode 000000,100644..100644
--- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
+++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
@@@ -1,0 -1,142 +1,142 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.job.metrics;
+ 
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.metrics.MetricsManager;
+ import org.apache.kylin.metrics.lib.impl.RecordEvent;
+ import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+ import org.apache.kylin.metrics.property.JobPropertyEnum;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class JobMetricsFacade {
+     private static final Logger logger = 
LoggerFactory.getLogger(JobMetricsFacade.class);
+ 
+     public static void updateMetrics(JobStatisticsResult jobStats) {
+         if 
(!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) {
+             return;
+         }
+         /**
+          * report job related metrics
+          */
+         RecordEvent metricsEvent;
+         if (jobStats.throwable == null) {
+             metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob());
+             setJobWrapper(metricsEvent, jobStats.user, jobStats.projectName, 
jobStats.cubeName, jobStats.jobId,
+                     jobStats.jobType, jobStats.cubingType);
+             setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, 
jobStats.buildDuration,
+                     jobStats.waitResourceTime, jobStats.perBytesTimeCost, //
+                     jobStats.dColumnDistinct, jobStats.dDictBuilding, 
jobStats.dCubingInmem, jobStats.dHfileConvert);
+         } else {
+             metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
+             setJobExceptionWrapper(metricsEvent, jobStats.user, 
jobStats.projectName, jobStats.cubeName, jobStats.jobId,
+                     jobStats.jobType, jobStats.cubingType, //
+                     jobStats.throwable.getClass());
+         }
+         MetricsManager.getInstance().update(metricsEvent);
+     }
+ 
+     private static void setJobWrapper(RecordEvent metricsEvent, String user, 
String projectName, String cubeName,
+             String jobId, String jobType, String cubingType) {
+         metricsEvent.put(JobPropertyEnum.USER.toString(), user);
+         metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+         metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+         metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+         metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+         metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
+     }
+ 
+     private static void setJobStats(RecordEvent metricsEvent, long tableSize, 
long cubeSize, long buildDuration,
+             long waitResourceTime, double perBytesTimeCost, long 
dColumnDistinct, long dDictBuilding, long dCubingInmem,
+             long dHfileConvert) {
+         metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
+         metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
+         metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 
buildDuration);
+         metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 
waitResourceTime);
+         metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 
perBytesTimeCost);
+         
metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), 
dColumnDistinct);
+         metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 
dDictBuilding);
+         
metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
dCubingInmem);
+         
metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
dHfileConvert);
+     }
+ 
+     private static <T extends Throwable> void 
setJobExceptionWrapper(RecordEvent metricsEvent, String user,
+             String projectName, String cubeName, String jobId, String 
jobType, String cubingType,
+             Class<T> throwableClass) {
+         setJobWrapper(metricsEvent, user, projectName, cubeName, jobId, 
jobType, cubingType);
+         metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), 
throwableClass.getName());
+     }
+ 
+     public static class JobStatisticsResult {
+         // dimensions
+         private String user;
+         private String projectName;
+         private String cubeName;
+         private String jobId;
+         private String jobType;
+         private String cubingType;
+ 
+         // statistics
+         private long tableSize;
+         private long cubeSize;
+         private long buildDuration;
+         private long waitResourceTime;
+         private double perBytesTimeCost;
+ 
+         // step statistics
+         private long dColumnDistinct = 0L;
+         private long dDictBuilding = 0L;
+         private long dCubingInmem = 0L;
+         private long dHfileConvert = 0L;
+ 
+         // exception
+         private Throwable throwable;
+ 
+         public void setWrapper(String user, String projectName, String 
cubeName, String jobId, String jobType,
+                 String cubingType) {
+             this.user = user;
 -            this.projectName = projectName;
++            this.projectName = projectName == null ? null : 
projectName.toUpperCase();
+             this.cubeName = cubeName;
+             this.jobId = jobId;
+             this.jobType = jobType;
+             this.cubingType = cubingType;
+         }
+ 
+         public void setJobStats(long tableSize, long cubeSize, long 
buildDuration, long waitResourceTime,
+                 double perBytesTimeCost) {
+             this.tableSize = tableSize;
+             this.cubeSize = cubeSize;
+             this.buildDuration = buildDuration;
+             this.waitResourceTime = waitResourceTime;
+             this.perBytesTimeCost = perBytesTimeCost;
+         }
+ 
+         public void setJobStepStats(long dColumnDistinct, long dDictBuilding, 
long dCubingInmem, long dHfileConvert) {
+             this.dColumnDistinct = dColumnDistinct;
+             this.dDictBuilding = dDictBuilding;
+             this.dCubingInmem = dCubingInmem;
+             this.dHfileConvert = dHfileConvert;
+         }
+ 
+         public void setJobException(Throwable throwable) {
+             this.throwable = throwable;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
----------------------------------------------------------------------
diff --cc 
core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
index fd17370,0000000..ee29b13
mode 100644,000000..100644
--- 
a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java
@@@ -1,46 -1,0 +1,46 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * 
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 +*/
 +
 +package org.apache.kylin.job;
 +
 +import org.apache.kylin.job.exception.ExecuteException;
 +import org.apache.kylin.job.execution.ExecutableContext;
 +import org.apache.kylin.job.execution.ExecuteResult;
 +
 +/**
 + */
 +public class FiveSecondSucceedTestExecutable extends BaseTestExecutable {
 +
 +    public FiveSecondSucceedTestExecutable() {
 +        super();
 +    }
 +
 +    public FiveSecondSucceedTestExecutable(int sleepTime) {
 +        super();
 +    }
 +
 +    @Override
 +    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
 +        try {
 +            Thread.sleep(5000);
 +        } catch (InterruptedException e) {
 +            Thread.currentThread().interrupt();
 +        }
-         return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
++        return ExecuteResult.createSucceed();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
----------------------------------------------------------------------
diff --cc 
core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
index 0000000,61b1742..f656c44
mode 000000,100644..100644
--- a/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java
@@@ -1,0 -1,50 +1,50 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.kylin.job;
+ 
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.job.execution.ExecutableContext;
+ import org.apache.kylin.job.execution.ExecuteResult;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  */
+ public class RetryableTestExecutable extends BaseTestExecutable {
+     private static final Logger logger = 
LoggerFactory.getLogger(RetryableTestExecutable.class);
+ 
+     public RetryableTestExecutable() {
+         super();
+     }
+ 
+     @Override
+     protected ExecuteResult doWork(ExecutableContext context) {
+         logger.debug("run retryable exception test. ");
+         String[] exceptions = 
KylinConfig.getInstanceFromEnv().getJobRetryExceptions();
+         Throwable ex = null;
 -        if (exceptions != null && exceptions[0] != null) {
++        if (exceptions != null && exceptions.length > 0) {
+             try {
+                 ex = (Throwable) Class.forName(exceptions[0]).newInstance();
+             } catch (Exception e) {
+                 e.printStackTrace();
+             }
+         }
+         return new ExecuteResult(ExecuteResult.State.ERROR, null, ex);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
----------------------------------------------------------------------
diff --cc core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
index 89057e6,0000000..1826850
mode 100644,000000..100644
--- a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java
@@@ -1,39 -1,0 +1,39 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 +*/
 +
 +package org.apache.kylin.job;
 +
 +import org.apache.kylin.job.exception.ExecuteException;
 +import org.apache.kylin.job.execution.ExecutableContext;
 +import org.apache.kylin.job.execution.ExecuteResult;
 +
 +public class RunningTestExecutable extends BaseTestExecutable {
 +
 +    public RunningTestExecutable() {
 +        super();
 +    }
 +
 +    @Override
 +    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
 +        try {
 +            Thread.sleep(1000);
 +        } catch (InterruptedException e) {
 +        }
-         return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
++        return ExecuteResult.createSucceed();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --cc 
core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index b1fc544,badd483..d1b7d96
--- 
a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ 
b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@@ -31,9 -30,7 +31,10 @@@ import org.apache.kylin.common.KylinCon
  import org.apache.kylin.job.BaseTestExecutable;
  import org.apache.kylin.job.ErrorTestExecutable;
  import org.apache.kylin.job.FailedTestExecutable;
 +import org.apache.kylin.job.FiveSecondSucceedTestExecutable;
 +import org.apache.kylin.job.NoErrorStatusExecutable;
+ import org.apache.kylin.job.RetryableTestExecutable;
 +import org.apache.kylin.job.RunningTestExecutable;
  import org.apache.kylin.job.SelfStopExecutable;
  import org.apache.kylin.job.SucceedTestExecutable;
  import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@@ -174,56 -148,18 +175,71 @@@ public class DefaultSchedulerTest exten
      }
  
      @Test
 +    public void tesMetaStoreRecover() throws Exception {
 +        logger.info("tesMetaStoreRecover");
 +        NoErrorStatusExecutable job = new NoErrorStatusExecutable();
 +        ErrorTestExecutable task = new ErrorTestExecutable();
 +        job.addTask(task);
 +        execMgr.addJob(job);
 +        Thread.sleep(2000);
 +        runningJobToError(job.getId());
 +        Thread.sleep(2000);
 +        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
 +    }
 +
 +    @Test
 +    public void testSchedulerStop() throws Exception {
 +        logger.info("testSchedulerStop");
 +
 +        thrown.expect(RuntimeException.class);
 +        thrown.expectMessage("too long wait time");
 +
 +        DefaultChainedExecutable job = new DefaultChainedExecutable();
 +        BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable();
 +        job.addTask(task1);
 +        execMgr.addJob(job);
 +
 +        //sleep 3s to make sure SucceedTestExecutable is running 
 +        Thread.sleep(3000);
 +        //scheduler failed due to some reason
 +        scheduler.shutdown();
 +
 +        waitForJobFinish(job.getId(), 6000);
 +    }
 +
 +    @Test
 +    public void testSchedulerRestart() throws Exception {
 +        logger.info("testSchedulerRestart");
 +
 +        DefaultChainedExecutable job = new DefaultChainedExecutable();
 +        BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable();
 +        job.addTask(task1);
 +        execMgr.addJob(job);
 +
 +        //sleep 3s to make sure SucceedTestExecutable is running 
 +        Thread.sleep(3000);
 +        //scheduler failed due to some reason
 +        scheduler.shutdown();
 +        //restart
 +        startScheduler();
 +
 +        waitForJobFinish(job.getId(), 10000);
 +        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(job.getId()).getState());
 +        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
 +    }
++    
+     public void testRetryableException() throws Exception {
+         System.setProperty("kylin.job.retry-exception-classes", 
"java.io.FileNotFoundException");
+         System.setProperty("kylin.job.retry", "3");
+         DefaultChainedExecutable job = new DefaultChainedExecutable();
+         BaseTestExecutable task1 = new SucceedTestExecutable();
+         BaseTestExecutable task2 = new RetryableTestExecutable();
+         job.addTask(task1);
+         job.addTask(task2);
 -        jobService.addJob(job);
 -        waitForJobFinish(job.getId());
 -        Assert.assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(task1.getId()).getState());
 -        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(task2.getId()).getState());
 -        Assert.assertEquals(ExecutableState.ERROR, 
jobService.getOutput(job.getId()).getState());
++        execMgr.addJob(job);
++        waitForJobFinish(job.getId(), 10000);
++        Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
++        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(task2.getId()).getState());
++        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
+     }
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
----------------------------------------------------------------------
diff --cc 
core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index efbc33e,ab55563..f09c47c
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@@ -55,19 -58,51 +55,21 @@@ import com.google.common.collect.Maps
   */
  public class TableMetadataManager {
  
 +    @SuppressWarnings("unused")
      private static final Logger logger = 
LoggerFactory.getLogger(TableMetadataManager.class);
  
+     public static final Serializer<TableDesc> TABLE_SERIALIZER = new 
JsonSerializer<TableDesc>(TableDesc.class);
 -    public static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new 
JsonSerializer<TableExtDesc>(
++    
 +    private static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new 
JsonSerializer<TableExtDesc>(
              TableExtDesc.class);
 -    public static final Serializer<ExternalFilterDesc> 
EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(
 -            ExternalFilterDesc.class);
 -
 -    // static cached instances
 -    private static final ConcurrentMap<KylinConfig, TableMetadataManager> 
CACHE = new ConcurrentHashMap<KylinConfig, TableMetadataManager>();
  
      public static TableMetadataManager getInstance(KylinConfig config) {
 -        TableMetadataManager r = CACHE.get(config);
 -        if (r != null) {
 -            return r;
 -        }
 -
 -        synchronized (TableMetadataManager.class) {
 -            r = CACHE.get(config);
 -            if (r != null) {
 -                return r;
 -            }
 -            try {
 -                r = new TableMetadataManager(config);
 -                CACHE.put(config, r);
 -                if (CACHE.size() > 1) {
 -                    logger.warn("More than one singleton exist, current keys: 
{}", StringUtils
 -                            
.join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, 
String>() {
 -                                @Nullable
 -                                @Override
 -                                public String apply(@Nullable KylinConfig 
input) {
 -                                    return 
String.valueOf(System.identityHashCode(input));
 -                                }
 -                            }), ","));
 -                }
 -
 -                return r;
 -            } catch (IOException e) {
 -                throw new IllegalStateException("Failed to init 
TableMetadataManager from " + config, e);
 -            }
 -        }
 +        return config.getManager(TableMetadataManager.class);
      }
  
 -    public static void clearCache() {
 -        CACHE.clear();
 +    // called by reflection
 +    static TableMetadataManager newInstance(KylinConfig config) throws 
IOException {
 +        return new TableMetadataManager(config);
      }
  
      // 
============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --cc 
core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index fa97596,d8b33c0..7597d40
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@@ -154,20 -146,12 +154,19 @@@ public class FunctionDesc implements Se
      }
  
      public DataType getRewriteFieldType() {
- 
 -        if (isMax() || isMin())
 -            return parameter.getColRefs().get(0).getType();
 -        else if (getMeasureType() instanceof BasicMeasureType)
 -            return returnDataType;
 -        else
 +        if (getMeasureType() instanceof BasicMeasureType) {
 +            if (isMax() || isMin()) {
 +                return parameter.getColRefs().get(0).getType();
 +            } else if (isSum()) {
 +                return parameter.getColRefs().get(0).getType();
 +            } else if (isCount()) {
 +                return DataType.getType("bigint");
 +            } else {
 +                throw new IllegalArgumentException("unknown measure type " + 
getMeasureType());
 +            }
 +        } else {
              return DataType.ANY;
 +        }
      }
  
      public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@@ -247,10 -231,18 +246,18 @@@
          return expression;
      }
  
++    public void setExpression(String expression) {
++        this.expression = expression;
++    }
++    
      public ParameterDesc getParameter() {
          return parameter;
      }
  
+     public void setParameter(ParameterDesc parameter) {
+         this.parameter = parameter;
+     }
+ 
 -    public void setExpression(String expression) {
 -        this.expression = expression;
 -    }
 -
      public int getParameterCount() {
          int count = 0;
          for (ParameterDesc p = parameter; p != null; p = 
p.getNextParameter()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --cc 
core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 0029de2,a7d37e7..9b7aaf2
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@@ -50,8 -50,7 +50,33 @@@ import com.google.common.collect.Lists
  public class ProjectInstance extends RootPersistentEntity {
  
      public static final String DEFAULT_PROJECT_NAME = "default";
+ 
++    public static ProjectInstance create(String name, String owner, String 
description, LinkedHashMap<String, String> overrideProps, 
List<RealizationEntry> realizationEntries, List<String> models) {
++        ProjectInstance projectInstance = new ProjectInstance();
++
++        projectInstance.updateRandomUuid();
++        projectInstance.setName(name);
++        projectInstance.setOwner(owner);
++        projectInstance.setDescription(description);
++        projectInstance.setStatus(ProjectStatusEnum.ENABLED);
++        projectInstance.setCreateTimeUTC(System.currentTimeMillis());
++        projectInstance.setOverrideKylinProps(overrideProps);
++
++        if (realizationEntries != null)
++            projectInstance.setRealizationEntries(realizationEntries);
++        else
++            projectInstance.setRealizationEntries(Lists.<RealizationEntry> 
newArrayList());
++        if (models != null)
++            projectInstance.setModels(models);
++        else
++            projectInstance.setModels(new ArrayList<String>());
++        return projectInstance;
++    }
++
++    // 
============================================================================
++
 +    private KylinConfigExt config;
 +
      @JsonProperty("name")
      private String name;
  
@@@ -87,50 -86,53 +112,44 @@@
      @JsonInclude(JsonInclude.Include.NON_NULL)
      private LinkedHashMap<String, String> overrideKylinProps;
  
-     public String getResourcePath() {
-         return concatResourcePath(resourceName());
-     }
 -    private KylinConfigExt config;
++    public void init() {
++        if (name == null)
++            name = ProjectInstance.DEFAULT_PROJECT_NAME;
  
-     public static String concatResourcePath(String projectName) {
-         return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + 
".json";
 -    public String getResourcePath() {
 -        return concatResourcePath(name);
--    }
++        if (realizationEntries == null) {
++            realizationEntries = new ArrayList<RealizationEntry>();
++        }
  
-     public static ProjectInstance create(String name, String owner, String 
description, LinkedHashMap<String, String> overrideProps, 
List<RealizationEntry> realizationEntries, List<String> models) {
-         ProjectInstance projectInstance = new ProjectInstance();
 -    public static String concatResourcePath(String projectName) {
 -        return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + 
".json";
 -    }
++        if (tables == null)
++            tables = new TreeSet<String>();
  
-         projectInstance.updateRandomUuid();
-         projectInstance.setName(name);
-         projectInstance.setOwner(owner);
-         projectInstance.setDescription(description);
-         projectInstance.setStatus(ProjectStatusEnum.ENABLED);
-         projectInstance.setCreateTimeUTC(System.currentTimeMillis());
-         projectInstance.setOverrideKylinProps(overrideProps);
 -    public static String getNormalizedProjectName(String project) {
 -        if (project == null)
 -            throw new IllegalStateException("Trying to normalized a project 
name which is null");
++        if (overrideKylinProps == null) {
++            overrideKylinProps = new LinkedHashMap<>();
++        }
  
-         if (realizationEntries != null)
-             projectInstance.setRealizationEntries(realizationEntries);
-         else
-             projectInstance.setRealizationEntries(Lists.<RealizationEntry> 
newArrayList());
-         if (models != null)
-             projectInstance.setModels(models);
-         else
-             projectInstance.setModels(new ArrayList<String>());
-         return projectInstance;
 -        return project.toUpperCase();
 -    }
++        initConfig();
+ 
 -    public static ProjectInstance create(String name, String owner, String 
description, LinkedHashMap<String, String> overrideProps, 
List<RealizationEntry> realizationEntries, List<String> models) {
 -        ProjectInstance projectInstance = new ProjectInstance();
++        if (StringUtils.isBlank(this.name))
++            throw new IllegalStateException("Project name must not be blank");
 +    }
  
-     public void initConfig() {
 -        projectInstance.updateRandomUuid();
 -        projectInstance.setName(name);
 -        projectInstance.setOwner(owner);
 -        projectInstance.setDescription(description);
 -        projectInstance.setStatus(ProjectStatusEnum.ENABLED);
 -        projectInstance.setCreateTimeUTC(System.currentTimeMillis());
 -        if (overrideProps != null) {
 -            projectInstance.setOverrideKylinProps(overrideProps);
 -        } else {
 -            projectInstance.setOverrideKylinProps(new LinkedHashMap<String, 
String>());
 -        }
 -        if (realizationEntries != null)
 -            projectInstance.setRealizationEntries(realizationEntries);
 -        else
 -            projectInstance.setRealizationEntries(Lists.<RealizationEntry> 
newArrayList());
 -        if (models != null)
 -            projectInstance.setModels(models);
 -        else
 -            projectInstance.setModels(new ArrayList<String>());
 -        return projectInstance;
++    private void initConfig() {
 +        this.config = 
KylinConfigExt.createInstance(KylinConfig.getInstanceFromEnv(), 
this.overrideKylinProps);
      }
  
--    // 
============================================================================
++    public String getResourcePath() {
++        return concatResourcePath(resourceName());
++    }
  
--    public ProjectInstance() {
++    public static String concatResourcePath(String projectName) {
++        return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + 
".json";
      }
  
 +    @Override
 +    public String resourceName() {
 +        return this.name;
 +    }
 +    
      public String getDescription() {
          return description;
      }
@@@ -310,31 -333,6 +329,10 @@@
          return config;
      }
  
 +    public void setConfig(KylinConfigExt config) {
 +        this.config = config;
 +    }
 +
-     public void init() {
-         if (name == null)
-             name = ProjectInstance.DEFAULT_PROJECT_NAME;
- 
-         if (realizationEntries == null) {
-             realizationEntries = new ArrayList<RealizationEntry>();
-         }
- 
-         if (tables == null)
-             tables = new TreeSet<String>();
- 
-         if (overrideKylinProps == null) {
-             overrideKylinProps = new LinkedHashMap<>();
-         }
- 
-         initConfig();
- 
-         if (StringUtils.isBlank(this.name))
-             throw new IllegalStateException("Project name must not be blank");
-     }
- 
      @Override
      public String toString() {
          return "ProjectDesc [name=" + name + "]";

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --cc 
core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index b910ffe,024990f..11ad8bb
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@@ -276,9 -255,44 +281,25 @@@ public abstract class GTCubeStorageQuer
                  }
              }
          }
 -
 -        // expand derived
 -        Set<TblColRef> resultD = Sets.newHashSet();
 -        for (TblColRef col : result) {
 -            if (cubeDesc.isExtendedColumn(col)) {
 -                throw new CubeDesc.CannotFilterExtendedColumnException(col);
 -            }
 -            if (cubeDesc.isDerived(col)) {
 -                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
 -                if (hostInfo.isOneToOne) {
 -                    for (TblColRef hostCol : hostInfo.columns) {
 -                        resultD.add(hostCol);
 -                    }
 -                }
 -                //if not one2one, it will be pruned
 -            } else {
 -                resultD.add(col);
 -            }
 -        }
 -        return resultD;
 +        return result;
      }
  
+     private long getQueryFilterMask(Set<TblColRef> filterColumnD) {
+         long filterMask = 0;
+ 
+         logger.info("Filter column set for query: " + 
filterColumnD.toString());
+         if (filterColumnD.size() != 0) {
+             RowKeyColDesc[] allColumns = 
cubeDesc.getRowkey().getRowKeyColumns();
+             for (int i = 0; i < allColumns.length; i++) {
+                 if (filterColumnD.contains(allColumns[i].getColRef())) {
+                     filterMask |= 1L << allColumns[i].getBitIndex();
+                 }
+             }
+         }
+         logger.info("Filter mask is: " + filterMask);
+         return filterMask;
+     }
+ 
      public boolean isNeedStorageAggregation(Cuboid cuboid, 
Collection<TblColRef> groupD,
              Collection<TblColRef> singleValueD) {
          HashSet<TblColRef> temp = Sets.newHashSet();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 8e21f5c,8fbc0c9..faac724
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@@ -18,7 -18,9 +18,10 @@@
  
  package org.apache.kylin.engine.mr;
  
 -import org.apache.kylin.cube.CubeManager;
++import java.util.List;
++
  import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.CuboidUtil;
  import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
  import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
  import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@@ -57,7 -62,12 +61,12 @@@ public class BatchCubingJobBuilder2 ext
          inputSide.addStepPhase1_CreateFlatTable(result);
  
          // Phase 2: Build Dictionary
 -        result.addTask(createFactDistinctColumnsStepWithStats(jobId));
 +        result.addTask(createFactDistinctColumnsStep(jobId));
+ 
+         if (isEnableUHCDictStep()) {
+             result.addTask(createBuildUHCDictStep(jobId));
+         }
+ 
          result.addTask(createBuildDictionaryStep(jobId));
          result.addTask(createSaveStatisticsStep(jobId));
          outputSide.addStepPhase2_BuildDictionary(result);
@@@ -75,8 -85,22 +84,22 @@@
          return result;
      }
  
+     private boolean isEnableUHCDictStep() {
+         if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
+             return false;
+         }
+ 
 -        List<TblColRef> uhcColumns = 
CubeManager.getInstance(config.getConfig()).getAllUHCColumns(seg.getCubeDesc());
++        List<TblColRef> uhcColumns = seg.getCubeDesc().getAllUHCColumns();
+         if (uhcColumns.size() == 0) {
+             return false;
+         }
+ 
+         return true;
+     }
+ 
      protected void addLayerCubingSteps(final CubingJob result, final String 
jobId, final String cuboidRootPath) {
-         final int maxLevel = seg.getCuboidScheduler().getBuildLevel();
+         // Don't know statistics so that tree cuboid scheduler is not 
determined. Determine the maxLevel at runtime
+         final int maxLevel = 
CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
          // base cuboid step
          
result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 
0), jobId));
          // n dim cuboid steps

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 6393abf,abf7e02..6f26c35
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@@ -99,9 -142,11 +142,11 @@@ public class CubingJob extends DefaultC
          format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
          result.setDeployEnvName(kylinConfig.getDeployEnv());
          result.setProjectName(projList.get(0).getName());
+         result.setJobType(jobType);
          CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), 
result.getParams());
          CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+         CubingExecutableUtil.setSegmentName(seg.getName(), 
result.getParams());
 -        result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() 
+ " - " + seg.getName() + " - "
 +        result.setName(jobType + " CUBE - " + 
seg.getCubeInstance().getDisplayName() + " - " + seg.getName() + " - "
                  + format.format(new Date(System.currentTimeMillis())));
          result.setSubmitter(submitter);
          
result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
@@@ -197,6 -259,44 +259,44 @@@
          super.onExecuteFinished(result, executableContext);
      }
  
+     protected void onStatusChange(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
+         super.onStatusChange(context, result, state);
+ 
+         updateMetrics(context, result, state);
+     }
+ 
+     protected void updateMetrics(ExecutableContext context, ExecuteResult 
result, ExecutableState state) {
+         JobMetricsFacade.JobStatisticsResult jobStats = new 
JobMetricsFacade.JobStatisticsResult();
 -        jobStats.setWrapper(getSubmitter(), 
ProjectInstance.getNormalizedProjectName(getProjectName()),
++        jobStats.setWrapper(getSubmitter(), getProjectName(),
+                 CubingExecutableUtil.getCubeName(getParams()), getId(), 
getJobType(),
+                 getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
+ 
+         if (state == ExecutableState.SUCCEED) {
+             jobStats.setJobStats(findSourceSizeBytes(), findCubeSizeBytes(), 
getDuration(), getMapReduceWaitTime(),
+                     getPerBytesTimeCost(findSourceSizeBytes(), 
getDuration()));
+             if (CubingJobTypeEnum.getByName(getJobType()) == 
CubingJobTypeEnum.BUILD) {
+                 jobStats.setJobStepStats(
+                         
getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(),
+                         
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(),
+                         
getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(),
+                         
getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration());
+             }
+         } else if (state == ExecutableState.ERROR) {
+             jobStats.setJobException(result.getThrowable() != null ? 
result.getThrowable() : new Exception());
+         }
+         JobMetricsFacade.updateMetrics(jobStats);
+     }
+ 
+     private static double getPerBytesTimeCost(long size, long timeCost) {
+         if (size <= 0) {
+             return 0;
+         }
+         if (size < MIN_SOURCE_SIZE) {
+             size = MIN_SOURCE_SIZE;
+         }
+         return timeCost * 1.0 / size;
+     }
+ 
      /**
       * build fail because the metadata store has problem.
       * @param exception

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 2c0f9f6,ade07e9..8925a8e
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@@ -462,7 -468,9 +471,12 @@@ public abstract class AbstractHadoopJo
          }
      }
  
+     public static KylinConfig 
loadKylinConfigFromHdfs(SerializableConfiguration conf, String uri) {
+         HadoopUtil.setCurrentConfiguration(conf.get());
++        return loadKylinConfigFromHdfs(uri);
++    }
+ 
 +    public static KylinConfig loadKylinConfigFromHdfs(String uri) {
          if (uri == null)
              throw new IllegalArgumentException("meta url should not be null");
  

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 52b6af5,50c589a..64163ad
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@@ -66,6 -68,9 +68,9 @@@ public interface BatchConstants 
      String CFG_OUTPUT_PARTITION = "partition";
      String CFG_MR_SPARK_JOB = "mr.spark.job";
      String CFG_SPARK_META_URL = "spark.meta.url";
+     String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
+ 
 -    String CFG_HLL_SHARD_BASE = "mapreduce.partition.hll.shard.base";
++    String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
  
      /**
       * command line ARGuments

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index fcd3162,8b9b928..3c054a3
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@@ -40,6 -40,6 +40,7 @@@ import org.apache.hadoop.io.SequenceFil
  import org.apache.hadoop.io.SequenceFile.Reader.Option;
  import org.apache.hadoop.util.ReflectionUtils;
  import org.apache.kylin.common.KylinConfig;
++import org.apache.kylin.common.persistence.RawResource;
  import org.apache.kylin.common.persistence.ResourceStore;
  import org.apache.kylin.common.util.ByteArray;
  import org.apache.kylin.common.util.Bytes;
@@@ -80,51 -80,50 +81,54 @@@ public class CubeStatsReader 
      final CuboidScheduler cuboidScheduler;
  
      public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) 
throws IOException {
+         this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+     }
+ 
+     /**
+      * @param cuboidScheduler if it's null, part of it's functions will not 
be supported
+      */
+     public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler 
cuboidScheduler, KylinConfig kylinConfig)
+             throws IOException {
          ResourceStore store = ResourceStore.getStore(kylinConfig);
-         cuboidScheduler = cubeSegment.getCuboidScheduler();
          String statsKey = cubeSegment.getStatisticsResourcePath();
--        File tmpSeqFile = 
writeTmpSeqFile(store.getResource(statsKey).inputStream);
-         Reader reader = null;
- 
-         try {
-             Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
- 
-             Path path = new Path(HadoopUtil.fixWindowsPath("file://" + 
tmpSeqFile.getAbsolutePath()));
-             Option seqInput = SequenceFile.Reader.file(path);
-             reader = new SequenceFile.Reader(hadoopConf, seqInput);
- 
-             int percentage = 100;
-             int mapperNumber = 0;
-             double mapperOverlapRatio = 0;
-             Map<Long, HLLCounter> counterMap = Maps.newHashMap();
- 
-             LongWritable key = (LongWritable) 
ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
-             BytesWritable value = (BytesWritable) 
ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
-             while (reader.next(key, value)) {
-                 if (key.get() == 0L) {
-                     percentage = Bytes.toInt(value.getBytes());
-                 } else if (key.get() == -1) {
-                     mapperOverlapRatio = Bytes.toDouble(value.getBytes());
-                 } else if (key.get() == -2) {
-                     mapperNumber = Bytes.toInt(value.getBytes());
-                 } else if (key.get() > 0) {
-                     HLLCounter hll = new 
HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
-                     ByteArray byteArray = new ByteArray(value.getBytes());
-                     hll.readRegisters(byteArray.asBuffer());
-                     counterMap.put(key.get(), hll);
-                 }
-             }
- 
-             this.seg = cubeSegment;
-             this.samplingPercentage = percentage;
-             this.mapperNumberOfFirstBuild = mapperNumber;
-             this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
-             this.cuboidRowEstimatesHLL = counterMap;
++        RawResource resource = store.getResource(statsKey);
++        if (resource == null)
++            throw new IllegalStateException("Missing resource at " + 
statsKey);
++        
++        File tmpSeqFile = writeTmpSeqFile(resource.inputStream);
+         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + 
tmpSeqFile.getAbsolutePath()));
+ 
+         CubeStatsResult cubeStatsResult = new CubeStatsResult(path, 
kylinConfig.getCubeStatsHLLPrecision());
+         tmpSeqFile.delete();
+ 
+         this.seg = cubeSegment;
+         this.cuboidScheduler = cuboidScheduler;
+         this.samplingPercentage = cubeStatsResult.getPercentage();
+         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+         this.mapperOverlapRatioOfFirstBuild = 
cubeStatsResult.getMapperOverlapRatio();
+         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+     }
  
-         } finally {
-             IOUtils.closeStream(reader);
-             tmpSeqFile.delete();
-         }
+     /**
+      * Read statistics from
+      * @param path
+      * rather than
+      * @param cubeSegment
+      *
+      * Since the statistics are from
+      * @param path
+      * cuboid scheduler should be provided by default
+      */
+     public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler 
cuboidScheduler, KylinConfig kylinConfig, Path path)
+             throws IOException {
+         CubeStatsResult cubeStatsResult = new CubeStatsResult(path, 
kylinConfig.getCubeStatsHLLPrecision());
+ 
+         this.seg = cubeSegment;
+         this.cuboidScheduler = cuboidScheduler;
+         this.samplingPercentage = cubeStatsResult.getPercentage();
+         this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+         this.mapperOverlapRatioOfFirstBuild = 
cubeStatsResult.getMapperOverlapRatio();
+         this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
      }
  
      private File writeTmpSeqFile(InputStream inputStream) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
index 0000000,1809ff0..0e56287
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
@@@ -1,0 -1,54 +1,60 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.engine.mr.common;
+ 
+ import java.io.IOException;
+ import java.util.Comparator;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.Cuboid;
+ import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+ import org.apache.kylin.cube.cuboid.CuboidScheduler;
+ import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
+ 
+ import com.google.common.collect.Lists;
+ 
+ public class CuboidSchedulerUtil {
+ 
+     public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment 
segment, String cuboidModeName) {
 -        return getCuboidSchedulerByMode(segment, 
CuboidModeEnum.getByModeName(cuboidModeName));
++        if (cuboidModeName == null)
++            return segment.getCuboidScheduler();
++        else
++            return getCuboidSchedulerByMode(segment, 
CuboidModeEnum.getByModeName(cuboidModeName));
+     }
+ 
+     public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment 
segment, CuboidModeEnum cuboidMode) {
 -        return getCuboidScheduler(segment, 
segment.getCubeInstance().getCuboidsByMode(cuboidMode));
++        if (cuboidMode == CuboidModeEnum.CURRENT || cuboidMode == null)
++            return segment.getCuboidScheduler();
++        else
++            return getCuboidScheduler(segment, 
segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+     }
+ 
+     public static CuboidScheduler getCuboidScheduler(CubeSegment segment, 
Set<Long> cuboidSet) {
+         try {
+             Map<Long, Long> cuboidsWithRowCnt = 
CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment);
+             Comparator<Long> comparator = cuboidsWithRowCnt == null ? 
Cuboid.cuboidSelectComparator
+                     : new 
TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt);
+             return new TreeCuboidScheduler(segment.getCubeDesc(), 
Lists.newArrayList(cuboidSet), comparator);
+         } catch (IOException e) {
+             throw new RuntimeException("Fail to cube stats for segment" + 
segment + " due to " + e);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index ad13425,a230517..ad8b954
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@@ -59,23 -67,13 +70,17 @@@ public class JobInfoConverter 
              return null;
          }
  
 +        CubingJob cubeJob = (CubingJob) job;
- 
 +        CubeInstance cube = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
 +                
.getCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
 +
-         Output output = outputs.get(job.getId());
          final JobInstance result = new JobInstance();
          result.setName(job.getName());
-         if (cube != null) {
-             result.setRelatedCube(cube.getDisplayName());
-         } else {
-             
result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
-         }
 -        
result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
 -        
result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
++        result.setRelatedCube(cube != null ? cube.getDisplayName() : 
CubingExecutableUtil.getCubeName(cubeJob.getParams()));
 +        
result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
          result.setLastModified(output.getLastModified());
-         result.setSubmitter(cubeJob.getSubmitter());
-         result.setUuid(cubeJob.getId());
+         result.setSubmitter(job.getSubmitter());
+         result.setUuid(job.getId());
          result.setType(CubeBuildTypeEnum.BUILD);
          result.setStatus(parseToJobStatus(output.getState()));
          result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, 
CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 0000000,b249f12..8fc26b4
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@@ -1,0 -1,131 +1,132 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *  
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *  
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.kylin.engine.mr.common;
+ 
+ import java.io.IOException;
+ import java.util.Map;
+ 
+ import org.apache.hadoop.mapreduce.Reducer;
+ import org.apache.kylin.common.KylinConfig;
++import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.CuboidScheduler;
+ import org.apache.kylin.cube.model.CubeDesc;
+ import org.apache.kylin.job.exception.JobException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class MapReduceUtil {
+ 
+     private static final Logger logger = 
LoggerFactory.getLogger(MapReduceUtil.class);
+ 
+     /**
+      * @return reducer number for calculating hll
+      */
 -    public static int getHLLShardBase(CubeSegment segment) {
 -        int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size();
 -        int shardBase = (nCuboids - 1) / 
segment.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
++    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
++        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
++        int shardBase = (nCuboids - 1) / 
cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
+ 
 -        int hllMaxReducerNumber = 
segment.getConfig().getHadoopJobHLLMaxReducerNumber();
++        int hllMaxReducerNumber = 
cube.getConfig().getHadoopJobHLLMaxReducerNumber();
+         if (shardBase > hllMaxReducerNumber) {
+             shardBase = hllMaxReducerNumber;
+         }
+         return shardBase;
+     }
+ 
+     /**
+      * @param cuboidScheduler specified can provide more flexibility
+      * */
+     public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, 
CuboidScheduler cuboidScheduler,
+             double totalMapInputMB, int level)
+             throws ClassNotFoundException, IOException, InterruptedException, 
JobException {
+         CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+         KylinConfig kylinConfig = cubeDesc.getConfig();
+ 
+         double perReduceInputMB = 
kylinConfig.getDefaultHadoopJobReducerInputMB();
+         double reduceCountRatio = 
kylinConfig.getDefaultHadoopJobReducerCountRatio();
+         logger.info("Having per reduce MB " + perReduceInputMB + ", reduce 
count ratio " + reduceCountRatio + ", level "
+                 + level);
+ 
+         CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, 
cuboidScheduler, kylinConfig);
+ 
+         double parentLayerSizeEst, currentLayerSizeEst, 
adjustedCurrentLayerSizeEst;
+ 
+         if (level == -1) {
+             //merge case
+             double estimatedSize = cubeStatsReader.estimateCubeSize();
+             adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? 
totalMapInputMB : estimatedSize;
+             logger.debug("estimated size {}, input size {}, 
adjustedCurrentLayerSizeEst: {}", estimatedSize,
+                     totalMapInputMB, adjustedCurrentLayerSizeEst);
+         } else if (level == 0) {
+             //base cuboid case TODO: the estimation could be very WRONG 
because it has no correction
+             adjustedCurrentLayerSizeEst = 
cubeStatsReader.estimateLayerSize(0);
+             logger.debug("adjustedCurrentLayerSizeEst: {}", 
adjustedCurrentLayerSizeEst);
+         } else {
+             parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+             currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+             adjustedCurrentLayerSizeEst = totalMapInputMB / 
parentLayerSizeEst * currentLayerSizeEst;
+             logger.debug(
+                     "totalMapInputMB: {}, parentLayerSizeEst: {}, 
currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+                     totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, 
adjustedCurrentLayerSizeEst);
+         }
+ 
+         // number of reduce tasks
+         int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / 
perReduceInputMB * reduceCountRatio + 0.99);
+ 
+         // adjust reducer number for cube which has DISTINCT_COUNT measures 
for better performance
+         if (cubeDesc.hasMemoryHungryMeasures()) {
+             logger.debug("Multiply reducer num by 4 to boost performance for 
memory hungry measures");
+             numReduceTasks = numReduceTasks * 4;
+         }
+ 
+         // at least 1 reducer by default
+         numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), 
numReduceTasks);
+         // no more than 500 reducer by default
+         numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), 
numReduceTasks);
+ 
+         return numReduceTasks;
+     }
+ 
+     public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, 
CuboidScheduler cuboidScheduler)
+             throws IOException {
+         KylinConfig kylinConfig = cubeSeg.getConfig();
+ 
+         Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, 
cuboidScheduler, kylinConfig).getCuboidSizeMap();
+         double totalSizeInM = 0;
+         for (Double cuboidSize : cubeSizeMap.values()) {
+             totalSizeInM += cuboidSize;
+         }
+ 
+         double perReduceInputMB = 
kylinConfig.getDefaultHadoopJobReducerInputMB();
+         double reduceCountRatio = 
kylinConfig.getDefaultHadoopJobReducerCountRatio();
+ 
+         // number of reduce tasks
+         int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB 
* reduceCountRatio);
+ 
+         // at least 1 reducer by default
+         numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), 
numReduceTasks);
+         // no more than 500 reducer by default
+         numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), 
numReduceTasks);
+ 
+         logger.info("Having total map input MB " + Math.round(totalSizeInM));
+         logger.info("Having per reduce MB " + perReduceInputMB);
+         logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + 
numReduceTasks);
+         return numReduceTasks;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 0f7281f,4efcb96..09db8e9
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@@ -107,8 -101,30 +101,30 @@@ public class StatisticsDecisionUtil 
              return;
          }
  
+         CubeInstance cube = segment.getCubeInstance();
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setCuboids(recommendCuboidsWithStats);
 -        CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder);
 +        CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
 +        update.setCuboids(recommendCuboidsWithStats);
 +        CubeManager.getInstance(cube.getConfig()).updateCube(update);
      }
+ 
+     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
+         CubeInstance cube = segment.getCubeInstance();
+         if (!cube.getConfig().isCubePlannerEnabled())
+             return false;
+ 
+         if (cube.getSegments(SegmentStatusEnum.READY_PENDING).size() > 0) {
+             logger.info("Has read pending segments and will not enable cube 
planner.");
+             return false;
+         }
+         List<CubeSegment> readySegments = 
cube.getSegments(SegmentStatusEnum.READY);
+         List<CubeSegment> newSegments = 
cube.getSegments(SegmentStatusEnum.NEW);
+         if (newSegments.size() <= 1 && //
+                 (readySegments.size() == 0 || //
+                         
(cube.getConfig().isCubePlannerEnabledForExistingCube() && readySegments.size() 
== 1
+                                 && 
readySegments.get(0).getSegRange().equals(segment.getSegRange())))) {
+             return true;
+         } else {
+             return false;
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index 0000000,8f64272..f3bdabd
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@@ -1,0 -1,120 +1,120 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.engine.mr.steps;
+ 
+ import java.io.IOException;
+ 
+ import org.apache.commons.cli.Options;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Job;
+ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeManager;
+ import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.apache.kylin.engine.mr.common.MapReduceUtil;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
+ 
+     private static final Logger logger = 
LoggerFactory.getLogger(CalculateStatsFromBaseCuboidJob.class);
+ 
+     @Override
+     public int run(String[] args) throws Exception {
+         Options options = new Options();
+ 
+         try {
+             options.addOption(OPTION_JOB_NAME);
+             options.addOption(OPTION_CUBE_NAME);
+             options.addOption(OPTION_SEGMENT_ID);
+             options.addOption(OPTION_INPUT_PATH);
+             options.addOption(OPTION_OUTPUT_PATH);
+             options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+             options.addOption(OPTION_CUBOID_MODE);
+             parseOptions(options, args);
+ 
+             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+             String cubeName = getOptionValue(OPTION_CUBE_NAME);
+             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+             Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+             String statistics_sampling_percent = 
getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+             String cuboidMode = getOptionValue(OPTION_CUBOID_MODE);
+ 
+             CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+             CubeInstance cube = cubeMgr.getCube(cubeName);
+             CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+ 
+             job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, 
cuboidMode);
+             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, 
cubeName);
+             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
segmentID);
+             
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, 
statistics_sampling_percent);
+             logger.info("Starting: " + job.getJobName());
+ 
+             setJobClasspath(job, cube.getConfig());
+ 
+             setupMapper(input);
+             setupReducer(output, cubeSegment);
+ 
+             attachSegmentMetadataWithDict(cubeSegment, 
job.getConfiguration());
+ 
+             return waitForCompletion(job);
+ 
+         } catch (Exception e) {
+             logger.error("error in CalculateStatsFromBaseCuboidJob", e);
+             printUsage(options);
+             throw e;
+         } finally {
+             if (job != null)
+                 cleanupTempConfFile(job.getConfiguration());
+         }
+     }
+ 
+     private void setupMapper(Path input) throws IOException {
+         FileInputFormat.setInputPaths(job, input);
+         job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class);
+         job.setInputFormatClass(SequenceFileInputFormat.class);
+         job.setMapOutputKeyClass(Text.class);
+         job.setMapOutputValueClass(Text.class);
+     }
+ 
+     private void setupReducer(Path output, CubeSegment cubeSeg) throws 
IOException {
 -        int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg);
 -        job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, 
hllShardBase);
++        int hllShardBase = 
MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
++        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, 
hllShardBase);
+ 
+         job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
+         job.setOutputFormatClass(SequenceFileOutputFormat.class);
+         job.setOutputKeyClass(NullWritable.class);
+         job.setOutputValueClass(Text.class);
+         job.setNumReduceTasks(hllShardBase);
+ 
+         FileOutputFormat.setOutputPath(job, output);
+         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, 
output.toString());
+ 
+         deletePath(job.getConfiguration(), output);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
index 0000000,70db21b..8b84844
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java
@@@ -1,0 -1,59 +1,59 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.kylin.engine.mr.steps;
+ 
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Partitioner;
+ import org.apache.kylin.common.util.Bytes;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  */
+ public class CalculateStatsFromBaseCuboidPartitioner extends 
Partitioner<Text, Text> implements Configurable {
+     private static final Logger logger = 
LoggerFactory.getLogger(CalculateStatsFromBaseCuboidPartitioner.class);
+ 
+     private Configuration conf;
+     private int hllShardBase = 1;
+ 
+     @Override
+     public int getPartition(Text key, Text value, int numReduceTasks) {
+         Long cuboidId = Bytes.toLong(key.getBytes());
+         int shard = cuboidId.hashCode() % hllShardBase;
+         if (shard < 0) {
+             shard += hllShardBase;
+         }
+         return numReduceTasks - shard - 1;
+     }
+ 
+     @Override
+     public void setConf(Configuration conf) {
+         this.conf = conf;
 -        hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1);
++        hllShardBase = conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1);
+         logger.info("shard base for hll is " + hllShardBase);
+     }
+ 
+     @Override
+     public Configuration getConf() {
+         return conf;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 98ebbb4,d64d300..a457677
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@@ -72,7 -77,16 +77,16 @@@ public class CreateDictionaryJob extend
  
              @Override
              public Dictionary<String> getDictionary(TblColRef col) throws 
IOException {
-                 Path colDir = new Path(factColumnsInputPath, 
col.getIdentity());
+                 CubeManager cubeManager = CubeManager.getInstance(config);
+                 CubeInstance cube = cubeManager.getCube(cubeName);
 -                List<TblColRef> uhcColumns = 
CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
++                List<TblColRef> uhcColumns = 
cube.getDescriptor().getAllUHCColumns();
+ 
+                 Path colDir;
+                 if (uhcColumns.contains(col)) {
+                     colDir = new Path(dictPath, col.getIdentity());
+                 } else {
+                     colDir = new Path(factColumnsInputPath, 
col.getIdentity());
+                 }
                  FileSystem fs = HadoopUtil.getWorkingFileSystem();
  
                  Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, 
col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index a1bba6e,e06077a..b48b19b
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@@ -38,7 -38,7 +38,8 @@@ import com.google.common.collect.Lists
  public class CubingExecutableUtil {
  
      public static final String CUBE_NAME = "cubeName";
 +    public static final String DISPALY_NAME = "displayName";
+     public static final String SEGMENT_NAME = "segmentName";
      public static final String SEGMENT_ID = "segmentId";
      public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
      public static final String STATISTICS_PATH = "statisticsPath";

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 5fcfe42,141ca99..9bede82
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@@ -18,25 -18,52 +18,62 @@@
  
  package org.apache.kylin.engine.mr.steps;
  
++import java.io.IOException;
++
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;
++import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.Bytes;
  import org.apache.kylin.common.util.BytesUtil;
++import org.apache.kylin.cube.CubeInstance;
++import org.apache.kylin.cube.CubeManager;
++import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  /**
   */
- public class FactDistinctColumnPartitioner extends 
Partitioner<SelfDefineSortableKey, Text> {
+ public class FactDistinctColumnPartitioner extends 
Partitioner<SelfDefineSortableKey, Text> implements Configurable {
+     private static final Logger logger = 
LoggerFactory.getLogger(FactDistinctColumnPartitioner.class);
+ 
+     private Configuration conf;
 -    private int hllShardBase = 1;
++    private FactDistinctColumnsReducerMapping reducerMapping;
++
++    @Override
++    public void setConf(Configuration conf) {
++        this.conf = conf;
++
++        KylinConfig config;
++        try {
++            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
++        } catch (IOException e) {
++            throw new RuntimeException(e);
++        }
++        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
++        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
++
++        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
++                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
++    }
  
      @Override
      public int getPartition(SelfDefineSortableKey skey, Text value, int 
numReduceTasks) {
          Text key = skey.getText();
--        if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
-             // the last reducer is for merging hll
-             return numReduceTasks - 1;
-         } else if (key.getBytes()[0] == 
FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
-             // the last but one reducer is for partition col
-             return numReduceTasks - 2;
 -            // the last $hllShard reducers are for merging hll
++        if (key.getBytes()[0] == 
FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+             Long cuboidId = Bytes.toLong(key.getBytes(), 1, 
Bytes.SIZEOF_LONG);
 -            int shard = cuboidId.hashCode() % hllShardBase;
 -            if (shard < 0) {
 -                shard += hllShardBase;
 -            }
 -            return numReduceTasks - shard - 1;
 -        } else if (key.getBytes()[0] == 
FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
 -            // the last but one reducer is for partition col
 -            return numReduceTasks - hllShardBase - 1;
++            return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
++        } else if (key.getBytes()[0] == 
FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
++            return reducerMapping.getReducerIdForDatePartitionColumn();
          } else {
              return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
          }
      }
+ 
+     @Override
 -    public void setConf(Configuration conf) {
 -        this.conf = conf;
 -        hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1);
 -        logger.info("shard base for hll is " + hllShardBase);
 -    }
 -
 -    @Override
+     public Configuration getConf() {
+         return conf;
+     }
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index ac8ce26,5200950..cc4f260
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@@ -19,7 -19,7 +19,6 @@@
  package org.apache.kylin.engine.mr.steps;
  
  import java.io.IOException;
--import java.util.Set;
  
  import org.apache.commons.cli.Options;
  import org.apache.hadoop.fs.Path;
@@@ -42,7 -42,8 +41,6 @@@ import org.apache.kylin.engine.mr.IMRIn
  import org.apache.kylin.engine.mr.MRUtil;
  import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
  import org.apache.kylin.engine.mr.common.BatchConstants;
 -import org.apache.kylin.engine.mr.common.MapReduceUtil;
--import org.apache.kylin.metadata.model.TblColRef;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -79,21 -82,21 +77,6 @@@ public class FactDistinctColumnsJob ext
              // add metadata to distributed cache
              CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
              CubeInstance cube = cubeMgr.getCube(cubeName);
--            Set<TblColRef> columnsNeedDict = 
cube.getDescriptor().getAllColumnsNeedDictionaryBuilt();
--
--            int reducerCount = columnsNeedDict.size();
--            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
--
--            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
--            for (int index : uhcIndex) {
--                if (index == 1) {
--                    reducerCount += uhcReducerCount - 1;
--                }
--            }
--
--            if (reducerCount > 255) {
--                throw new IllegalArgumentException("The max reducer number 
for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 
'kylin.engine.mr.uhc-reducer-count'");
--            }
  
              job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, 
cubeName);
              job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
segmentID);
@@@ -114,7 -118,7 +97,7 @@@
              }
  
              setupMapper(segment);
-             setupReducer(output, reducerCount + 2);
 -            setupReducer(output, segment, statistics_enabled, reducerCount);
++            setupReducer(output, segment);
  
              attachCubeMetadata(cube, job.getConfiguration());
  
@@@ -143,22 -147,29 +126,32 @@@
          job.setMapOutputValueClass(Text.class);
      }
  
-     private void setupReducer(Path output, int numberOfReducers) throws 
IOException {
 -    private void setupReducer(Path output, CubeSegment cubeSeg, String 
statistics_enabled, int reducerCount)
++    private void setupReducer(Path output, CubeSegment cubeSeg)
+             throws IOException {
 -        int numberOfReducers = reducerCount;
 -        if ("true".equalsIgnoreCase(statistics_enabled)) {
 -            int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg);
 -            job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, 
hllShardBase);
 -            numberOfReducers += (1 + hllShardBase);
++        FactDistinctColumnsReducerMapping reducerMapping = new 
FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
++        int numberOfReducers = reducerMapping.getTotalReducerNum();
++        if (numberOfReducers > 250) {
++            throw new IllegalArgumentException(
++                    "The max reducer number for FactDistinctColumnsJob is 
250, but now it is "
++                            + numberOfReducers
++                            + ", decrease 
'kylin.engine.mr.uhc-reducer-count'");
+         }
++
          job.setReducerClass(FactDistinctColumnsReducer.class);
          job.setPartitionerClass(FactDistinctColumnPartitioner.class);
          job.setNumReduceTasks(numberOfReducers);
++        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, 
reducerMapping.getCuboidRowCounterReducerNum());
  
--        //make each reducer output to respective dir
++        // make each reducer output to respective dir
          MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, 
SequenceFileOutputFormat.class, NullWritable.class, Text.class);
          MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, 
SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
          MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, 
LongWritable.class, BytesWritable.class);
          MultipleOutputs.addNamedOutput(job, 
BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, 
NullWritable.class, LongWritable.class);
  
--
          FileOutputFormat.setOutputPath(job, output);
          job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, 
output.toString());
  
--        //prevent to create zero-sized default output
++        // prevent to create zero-sized default output
          LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
  
          deletePath(job.getConfiguration(), output);

Reply via email to