This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4850dacece8a2e90f4333b50e2e8304635a730a2
Author: kyotoYaho <nju_y...@apache.org>
AuthorDate: Tue Dec 18 18:54:51 2018 +0800

    KYLIN-3540 estimate the row counts of source cuboids that are not built & remove mandatory cuboid recommendation
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  18 +-
 .../cube/cuboid/algorithm/BPUSCalculator.java      |   2 +-
 .../cube/cuboid/algorithm/CuboidRecommender.java   |  25 ++-
 .../kylin/cube/cuboid/algorithm/CuboidStats.java   | 131 ++++++------
 .../cube/cuboid/algorithm/CuboidStatsUtil.java     | 226 ++++++++++++++++-----
 .../cube/cuboid/algorithm/CuboidStatsUtilTest.java | 162 ++++++++++++---
 .../engine/mr/common/CuboidRecommenderUtil.java    |  44 +++-
 .../engine/mr/common/CuboidStatsReaderUtil.java    |  23 +++
 .../kylin/rest/controller/CubeController.java      |   4 +-
 .../org/apache/kylin/rest/service/CubeService.java |  26 ++-
 10 files changed, 495 insertions(+), 166 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f67f6b3..b63062e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -316,8 +316,7 @@ abstract public class KylinConfigBase implements 
Serializable {
     public String getReadHdfsWorkingDirectory() {
         if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
             Path workingDir = new Path(getHdfsWorkingDirectory());
-            return new Path(getHBaseClusterFs(), 
Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
-                    + "/";
+            return new Path(getHBaseClusterFs(), 
Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
         }
 
         return getHdfsWorkingDirectory();
@@ -644,8 +643,12 @@ abstract public class KylinConfigBase implements 
Serializable {
         return 
Integer.parseInt(getOptional("kylin.cube.cubeplanner.recommend-cache-max-size", 
"200"));
     }
 
-    public long getCubePlannerMandatoryRollUpThreshold() {
-        return 
Long.parseLong(getOptional("kylin.cube.cubeplanner.mandatory-rollup-threshold", 
"1000"));
+    public double getCubePlannerQueryUncertaintyRatio() {
+        return 
Double.parseDouble(getOptional("kylin.cube.cubeplanner.query-uncertainty-ratio",
 "0.1"));
+    }
+
+    public double getCubePlannerBPUSMinBenefitRatio() {
+        return 
Double.parseDouble(getOptional("kylin.cube.cubeplanner.bpus-min-benefit-ratio", 
"0.01"));
     }
 
     public int getCubePlannerAgreedyAlgorithmAutoThreshold() {
@@ -1910,12 +1913,13 @@ abstract public class KylinConfigBase implements 
Serializable {
     }
 
     public int getSmallCellMetadataWarningThreshold() {
-        return 
Integer.parseInt(getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
-                String.valueOf(100 << 20))); //100mb
+        return Integer.parseInt(
+                
getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", 
String.valueOf(100 << 20))); //100mb
     }
 
     public int getSmallCellMetadataErrorThreshold() {
-        return 
Integer.parseInt(getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold",
 String.valueOf(1 << 30))); // 1gb
+        return Integer.parseInt(
+                
getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", 
String.valueOf(1 << 30))); // 1gb
     }
 
     public int getJdbcResourceStoreMaxCellSize() {
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
index 6316858..39c52da 100755
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
@@ -142,7 +142,7 @@ public class BPUSCalculator implements BenefitPolicy {
     }
 
     public double getMinBenefitRatio() {
-        return 0.01;
+        return cuboidStats.getBpusMinBenefitRatio();
     }
 
     @Override
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
index baacb51..0e6a844 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
@@ -154,12 +154,11 @@ public class CuboidRecommender {
 
         Map<Long, Long> recommendCuboidsWithStats = Maps.newLinkedHashMap();
         for (Long cuboid : recommendCuboidList) {
-            if (cuboid.equals(cuboidStats.getBaseCuboid())) {
-                recommendCuboidsWithStats.put(cuboid, 
cuboidStats.getCuboidCount(cuboid));
-            } else if 
(cuboidStats.getAllCuboidsForSelection().contains(cuboid)) {
-                recommendCuboidsWithStats.put(cuboid, 
cuboidStats.getCuboidCount(cuboid));
+            if (cuboid == 0L) {
+                // for zero cuboid, just simply recommend the cheapest cuboid.
+                handleCuboidZeroRecommend(cuboidStats, 
recommendCuboidsWithStats);
             } else {
-                recommendCuboidsWithStats.put(cuboid, -1L);
+                recommendCuboidsWithStats.put(cuboid, 
cuboidStats.getCuboidCount(cuboid));
             }
         }
 
@@ -168,4 +167,20 @@ public class CuboidRecommender {
         }
         return recommendCuboidsWithStats;
     }
+
+    private void handleCuboidZeroRecommend(CuboidStats cuboidStats, Map<Long, 
Long> recommendCuboidsWithStats) {
+        Map<Long, Long> statistics = cuboidStats.getStatistics();
+        Long cheapestCuboid = null;
+        Long cheapestCuboidCount = Long.MAX_VALUE;
+        for (Map.Entry<Long, Long> cuboidStatsEntry : statistics.entrySet()) {
+            if (cuboidStatsEntry.getValue() < cheapestCuboidCount) {
+                cheapestCuboid = cuboidStatsEntry.getKey();
+                cheapestCuboidCount = cuboidStatsEntry.getValue();
+            }
+        }
+        if (cheapestCuboid != null) {
+            logger.info("recommend cuboid:{} instead of cuboid zero", 
cheapestCuboid);
+            recommendCuboidsWithStats.put(cheapestCuboid, cheapestCuboidCount);
+        }
+    }
 }
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java
index 78a6c5b..c22ab9a 100755
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStats.java
@@ -18,57 +18,72 @@
 
 package org.apache.kylin.cube.cuboid.algorithm;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 public class CuboidStats {
     private static final Logger logger = 
LoggerFactory.getLogger(CuboidStats.class);
 
+    static final double WEIGHT_FOR_UN_QUERY = 0.2;
+    static final double BPUS_MIN_BENEFIT_RATIO = 0.001;
+
     public static class Builder {
 
         private static final long THRESHOLD_ROLL_UP_FOR_MANDATORY = 1000L;
 
         // Required parameters
         private String key;
+        private Long nTotalCuboids;
         private Long baseCuboid;
+        private double queryUncertaintyRatio = WEIGHT_FOR_UN_QUERY;
+        private double bpusMinBenefitRatio = BPUS_MIN_BENEFIT_RATIO;
         private Map<Long, Long> statistics;
         private Map<Long, Double> size;
 
         // Optional parameters - initialized to default values
         private Set<Long> mandatoryCuboids = null;
         //// These two properties are for generating mandatory cuboids
-        private Map<Long, Map<Long, Long>> rollingUpCountSourceMap = null;
-        private Long rollUpThresholdForMandatory = null;
+        private Map<Long, Map<Long, Pair<Long, Long>>> rollingUpCountSourceMap 
= null;
 
         private Map<Long, Long> hitFrequencyMap = null;
         private Map<Long, Map<Long, Long>> scanCountSourceMap = null;
 
         public Builder(String key, Long baseCuboid, Map<Long, Long> 
statistics, Map<Long, Double> size) {
+            this(key, baseCuboid, baseCuboid, statistics, size);
+        }
+
+        public Builder(String key, Long nTotalCuboids, Long baseCuboid, 
Map<Long, Long> statistics,
+                Map<Long, Double> size) {
             this.key = key;
+            this.nTotalCuboids = nTotalCuboids;
             this.baseCuboid = baseCuboid;
             this.statistics = statistics;
             this.size = size;
         }
 
-        public Builder setRollingUpCountSourceMap(Map<Long, Map<Long, Long>> 
rollingUpCountSourceMap) {
-            this.rollingUpCountSourceMap = rollingUpCountSourceMap;
-            this.rollUpThresholdForMandatory = THRESHOLD_ROLL_UP_FOR_MANDATORY;
+        public Builder setQueryUncertaintyRatio(double queryUncertaintyRatio) {
+            this.queryUncertaintyRatio = queryUncertaintyRatio;
             return this;
         }
 
-        public Builder setRollingUpCountSourceMap(Map<Long, Map<Long, Long>> 
rollingUpCountSourceMap,
-                                                  long 
rollUpThresholdForMandatory) {
+        public Builder setBPUSMinBenefitRatio(double bpusMinBenefitRatio) {
+            this.bpusMinBenefitRatio = bpusMinBenefitRatio;
+            return this;
+        }
+
+        public Builder setRollingUpCountSourceMap(Map<Long, Map<Long, 
Pair<Long, Long>>> rollingUpCountSourceMap) {
             this.rollingUpCountSourceMap = rollingUpCountSourceMap;
-            this.rollUpThresholdForMandatory = rollUpThresholdForMandatory;
             return this;
         }
 
@@ -87,6 +102,10 @@ public class CuboidStats {
             return this;
         }
 
+        public Map<Long, Double> estimateCuboidsSize(Map<Long, Long> 
statistics) {
+            return null;
+        }
+
         public CuboidStats build() {
             Preconditions.checkNotNull(key, "key should not be null");
             Preconditions.checkNotNull(baseCuboid, "baseCuboid should not be 
null");
@@ -96,23 +115,36 @@ public class CuboidStats {
                     "row count should exist for base cuboid " + baseCuboid);
             Preconditions.checkState(statistics.keySet().equals(size.keySet()),
                     "statistics & size should own the same key set");
+            statistics = CuboidStatsUtil.adjustCuboidStats(statistics);
+
+            if (hitFrequencyMap != null && rollingUpCountSourceMap != null) {
+                Map<Long, Double> cuboidHitProbabilityMap = 
CuboidStatsUtil.calculateCuboidHitProbability(
+                        hitFrequencyMap.keySet(), hitFrequencyMap, 
nTotalCuboids, queryUncertaintyRatio);
+                Map<Long, Long> srcCuboidsStats = 
CuboidStatsUtil.generateSourceCuboidStats(statistics,
+                        cuboidHitProbabilityMap, rollingUpCountSourceMap);
+
+                statistics.putAll(srcCuboidsStats);
+
+                Map<Long, Double> estimatedSize = 
estimateCuboidsSize(statistics);
+                if (estimatedSize != null && !estimatedSize.isEmpty()) {
+                    size = Maps.newHashMap(estimatedSize);
+                }
+            }
+
             if (mandatoryCuboids == null) {
                 mandatoryCuboids = Sets.newHashSet();
-            }
-            if (rollingUpCountSourceMap != null) {
-                
mandatoryCuboids.addAll(CuboidStatsUtil.generateMandatoryCuboidSet(statistics, 
hitFrequencyMap,
-                        rollingUpCountSourceMap, rollUpThresholdForMandatory));
+            } else if (!mandatoryCuboids.isEmpty()) {
+                
statistics.putAll(CuboidStatsUtil.complementRowCountForCuboids(statistics, 
mandatoryCuboids));
             }
 
-            return new CuboidStats(key, baseCuboid, mandatoryCuboids, 
statistics, size, hitFrequencyMap,
-                    scanCountSourceMap);
+            return new CuboidStats(key, baseCuboid, queryUncertaintyRatio, 
bpusMinBenefitRatio, mandatoryCuboids,
+                    statistics, size, hitFrequencyMap, scanCountSourceMap);
         }
     }
 
-    private static final double WEIGHT_FOR_UN_QUERY = 0.2;
-
     private String key;
     private long baseCuboid;
+    private double bpusMinBenefitRatio;
     private ImmutableSet<Long> mandatoryCuboidSet;
     private ImmutableSet<Long> selectionCuboidSet;
     private ImmutableMap<Long, Long> cuboidCountMap;
@@ -123,11 +155,13 @@ public class CuboidStats {
     private ImmutableMap<Long, List<Long>> directChildrenCache;
     private Map<Long, Set<Long>> allDescendantsCache;
 
-    private CuboidStats(String key, long baseCuboidId, Set<Long> 
mandatoryCuboids, Map<Long, Long> statistics,
-                        Map<Long, Double> size, Map<Long, Long> 
hitFrequencyMap, Map<Long, Map<Long, Long>> scanCountSourceMap) {
+    private CuboidStats(String key, long baseCuboidId, double 
queryUncertaintyRatio, double bpusMinBenefitRatio,
+            Set<Long> mandatoryCuboids, Map<Long, Long> statistics, Map<Long, 
Double> size,
+            Map<Long, Long> hitFrequencyMap, Map<Long, Map<Long, Long>> 
scanCountSourceMap) {
 
         this.key = key;
         this.baseCuboid = baseCuboidId;
+        this.bpusMinBenefitRatio = bpusMinBenefitRatio;
         /** Initial mandatory cuboids */
         Set<Long> cuboidsForMandatory = Sets.newHashSet(mandatoryCuboids);
         //Always add base cuboid.
@@ -141,44 +175,19 @@ public class CuboidStats {
         cuboidsForSelection.removeAll(cuboidsForMandatory);
 
         //There's no overlap between mandatoryCuboidSet and selectionCuboidSet
-        this.mandatoryCuboidSet = 
ImmutableSet.<Long>builder().addAll(cuboidsForMandatory).build();
-        this.selectionCuboidSet = 
ImmutableSet.<Long>builder().addAll(cuboidsForSelection).build();
+        this.mandatoryCuboidSet = ImmutableSet.<Long> 
builder().addAll(cuboidsForMandatory).build();
+        this.selectionCuboidSet = ImmutableSet.<Long> 
builder().addAll(cuboidsForSelection).build();
         if (selectionCuboidSet.isEmpty()) {
             logger.warn("The selection set should not be empty!!!");
         }
 
-        /** Initialize row count for mandatory cuboids */
-        CuboidStatsUtil.complementRowCountForMandatoryCuboids(statistics, 
baseCuboid, mandatoryCuboidSet);
-
-        this.cuboidCountMap = ImmutableMap.<Long, 
Long>builder().putAll(statistics).build();
-        this.cuboidSizeMap = ImmutableMap.<Long, 
Double>builder().putAll(size).build();
+        this.cuboidCountMap = ImmutableMap.<Long, Long> 
builder().putAll(statistics).build();
+        this.cuboidSizeMap = ImmutableMap.<Long, Double> 
builder().putAll(size).build();
 
         /** Initialize the hit probability for each selection cuboid */
-        Map<Long, Double> tmpCuboidHitProbabilityMap = 
Maps.newHashMapWithExpectedSize(selectionCuboidSet.size());
-        if (hitFrequencyMap != null) {
-            long totalHitFrequency = 0L;
-            for (Map.Entry<Long, Long> hitFrequency : 
hitFrequencyMap.entrySet()) {
-                if (selectionCuboidSet.contains(hitFrequency.getKey())) {
-                    totalHitFrequency += hitFrequency.getValue();
-                }
-            }
-
-            final double unitUncertainProb = WEIGHT_FOR_UN_QUERY / 
selectionCuboidSet.size();
-            for (Long cuboid : selectionCuboidSet) {
-                //Calculate hit probability for each cuboid
-                if (hitFrequencyMap.get(cuboid) != null) {
-                    tmpCuboidHitProbabilityMap.put(cuboid, unitUncertainProb
-                            + (1 - WEIGHT_FOR_UN_QUERY) * 
hitFrequencyMap.get(cuboid) / totalHitFrequency);
-                } else {
-                    tmpCuboidHitProbabilityMap.put(cuboid, unitUncertainProb);
-                }
-            }
-        } else {
-            for (Long cuboid : selectionCuboidSet) {
-                tmpCuboidHitProbabilityMap.put(cuboid, 1.0 / 
selectionCuboidSet.size());
-            }
-        }
-        this.cuboidHitProbabilityMap = ImmutableMap.<Long, 
Double>builder().putAll(tmpCuboidHitProbabilityMap).build();
+        Map<Long, Double> tmpCuboidHitProbabilityMap = 
CuboidStatsUtil.calculateCuboidHitProbability(selectionCuboidSet,
+                hitFrequencyMap, selectionCuboidSet.size(), 
queryUncertaintyRatio);
+        this.cuboidHitProbabilityMap = ImmutableMap.<Long, Double> 
builder().putAll(tmpCuboidHitProbabilityMap).build();
 
         /** Initialize the scan count when query for each selection cuboid + 
one base cuboid */
         Map<Long, Long> tmpCuboidScanCountMap = 
Maps.newHashMapWithExpectedSize(1 + selectionCuboidSet.size());
@@ -186,16 +195,16 @@ public class CuboidStats {
         for (Long cuboid : selectionCuboidSet) {
             tmpCuboidScanCountMap.put(cuboid, getExpScanCount(cuboid, 
statistics, scanCountSourceMap));
         }
-        this.cuboidScanCountMap = ImmutableMap.<Long, 
Long>builder().putAll(tmpCuboidScanCountMap).build();
+        this.cuboidScanCountMap = ImmutableMap.<Long, Long> 
builder().putAll(tmpCuboidScanCountMap).build();
 
-        this.directChildrenCache = ImmutableMap.<Long, List<Long>>builder()
+        this.directChildrenCache = ImmutableMap.<Long, List<Long>> builder()
                 
.putAll(CuboidStatsUtil.createDirectChildrenCache(statistics.keySet())).build();
 
         this.allDescendantsCache = Maps.newConcurrentMap();
     }
 
     private long getExpScanCount(long sourceCuboid, Map<Long, Long> statistics,
-                                 Map<Long, Map<Long, Long>> 
scanCountSourceMap) {
+            Map<Long, Map<Long, Long>> scanCountSourceMap) {
         Preconditions.checkNotNull(statistics.get(sourceCuboid),
                 "The statistics for source cuboid " + sourceCuboid + " does 
not exist!!!");
         if (scanCountSourceMap == null || scanCountSourceMap.get(sourceCuboid) 
== null
@@ -216,6 +225,10 @@ public class CuboidStats {
         }
     }
 
+    public double getBpusMinBenefitRatio() {
+        return bpusMinBenefitRatio;
+    }
+
     public Set<Long> getAllDescendants(long cuboid) {
         Set<Long> allDescendants = Sets.newLinkedHashSet();
         if (selectionCuboidSet.contains(cuboid)) {
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
index dc3471b..90eafdd 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
@@ -27,81 +27,208 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.kylin.common.util.Pair;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 public class CuboidStatsUtil {
 
     /**
-     * For generating mandatory cuboids,
-     * a cuboid is mandatory if the expectation of rolling up count exceeds a 
threshold
+     * According to the cuboid hit frequencies and query uncertainty ratio
+     * calculate each cuboid hit probability
+     * @param selectionCuboidSet subset of cuboid domain which needs 
probability
+     * @param nTotalCuboids number of cuboids needs to be considered, mainly 
for each cuboid's uncertainty weight
      * */
-    public static Set<Long> generateMandatoryCuboidSet(Map<Long, Long> 
statistics, Map<Long, Long> hitFrequencyMap,
-            Map<Long, Map<Long, Long>> rollingUpCountSourceMap, final long 
rollUpThresholdForMandatory) {
-        Set<Long> mandatoryCuboidSet = Sets.newHashSet();
-        if (hitFrequencyMap == null || hitFrequencyMap.isEmpty() || 
rollingUpCountSourceMap == null
-                || rollingUpCountSourceMap.isEmpty()) {
-            return mandatoryCuboidSet;
-        }
-        long totalHitFrequency = 0L;
-        for (long hitFrequency : hitFrequencyMap.values()) {
-            totalHitFrequency += hitFrequency;
+    public static Map<Long, Double> calculateCuboidHitProbability(Set<Long> 
selectionCuboidSet,
+            Map<Long, Long> hitFrequencyMap, long nTotalCuboids, double 
queryUncertaintyRatio) {
+        Map<Long, Double> cuboidHitProbabilityMap = 
Maps.newHashMapWithExpectedSize(selectionCuboidSet.size());
+        if (hitFrequencyMap == null || hitFrequencyMap.isEmpty()) {
+            for (Long cuboid : selectionCuboidSet) {
+                cuboidHitProbabilityMap.put(cuboid, 1.0 / nTotalCuboids);
+            }
+        } else {
+            long totalHitFrequency = 0L;
+            for (Map.Entry<Long, Long> hitFrequency : 
hitFrequencyMap.entrySet()) {
+                totalHitFrequency += hitFrequency.getValue();
+            }
+
+            final double unitUncertainProb = queryUncertaintyRatio / 
nTotalCuboids;
+            for (Long cuboid : selectionCuboidSet) {
+                //Calculate hit probability for each cuboid
+                if (hitFrequencyMap.get(cuboid) != null) {
+                    cuboidHitProbabilityMap.put(cuboid, unitUncertainProb
+                            + (1 - queryUncertaintyRatio) * 
hitFrequencyMap.get(cuboid) / totalHitFrequency);
+                } else {
+                    cuboidHitProbabilityMap.put(cuboid, unitUncertainProb);
+                }
+            }
         }
 
-        if (totalHitFrequency == 0) {
-            return mandatoryCuboidSet;
+        return cuboidHitProbabilityMap;
+    }
+
+    /**
+     * @param statistics for cuboid row count
+     * @param rollingUpSourceMap the key of the outer map is source cuboid,
+     *                           the key of the inner map is target cuboid,
+     *                                  if cube is optimized multiple times, 
target cuboid may change
+     *                           the first element of the pair is the rollup 
row count
+     *                           the second element of the pair is the return 
row count
+     * @return source cuboids with estimated row count
+     */
+    public static Map<Long, Long> generateSourceCuboidStats(Map<Long, Long> 
statistics,
+            Map<Long, Double> cuboidHitProbabilityMap, Map<Long, Map<Long, 
Pair<Long, Long>>> rollingUpSourceMap) {
+        Map<Long, Long> srcCuboidsStats = Maps.newHashMap();
+        if (cuboidHitProbabilityMap == null || 
cuboidHitProbabilityMap.isEmpty() || rollingUpSourceMap == null
+                || rollingUpSourceMap.isEmpty()) {
+            return srcCuboidsStats;
         }
 
-        for (Map.Entry<Long, Long> hitFrequency : hitFrequencyMap.entrySet()) {
-            long cuboid = hitFrequency.getKey();
+        for (Long cuboid : cuboidHitProbabilityMap.keySet()) {
+            if (statistics.get(cuboid) != null) {
+                continue;
+            }
+            Map<Long, Pair<Long, Long>> innerRollingUpTargetMap = 
rollingUpSourceMap.get(cuboid);
+            if (innerRollingUpTargetMap == null || 
innerRollingUpTargetMap.isEmpty()) {
+                continue;
+            }
 
-            if (isCuboidMandatory(cuboid, statistics, 
rollingUpCountSourceMap)) {
-                long totalEstScanCount = 0L;
-                for (long estScanCount : 
rollingUpCountSourceMap.get(cuboid).values()) {
-                    totalEstScanCount += estScanCount;
-                }
-                totalEstScanCount /= 
rollingUpCountSourceMap.get(cuboid).size();
-                if ((hitFrequency.getValue() * 1.0 / totalHitFrequency)
-                        * totalEstScanCount >= rollUpThresholdForMandatory) {
-                    mandatoryCuboidSet.add(cuboid);
+            long totalEstRowCount = 0L;
+            int nEffective = 0;
+            boolean ifHasStats = false;
+            // if ifHasStats equals true, then source cuboid row count = (1 - 
rollup ratio) * target cuboid row count
+            //                            else source cuboid row count = 
returned row count collected directly
+            for (Long tgtCuboid : innerRollingUpTargetMap.keySet()) {
+                Pair<Long, Long> rollingupStats = 
innerRollingUpTargetMap.get(tgtCuboid);
+                if (statistics.get(tgtCuboid) != null) {
+                    if (!ifHasStats) {
+                        totalEstRowCount = 0L;
+                        nEffective = 0;
+                        ifHasStats = true;
+                    }
+                    double rollupRatio = calculateRollupRatio(rollingupStats);
+                    totalEstRowCount += (1 - rollupRatio) * 
statistics.get(tgtCuboid);
+                    nEffective++;
+                } else {
+                    if (ifHasStats) {
+                        continue;
+                    }
+                    totalEstRowCount += rollingupStats.getSecond();
+                    nEffective++;
                 }
             }
-        }
-        return mandatoryCuboidSet;
-    }
 
-    private static boolean isCuboidMandatory(Long cuboid, Map<Long, Long> 
statistics, Map<Long, Map<Long, Long>> rollingUpCountSourceMap) {
-        return !statistics.containsKey(cuboid) && 
rollingUpCountSourceMap.containsKey(cuboid) && 
!rollingUpCountSourceMap.get(cuboid).isEmpty();
+            srcCuboidsStats.put(cuboid, totalEstRowCount / nEffective);
+        }
+        srcCuboidsStats.remove(0L);
+        adjustCuboidStats(srcCuboidsStats, statistics);
+        return srcCuboidsStats;
     }
 
     /**
      * Complement row count for mandatory cuboids
      * with its best parent's row count
      * */
-    public static void complementRowCountForMandatoryCuboids(Map<Long, Long> 
statistics, long baseCuboid,
-            Set<Long> mandatoryCuboidSet) {
+    public static Map<Long, Long> complementRowCountForCuboids(final Map<Long, 
Long> statistics, Set<Long> cuboids) {
+        Map<Long, Long> result = 
Maps.newHashMapWithExpectedSize(cuboids.size());
+
         // Sort entries order by row count asc
-        SortedSet<Map.Entry<Long, Long>> sortedStatsSet = new TreeSet<>(
-                new Comparator<Map.Entry<Long, Long>>() {
-                    public int compare(Map.Entry<Long, Long> o1, 
Map.Entry<Long, Long> o2) {
-                        return o1.getValue().compareTo(o2.getValue());
-                    }
-                });
-        //sortedStatsSet.addAll(statistics.entrySet()); KYLIN-3580
-        for(Map.Entry<Long, Long> entry : statistics.entrySet()){
-            sortedStatsSet.add(entry);
-        }
-        for (Long cuboid : mandatoryCuboidSet) {
+        SortedSet<Map.Entry<Long, Long>> sortedStatsSet = new TreeSet<>(new 
Comparator<Map.Entry<Long, Long>>() {
+            public int compare(Map.Entry<Long, Long> o1, Map.Entry<Long, Long> 
o2) {
+                int ret = o1.getValue().compareTo(o2.getValue());
+                return ret == 0 ? o1.getKey().compareTo(o2.getKey()) : ret;
+            }
+        });
+        sortedStatsSet.addAll(statistics.entrySet());
+        for (Long cuboid : cuboids) {
             if (statistics.get(cuboid) == null) {
                 // Get estimate row count for mandatory cuboid
-                long tmpRowCount = -1;
                 for (Map.Entry<Long, Long> entry : sortedStatsSet) {
                     if (isDescendant(cuboid, entry.getKey())) {
-                        tmpRowCount = entry.getValue();
+                        result.put(cuboid, entry.getValue());
+                        break;
+                    }
+                }
+            } else {
+                result.put(cuboid, statistics.get(cuboid));
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * adjust cuboid row count, make sure parent not less than child
+     */
+    public static Map<Long, Long> adjustCuboidStats(Map<Long, Long> 
statistics) {
+        Map<Long, Long> ret = 
Maps.newHashMapWithExpectedSize(statistics.size());
+
+        List<Long> cuboids = Lists.newArrayList(statistics.keySet());
+        Collections.sort(cuboids);
+
+        for (Long cuboid : cuboids) {
+            Long rowCount = statistics.get(cuboid);
+            for (Long childCuboid : ret.keySet()) {
+                if (isDescendant(childCuboid, cuboid)) {
+                    Long childRowCount = ret.get(childCuboid);
+                    if (rowCount < childRowCount) {
+                        rowCount = childRowCount;
+                    }
+                }
+            }
+            ret.put(cuboid, rowCount);
+        }
+
+        return ret;
+    }
+
+    public static void adjustCuboidStats(Map<Long, Long> 
mandatoryCuboidsWithStats, Map<Long, Long> statistics) {
+        List<Long> mandatoryCuboids = 
Lists.newArrayList(mandatoryCuboidsWithStats.keySet());
+        Collections.sort(mandatoryCuboids);
+
+        List<Long> selectedCuboids = Lists.newArrayList(statistics.keySet());
+        Collections.sort(selectedCuboids);
+
+        for (int i = 0; i < mandatoryCuboids.size(); i++) {
+            Long mCuboid = mandatoryCuboids.get(i);
+            if (statistics.get(mCuboid) != null) {
+                mandatoryCuboidsWithStats.put(mCuboid, 
statistics.get(mCuboid));
+                continue;
+            }
+            int k = 0;
+            // Make sure mCuboid's row count larger than its children's row 
count in statistics
+            for (; k < selectedCuboids.size(); k++) {
+                Long sCuboid = selectedCuboids.get(k);
+                if (sCuboid > mCuboid) {
+                    break;
+                }
+                if (isDescendant(sCuboid, mCuboid)) {
+                    Long childRowCount = statistics.get(sCuboid);
+                    if (childRowCount > 
mandatoryCuboidsWithStats.get(mCuboid)) {
+                        mandatoryCuboidsWithStats.put(mCuboid, childRowCount);
+                    }
+                }
+            }
+            // Make sure mCuboid's row count larger than its children's row 
count in mandatoryCuboids
+            for (int j = 0; j < i; j++) {
+                Long cCuboid = mandatoryCuboids.get(j);
+                if (isDescendant(cCuboid, mCuboid)) {
+                    Long childRowCount = 
mandatoryCuboidsWithStats.get(cCuboid);
+                    if (childRowCount > 
mandatoryCuboidsWithStats.get(mCuboid)) {
+                        mandatoryCuboidsWithStats.put(mCuboid, childRowCount);
+                    }
+                }
+            }
+            // Make sure mCuboid's row count lower than its parents' row count 
in statistics
+            for (; k < selectedCuboids.size(); k++) {
+                Long sCuboid = selectedCuboids.get(k);
+                if (isDescendant(mCuboid, sCuboid)) {
+                    Long parentRowCount = statistics.get(sCuboid);
+                    if (parentRowCount < 
mandatoryCuboidsWithStats.get(mCuboid)) {
+                        mandatoryCuboidsWithStats.put(mCuboid, parentRowCount);
                     }
                 }
-                statistics.put(cuboid, tmpRowCount < 0 ? 
statistics.get(baseCuboid) : tmpRowCount);
             }
         }
     }
@@ -207,4 +334,14 @@ public class CuboidStatsUtil {
     public static boolean isDescendant(long cuboidToCheck, long parentCuboid) {
         return (cuboidToCheck & parentCuboid) == cuboidToCheck;
     }
+
+    /**
+     * Returns {@code first / (first + second)} for the given pair, or 0 when that sum is 0.
+     * NOTE(review): the pair appears to be (aggregation count, return count) from query metrics; confirm.
+     */
+    private static double calculateRollupRatio(Pair<Long, Long> rollupStats) {
+        long rollupCount = rollupStats.getFirst();
+        double rollupInputCount = rollupCount + rollupStats.getSecond();
+        return rollupInputCount == 0 ? 0 : rollupCount / rollupInputCount;
+    }
 }
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java
index ba8833a..1ab96a2 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtilTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.util.Pair;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,6 +45,9 @@ public class CuboidStatsUtilTest {
      *              /      |     /
      *         00000100  00000010
      * */
+
+    private static final long BASE_CUBOID_ID = 255L;
+
     private Set<Long> generateCuboidSet() {
         return Sets.newHashSet(255L, 159L, 239L, 50L, 199L, 6L, 4L, 2L);
     }
@@ -53,11 +57,11 @@ public class CuboidStatsUtilTest {
 
         countMap.put(255L, 10000L);
         countMap.put(159L, 10000L);
-        countMap.put(50L, 10000L);
-        countMap.put(199L, 10000L);
-        countMap.put(6L, 10000L);
-        countMap.put(4L, 10000L);
-        countMap.put(2L, 10000L);
+        countMap.put(50L, 800L);
+        countMap.put(199L, 200L);
+        countMap.put(6L, 60L);
+        countMap.put(4L, 40L);
+        countMap.put(2L, 20L);
 
         return countMap;
     }
@@ -67,26 +71,46 @@ public class CuboidStatsUtilTest {
 
         long totalHitFrequency = 10000L;
 
-        hitFrequencyMap.put(239L, (long) (totalHitFrequency * 0.5));
+        hitFrequencyMap.put(239L, (long) (totalHitFrequency * 0.2));
         hitFrequencyMap.put(50L, (long) (totalHitFrequency * 0.2));
-        hitFrequencyMap.put(2L, (long) (totalHitFrequency * 0.25));
+        hitFrequencyMap.put(2L, (long) (totalHitFrequency * 0.24));
         hitFrequencyMap.put(178L, (long) (totalHitFrequency * 0.05));
+        hitFrequencyMap.put(187L, (long) (totalHitFrequency * 0.3));
+        hitFrequencyMap.put(0L, (long) (totalHitFrequency * 0.01));
 
         return hitFrequencyMap;
     }
 
-    private Map<Long, Map<Long, Long>> simulateRollingUpCount() {
-        Map<Long, Map<Long, Long>> rollingUpCountMap = Maps.newLinkedHashMap();
+    private Map<Long, Double> simulateHitProbability(long nCuboids) {
+        Map<Long, Long> hitFrequencyMap = simulateHitFrequency();
+        return CuboidStatsUtil.calculateCuboidHitProbability(hitFrequencyMap.keySet(), hitFrequencyMap, nCuboids,
+                CuboidStats.WEIGHT_FOR_UN_QUERY);
+    }
+
+    private Map<Long, Map<Long, Pair<Long, Long>>> simulateRollingUpCount() {
+        Map<Long, Map<Long, Pair<Long, Long>>> rollingUpCountMap = Maps.newLinkedHashMap();
+
+        rollingUpCountMap.put(239L, new HashMap<Long, Pair<Long, Long>>() {
+            {
+                put(255L, new Pair<>(990L, 10L));
+            }
+        });
+
+        rollingUpCountMap.put(178L, new HashMap<Long, Pair<Long, Long>>() {
+            {
+                put(255L, new Pair<>(4999L, 1L));
+            }
+        });
 
-        rollingUpCountMap.put(239L, new HashMap<Long, Long>() {
+        rollingUpCountMap.put(187L, new HashMap<Long, Pair<Long, Long>>() {
             {
-                put(255L, 4000L);
+                put(251L, new Pair<>(3000L, 1000L));
             }
         });
 
-        rollingUpCountMap.put(178L, new HashMap<Long, Long>() {
+        rollingUpCountMap.put(0L, new HashMap<Long, Pair<Long, Long>>() {
             {
-                put(255L, 5000L);
+                put(2L, new Pair<>(19L, 1L));
             }
         });
 
@@ -101,25 +125,108 @@ public class CuboidStatsUtilTest {
 
     @Test
     public void generateMandatoryCuboidSetTest() {
-        Set<Long> mandatoryCuboidSet = CuboidStatsUtil.generateMandatoryCuboidSet(simulateCount(),
-                simulateHitFrequency(), simulateRollingUpCount(), 1000L);
-        Assert.assertTrue(mandatoryCuboidSet.contains(239L));
-        Assert.assertTrue(!mandatoryCuboidSet.contains(178L));
+        Map<Long, Long> srcCuboidStats = CuboidStatsUtil.generateSourceCuboidStats(simulateCount(),
+                simulateHitProbability(BASE_CUBOID_ID), simulateRollingUpCount());
+
+        Assert.assertEquals(Long.valueOf(200L), srcCuboidStats.get(239L));
+        Assert.assertEquals(Long.valueOf(1000L), srcCuboidStats.get(187L));
+        Assert.assertEquals(Long.valueOf(800L), srcCuboidStats.get(178L));
+
+        Assert.assertFalse(srcCuboidStats.containsKey(0L));
     }
 
     @Test
     public void complementRowCountForMandatoryCuboidsTest() {
         Map<Long, Long> countMap = simulateCount();
-        Set<Long> mandatoryCuboidSet = CuboidStatsUtil.generateMandatoryCuboidSet(countMap, simulateHitFrequency(),
-                simulateRollingUpCount(), 1000L);
-        for (long mandatoryCuboid : mandatoryCuboidSet) {
+        Map<Long, Long> srcCuboidsStats = CuboidStatsUtil.generateSourceCuboidStats(countMap,
+                simulateHitProbability(BASE_CUBOID_ID), simulateRollingUpCount());
+        for (long mandatoryCuboid : srcCuboidsStats.keySet()) {
             Assert.assertNull(countMap.get(mandatoryCuboid));
         }
-        CuboidStatsUtil.complementRowCountForMandatoryCuboids(countMap, 255L, mandatoryCuboidSet);
-        for (long mandatoryCuboid : mandatoryCuboidSet) {
-            Assert.assertNotNull(countMap.get(mandatoryCuboid));
-        }
-        Assert.assertTrue(countMap.get(239L) == 10000L);
+        Assert.assertEquals(Long.valueOf(200L), srcCuboidsStats.get(239L));
+
+        Map<Long, Long> mandatoryCuboidsWithStats2 = Maps.newHashMap();
+        mandatoryCuboidsWithStats2.put(215L, countMap.get(255L));
+        mandatoryCuboidsWithStats2.put(34L, countMap.get(50L));
+        Assert.assertEquals(mandatoryCuboidsWithStats2,
+                CuboidStatsUtil.complementRowCountForCuboids(countMap, mandatoryCuboidsWithStats2.keySet()));
+    }
+
+    @Test
+    public void testAdjustMandatoryCuboidStats() {
+        Map<Long, Long> statistics = Maps.newHashMap();
+        statistics.put(60160L, 1212L);
+
+        Map<Long, Long> cuboidsWithStats = Maps.newHashMap();
+        cuboidsWithStats.put(65280L, 1423L);
+        cuboidsWithStats.put(63232L, 2584421L);
+        cuboidsWithStats.put(61184L, 132L);
+        cuboidsWithStats.put(57088L, 499L);
+        cuboidsWithStats.put(55040L, 708L);
+        cuboidsWithStats.put(38656L, 36507L);
+
+        Map<Long, Long> cuboidsWithStatsExpected = Maps.newHashMap(cuboidsWithStats);
+        cuboidsWithStatsExpected.put(65280L, 2584421L);
+        cuboidsWithStatsExpected.put(57088L, 36507L);
+        cuboidsWithStatsExpected.put(55040L, 36507L);
+        cuboidsWithStatsExpected.put(61184L, 1212L);
+
+        CuboidStatsUtil.adjustCuboidStats(cuboidsWithStats, statistics);
+        Assert.assertEquals(cuboidsWithStatsExpected, cuboidsWithStats);
+    }
+
+    /**
+     * Left: input statistics; right: the same cuboids after adjustCuboidStats(statistics).
+     * Below the "+": mandatory cuboids with their initial row counts, before adjustCuboidStats(mandatory, statistics).
+     *
+     *                   1111(70)                                    1111(90)
+     *                 /         \                                 /         \
+     *             1011(90)       \                            1011(90)       \
+     *               |             \                             |             \
+     *             0011(40)     1110(50)     ==========>       0011(80)     1110(80)
+     *            /       \   /        \                      /       \   /        \
+     *        0001(20)    0010(80)    0100(60)            0001(20)    0010(80)    0100(60)
+     *
+     *
+     *                                                                   +
+     *
+     *
+     *                                                               /       \
+     *                                                              /         \
+     *                                                             /        1001(85)
+     *                                                            /              \
+     *                                                      0111(70)            1000(100)
+     * */
+    @Test
+    public void testAdjustCuboidStats() {
+        Map<Long, Long> statistics = Maps.newHashMap();
+        statistics.put(1L, 20L);
+        statistics.put(2L, 80L);
+        statistics.put(4L, 60L);
+        statistics.put(3L, 40L);
+        statistics.put(11L, 90L);
+        statistics.put(14L, 50L);
+        statistics.put(15L, 70L);
+
+        Map<Long, Long> cuboidsWithStatsExpected = Maps.newHashMap(statistics);
+        cuboidsWithStatsExpected.put(3L, 80L);
+        cuboidsWithStatsExpected.put(14L, 80L);
+        cuboidsWithStatsExpected.put(15L, 90L);
+
+        statistics = CuboidStatsUtil.adjustCuboidStats(statistics);
+        Assert.assertEquals(cuboidsWithStatsExpected, statistics);
+
+        Map<Long, Long> mandatoryCuboidsWithStats = Maps.newHashMap();
+        mandatoryCuboidsWithStats.put(7L, 70L);
+        mandatoryCuboidsWithStats.put(8L, 100L);
+        mandatoryCuboidsWithStats.put(9L, 85L);
+        CuboidStatsUtil.adjustCuboidStats(mandatoryCuboidsWithStats, statistics);
+
+        Map<Long, Long> mandatoryCuboidsWithStatsExpected = Maps.newHashMap();
+        mandatoryCuboidsWithStatsExpected.put(7L, 80L);
+        mandatoryCuboidsWithStatsExpected.put(8L, 80L);
+        mandatoryCuboidsWithStatsExpected.put(9L, 85L);
+        Assert.assertEquals(mandatoryCuboidsWithStatsExpected, mandatoryCuboidsWithStats);
     }
 
     @Test
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 05458b6..2bffe86 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.cuboid.algorithm.CuboidRecommender;
 import org.apache.kylin.cube.cuboid.algorithm.CuboidStats;
 import org.slf4j.Logger;
@@ -65,23 +67,48 @@ public class CuboidRecommenderUtil {
 
     /** Trigger cube planner phase two for optimization */
     public static Map<Long, Long> getRecommendCuboidList(CubeInstance cube, Map<Long, Long> hitFrequencyMap,
-            Map<Long, Map<Long, Long>> rollingUpCountSourceMap) throws IOException {
+            Map<Long, Map<Long, Pair<Long, Long>>> rollingUpCountSourceMap) throws IOException {
 
+        CuboidScheduler cuboidScheduler = cube.getCuboidScheduler();
+        Set<Long> currentCuboids = cuboidScheduler.getAllCuboidIds();
         Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsReaderUtil
-                .readCuboidStatsAndSizeFromCube(cube.getCuboidScheduler().getAllCuboidIds(), cube);
+                .readCuboidStatsAndSizeFromCube(currentCuboids, cube);
+        long baseCuboid = cuboidScheduler.getBaseCuboidId();
+        Map<Long, Long> cuboidStatistics = statsPair.getFirst();
+        Long baseCuboidRowCount = cuboidStatistics == null ? null : cuboidStatistics.get(baseCuboid);
+        // Without a base cuboid row count there is nothing to recommend from; callers receive null
+        if (baseCuboidRowCount == null || baseCuboidRowCount == 0L) {
+            logger.info("Base cuboid count in cuboid statistics is 0.");
+            return null;
+        }
 
+        KylinConfig config = cube.getConfig();
         String key = cube.getName();
-        long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
-        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, statsPair.getFirst(), statsPair.getSecond())
-                .setHitFrequencyMap(hitFrequencyMap).setRollingUpCountSourceMap(rollingUpCountSourceMap,
-                        cube.getConfig().getCubePlannerMandatoryRollUpThreshold())
+        double queryUncertaintyRatio = config.getCubePlannerQueryUncertaintyRatio();
+        double bpusMinBenefitRatio = config.getCubePlannerBPUSMinBenefitRatio();
+        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cuboidStatistics,
+                statsPair.getSecond()) {
+            @Override
+            public Map<Long, Double> estimateCuboidsSize(Map<Long, Long> statistics) {
+                try {
+                    return CuboidStatsReaderUtil.readCuboidSizeFromCube(statistics, cube);
+                } catch (IOException e) {
+                    logger.warn("Fail to get cuboid size from cube due to ", e);
+                    return null;
+                }
+            }
+        }.setQueryUncertaintyRatio(queryUncertaintyRatio) //
+                .setBPUSMinBenefitRatio(bpusMinBenefitRatio) //
+                .setHitFrequencyMap(hitFrequencyMap) //
+                .setRollingUpCountSourceMap(rollingUpCountSourceMap) //
                 .build();
-        return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, cube.getConfig());
+        return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, config);
     }
 
     /** For future segment level recommend */
     public static Map<Long, Long> getRecommendCuboidList(CubeSegment segment, Map<Long, Long> hitFrequencyMap,
-            Map<Long, Map<Long, Long>> rollingUpCountSourceMap, boolean ifForceRecommend) throws IOException {
+            Map<Long, Map<Long, Pair<Long, Long>>> rollingUpCountSourceMap, boolean ifForceRecommend)
+            throws IOException {
         if (segment == null) {
             return null;
         }
@@ -103,9 +130,7 @@ public class CuboidRecommenderUtil {
         String key = cube.getName() + "-" + segment.getName();
         CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
                 cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap)
-                        .setRollingUpCountSourceMap(rollingUpCountSourceMap,
-                                segment.getConfig().getCubePlannerMandatoryRollUpThreshold())
-                        .build();
+                        .setRollingUpCountSourceMap(rollingUpCountSourceMap).build();
         return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(),
                 ifForceRecommend);
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index 1542aa2..ee615c3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -68,6 +68,42 @@ public class CuboidStatsReaderUtil {
         return statisticsMerged.isEmpty() ? null : statisticsMerged;
     }
 
+    /**
+     * Estimates the size of every cuboid in {@code statistics} from its row count with
+     * {@link CubeStatsReader#getCuboidSizeMapFromRowCount} for each READY segment of {@code cube},
+     * and returns the per-segment estimates averaged over the number of READY segments.
+     *
+     * @return cuboid id to estimated size; empty when the cube has no READY segment
+     * @throws IOException propagated from reading a segment's statistics via {@link CubeStatsReader}
+     */
+    public static Map<Long, Double> readCuboidSizeFromCube(Map<Long, Long> statistics, CubeInstance cube)
+            throws IOException {
+        List<CubeSegment> segmentList = cube.getSegments(SegmentStatusEnum.READY);
+        Map<Long, Double> sizeMerged = Maps.newHashMapWithExpectedSize(statistics.size());
+        for (CubeSegment pSegment : segmentList) {
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            Map<Long, Double> pSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(pSegment, statistics,
+                    pReader.sourceRowCount);
+            for (Long pCuboid : statistics.keySet()) {
+                Double pSize = pSizeMap.get(pCuboid);
+                // A missing per-segment estimate is skipped instead of failing on null unboxing
+                if (pSize != null) {
+                    Double merged = sizeMerged.get(pCuboid);
+                    sizeMerged.put(pCuboid, merged == null ? pSize : merged + pSize);
+                }
+            }
+        }
+        int nSegment = segmentList.size();
+        if (nSegment <= 1) {
+            return sizeMerged;
+        }
+        // Average over all READY segments; a segment without an estimate contributes 0
+        for (Map.Entry<Long, Double> entry : sizeMerged.entrySet()) {
+            entry.setValue(entry.getValue() / nSegment);
+        }
+        return sizeMerged;
+    }
+
     private static void readCuboidStatsFromSegments(Set<Long> cuboidSet, List<CubeSegment> segmentList,
             final Map<Long, Long> statisticsMerged, final Map<Long, Double> sizeMerged) throws IOException {
         if (segmentList == null || segmentList.isEmpty()) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 11753ec..08b03a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -32,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -901,7 +902,8 @@ public class CubeController extends BasicController {
     private Map<Long, Long> getRecommendCuboidList(CubeInstance cube) throws IOException {
         // Get cuboid source info
         Map<Long, Long> optimizeHitFrequencyMap = getSourceCuboidHitFrequency(cube.getName());
-        Map<Long, Map<Long, Long>> rollingUpCountSourceMap = cubeService.getCuboidRollingUpStats(cube.getName());
+        Map<Long, Map<Long, Pair<Long, Long>>> rollingUpCountSourceMap = cubeService
+                .getCuboidRollingUpStats(cube.getName());
         return cubeService.getRecommendCuboidStatistics(cube, optimizeHitFrequencyMap, rollingUpCountSourceMap);
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4944b58..a9fbb97 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -893,7 +894,7 @@ public class CubeService extends BasicService implements InitializingBean {
 
     /** cube planner services */
     public Map<Long, Long> getRecommendCuboidStatistics(CubeInstance cube, Map<Long, Long> hitFrequencyMap,
-            Map<Long, Map<Long, Long>> rollingUpCountSourceMap) throws IOException {
+            Map<Long, Map<Long, Pair<Long, Long>>> rollingUpCountSourceMap) throws IOException {
         aclEvaluate.checkProjectAdminPermission(cube.getProject());
         return CuboidRecommenderUtil.getRecommendCuboidList(cube, hitFrequencyMap, rollingUpCountSourceMap);
     }
@@ -906,12 +907,26 @@ public class CubeService extends BasicService implements InitializingBean {
         return formattedQueryCount;
     }
 
-    public Map<Long, Map<Long, Long>> formatRollingUpStats(List<List<String>> orgRollingUpCount) {
-        Map<Long, Map<Long, Long>> formattedRollingUpStats = Maps.newLinkedHashMap();
+    /**
+     * Groups rows of (source cuboid, target cuboid, aggregation count, return count) by source cuboid.
+     * Each source cuboid maps to all of its target cuboids, each paired with (aggregation count, return count).
+     * Count columns are parsed as decimals (they are averages in getCuboidRollingUpStats) and truncated to long.
+     */
+    public Map<Long, Map<Long, Pair<Long, Long>>> formatRollingUpStats(List<List<String>> orgRollingUpCount) {
+        Map<Long, Map<Long, Pair<Long, Long>>> formattedRollingUpStats = Maps.newLinkedHashMap();
         for (List<String> rollingUp : orgRollingUpCount) {
-            Map<Long, Long> childMap = Maps.newLinkedHashMap();
-            childMap.put(Long.parseLong(rollingUp.get(1)), (long) Double.parseDouble(rollingUp.get(2)));
-            formattedRollingUpStats.put(Long.parseLong(rollingUp.get(0)), childMap);
+            Long srcCuboid = Long.parseLong(rollingUp.get(0));
+            Long tgtCuboid = Long.parseLong(rollingUp.get(1));
+            Long rollupCount = (long) Double.parseDouble(rollingUp.get(2));
+            Long returnCount = (long) Double.parseDouble(rollingUp.get(3));
+            // Rows are grouped by (source, target), so one source cuboid can appear in several rows;
+            // merge into its existing child map instead of replacing the targets collected so far.
+            Map<Long, Pair<Long, Long>> childMap = formattedRollingUpStats.get(srcCuboid);
+            if (childMap == null) {
+                childMap = Maps.newLinkedHashMap();
+                formattedRollingUpStats.put(srcCuboid, childMap);
+            }
+            childMap.put(tgtCuboid, new Pair<>(rollupCount, returnCount));
         }
         return formattedRollingUpStats;
     }
@@ -929,15 +944,16 @@ public class CubeService extends BasicService implements InitializingBean {
         return formatQueryCount(orgHitFrequency);
     }
 
-    public Map<Long, Map<Long, Long>> getCuboidRollingUpStats(String cubeName) {
+    public Map<Long, Map<Long, Pair<Long, Long>>> getCuboidRollingUpStats(String cubeName) {
         String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString();
-        String cuboidTarget = QueryCubePropertyEnum.CUBOID_TARGET.toString();
+        String cuboidTgt = QueryCubePropertyEnum.CUBOID_TARGET.toString();
         String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString();
+        String returnCount = QueryCubePropertyEnum.RETURN_COUNT.toString();
         String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
-        String sql = "select " + cuboidSource + ", " + cuboidTarget + ", avg(" + aggCount + ")" //
+        String sql = "select " + cuboidSource + ", " + cuboidTgt + ", avg(" + aggCount + "), avg(" + returnCount + ")" //
                 + " from " + table //
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" + cubeName + "' " //
-                + " group by " + cuboidSource + ", " + cuboidTarget;
+                + " group by " + cuboidSource + ", " + cuboidTgt;
         List<List<String>> orgRollingUpCount = queryService.querySystemCube(sql).getResults();
         return formatRollingUpStats(orgRollingUpCount);
     }

Reply via email to