http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 89bb11e,575c7b3..569b810
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@@ -54,20 -56,20 +56,17 @@@ public class FactDistinctColumnsMapper<
          BYTES
      }
  
-     protected CuboidScheduler cuboidScheduler = null;
 -    protected boolean collectStatistics = false;
      protected int nRowKey;
      private Integer[][] allCuboidsBitSet = null;
      private HLLCounter[] allCuboidsHLL = null;
      private Long[] cuboidIds;
-     private HashFunction hf = null;
      private int rowCount = 0;
      private int samplingPercentage;
-     //private ByteArray[] row_hashcodes = null;
-     private long[] rowHashCodesLong = null;
      private ByteBuffer tmpbuf;
+ 
+     private CuboidStatCalculator[] cuboidStatCalculators;
+ 
      private static final Text EMPTY_TEXT = new Text();
--    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
--    public static final byte MARK_FOR_HLL = (byte) 0xFF;
  
      private int partitionColumnIndex = -1;
      private boolean needFetchPartitionCol = true;
@@@ -81,70 -80,95 +77,93 @@@
      protected void doSetup(Context context) throws IOException {
          super.doSetup(context);
          tmpbuf = ByteBuffer.allocate(4096);
 -        collectStatistics = 
Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
 -        if (collectStatistics) {
 -            samplingPercentage = Integer
 -                    
.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
 -            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
 -
 -            Set<Long> cuboidIdSet = 
Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
 -            if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) {
 -                // For cube planner, for every prebuilt cuboid, its related 
row count stats should be calculated
 -                // If the precondition for trigger cube planner phase one is 
satisfied, we need to calculate row count stats for mandatory cuboids.
 -                
cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids());
 -            }
 -            cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
 -            allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+ 
 -            allCuboidsHLL = new HLLCounter[cuboidIds.length];
 -            for (int i = 0; i < cuboidIds.length; i++) {
 -                allCuboidsHLL[i] = new 
HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
 -            }
 +        samplingPercentage = Integer
 +                .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
-         cuboidScheduler = cubeDesc.getInitialCuboidScheduler();
 +        nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
  
-         List<Long> cuboidIdList = Lists.newArrayList();
-         List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
-         addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
- 
-         allCuboidsBitSet = allCuboidsBitSetList.toArray(new 
Integer[cuboidIdList.size()][]);
-         cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
 -            TblColRef partitionColRef = 
cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
 -            if (partitionColRef != null) {
 -                partitionColumnIndex = 
intermediateTableDesc.getColumnIndex(partitionColRef);
 -            }
++        Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
++        if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) {
++            // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
++            // If the precondition for triggering cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
++            cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids());
++        }
++        cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
++        allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
  
 -            // check whether need fetch the partition col values
 -            if (partitionColumnIndex < 0) {
 -                // if partition col not on cube, no need
 -                needFetchPartitionCol = false;
 -            } else {
 -                needFetchPartitionCol = true;
 -            }
 -            //for KYLIN-2518 backward compatibility
 -            boolean isUsePutRowKeyToHllNewAlgorithm;
 -            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
 -                isUsePutRowKeyToHllNewAlgorithm = false;
 -                logger.info("Found KylinVersion : {}. Use old algorithm for 
cuboid sampling.", cubeDesc.getVersion());
 -            } else {
 -                isUsePutRowKeyToHllNewAlgorithm = true;
 -                logger.info(
 -                        "Found KylinVersion : {}. Use new algorithm for 
cuboid sampling. About the details of the new algorithm, please refer to 
KYLIN-2518",
 -                        cubeDesc.getVersion());
 -            }
 +        allCuboidsHLL = new HLLCounter[cuboidIds.length];
 +        for (int i = 0; i < cuboidIds.length; i++) {
 +            allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
 +        }
  
 -            int calculatorNum = getStatsThreadNum(cuboidIds.length);
 -            cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
 -            int splitSize = cuboidIds.length / calculatorNum;
 -            if (splitSize <= 0) {
 -                splitSize = 1;
 -            }
 -            for (int i = 0; i < calculatorNum; i++) {
 -                HLLCounter[] cuboidsHLLSplit;
 -                Integer[][] cuboidsBitSetSplit;
 -                Long[] cuboidIdSplit;
 -                int start = i * splitSize;
 -                if (start >= cuboidIds.length) {
 -                    break;
 -                }
 -                int end = (i + 1) * splitSize;
 -                if (i == calculatorNum - 1) {// last split
 -                    end = cuboidIds.length;
 -                }
 +        TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
 +        if (partitionColRef != null) {
 +            partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
 +        }
  
 -                cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, 
end);
 -                cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, 
start, end);
 -                cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
 -                CuboidStatCalculator calculator = new CuboidStatCalculator(i,
 -                        intermediateTableDesc.getRowKeyColumnIndexes(), 
cuboidIdSplit, cuboidsBitSetSplit,
 -                        isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
 -                cuboidStatCalculators[i] = calculator;
 -                calculator.start();
 +        // check whether need fetch the partition col values
 +        if (partitionColumnIndex < 0) {
 +            // if partition col not on cube, no need
 +            needFetchPartitionCol = false;
 +        } else {
 +            needFetchPartitionCol = true;
 +        }
 +        //for KYLIN-2518 backward compatibility
++        boolean isUsePutRowKeyToHllNewAlgorithm;
 +        if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
 +            isUsePutRowKeyToHllNewAlgorithm = false;
-             hf = Hashing.murmur3_32();
 +            logger.info("Found KylinVersion : {}. Use old algorithm for 
cuboid sampling.", cubeDesc.getVersion());
 +        } else {
 +            isUsePutRowKeyToHllNewAlgorithm = true;
-             rowHashCodesLong = new long[nRowKey];
-             hf = Hashing.murmur3_128();
 +            logger.info(
 +                    "Found KylinVersion : {}. Use new algorithm for cuboid 
sampling. About the details of the new algorithm, please refer to KYLIN-2518",
 +                    cubeDesc.getVersion());
 +        }
 +
-     }
++        int calculatorNum = getStatsThreadNum(cuboidIds.length);
++        cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
++        int splitSize = cuboidIds.length / calculatorNum;
++        if (splitSize <= 0) {
++            splitSize = 1;
++        }
++        for (int i = 0; i < calculatorNum; i++) {
++            HLLCounter[] cuboidsHLLSplit;
++            Integer[][] cuboidsBitSetSplit;
++            Long[] cuboidIdSplit;
++            int start = i * splitSize;
++            if (start >= cuboidIds.length) {
++                break;
+             }
++            int end = (i + 1) * splitSize;
++            if (i == calculatorNum - 1) {// last split
++                end = cuboidIds.length;
++            }
 +
-     private void addCuboidBitSet(long cuboidId, List<Integer[]> 
allCuboidsBitSet, List<Long> allCuboids) {
-         allCuboids.add(cuboidId);
-         Integer[] indice = new Integer[Long.bitCount(cuboidId)];
++            cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
++            cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
++            cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
++            CuboidStatCalculator calculator = new CuboidStatCalculator(i,
++                    intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
++                    isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
++            cuboidStatCalculators[i] = calculator;
++            calculator.start();
+         }
+     }
  
-         long mask = Long.highestOneBit(baseCuboidId);
-         int position = 0;
-         for (int i = 0; i < nRowKey; i++) {
-             if ((mask & cuboidId) > 0) {
-                 indice[position] = i;
-                 position++;
-             }
-             mask = mask >> 1;
+     private int getStatsThreadNum(int cuboidNum) {
+         int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator();
+         if (unitNum <= 0) {
+             logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " should be larger than 0");
+             logger.info("Will use single thread for cuboid statistics calculation");
+             return 1;
          }
  
-         allCuboidsBitSet.add(indice);
-         Collection<Long> children = 
cuboidScheduler.getSpanningCuboid(cuboidId);
-         for (Long childId : children) {
-             addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+         int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber();
+         int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+         if (calculatorNum > maxCalculatorNum) {
+             calculatorNum = maxCalculatorNum;
          }
+         return calculatorNum;
      }
  
      @Override
@@@ -158,15 -182,16 +177,8 @@@
                  if (fieldValue == null)
                      continue;
  
--                int reducerIndex;
--                if (uhcIndex[i] == 0) {
--                    //for the normal dictionary column
--                    reducerIndex = columnIndexToReducerBeginId.get(i);
--                } else {
--                    //for the uhc
-                     reducerIndex = columnIndexToReducerBeginId.get(i) + 
(fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
 -                    reducerIndex = columnIndexToReducerBeginId.get(i)
 -                            + (fieldValue.hashCode() & 0x7fffffff) % 
uhcReducerCount;
--                }
--
++                int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue);
++
                  tmpbuf.clear();
                  byte[] valueBytes = Bytes.toBytes(fieldValue);
                  int size = valueBytes.length + 1;
@@@ -187,28 -213,26 +200,24 @@@
                  }
              }
  
 -            if (collectStatistics) {
 -                if (rowCount % 100 < samplingPercentage) {
 -                    putRowKeyToHLL(row);
 -                }
 +            if (rowCount % 100 < samplingPercentage) {
-                 if (isUsePutRowKeyToHllNewAlgorithm) {
-                     putRowKeyToHLLNew(row);
-                 } else {
-                     putRowKeyToHLLOld(row);
-                 }
++                putRowKeyToHLL(row);
 +            }
  
 -                if (needFetchPartitionCol == true) {
 -                    String fieldValue = row[partitionColumnIndex];
 -                    if (fieldValue != null) {
 -                        tmpbuf.clear();
 -                        byte[] valueBytes = Bytes.toBytes(fieldValue);
 -                        int size = valueBytes.length + 1;
 -                        if (size >= tmpbuf.capacity()) {
 -                            tmpbuf = 
ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
 -                        }
 -                        tmpbuf.put(MARK_FOR_PARTITION_COL);
 -                        tmpbuf.put(valueBytes);
 -                        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
 -                        sortableKey.init(outputKey, (byte) 0);
 -                        context.write(sortableKey, EMPTY_TEXT);
 +            if (needFetchPartitionCol == true) {
 +                String fieldValue = row[partitionColumnIndex];
 +                if (fieldValue != null) {
 +                    tmpbuf.clear();
 +                    byte[] valueBytes = Bytes.toBytes(fieldValue);
 +                    int size = valueBytes.length + 1;
 +                    if (size >= tmpbuf.capacity()) {
 +                        tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
                      }
-                     tmpbuf.put(MARK_FOR_PARTITION_COL);
++                    tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL);
 +                    tmpbuf.put(valueBytes);
 +                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
 +                    sortableKey.init(outputKey, (byte) 0);
 +                    context.write(sortableKey, EMPTY_TEXT);
                  }
              }
              rowCount++;
@@@ -224,78 -254,171 +239,169 @@@
          return size;
      }
  
-     private void putRowKeyToHLLOld(String[] row) {
-         //generate hash for each row key column
-         byte[][] rowHashCodes = new byte[nRowKey][];
-         for (int i = 0; i < nRowKey; i++) {
-             Hasher hc = hf.newHasher();
-             String colValue = 
row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-             if (colValue != null) {
-                 rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+     @Override
+     protected void doCleanup(Context context) throws IOException, InterruptedException {
 -        if (collectStatistics) {
 -            ByteBuffer hllBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
 -            // output each cuboid's hll to reducer, key is 0 - cuboidId
 -            for (CuboidStatCalculator cuboidStatCalculator : 
cuboidStatCalculators) {
 -                cuboidStatCalculator.waitForCompletion();
 -            }
 -            for (CuboidStatCalculator cuboidStatCalculator : 
cuboidStatCalculators) {
 -                Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
 -                HLLCounter[] cuboidsHLL = 
cuboidStatCalculator.getHLLCounters();
 -                HLLCounter hll;
++        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
++        // output each cuboid's hll to reducer, key is 0 - cuboidId
++        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
++            cuboidStatCalculator.waitForCompletion();
++        }
++        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
++            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
++            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
++            HLLCounter hll;
+ 
 -                for (int i = 0; i < cuboidIds.length; i++) {
 -                    hll = cuboidsHLL[i];
 -                    tmpbuf.clear();
 -                    tmpbuf.put(MARK_FOR_HLL); // one byte
 -                    tmpbuf.putLong(cuboidIds[i]);
 -                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
 -                    hllBuf.clear();
 -                    hll.writeRegisters(hllBuf);
 -                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
 -                    sortableKey.init(outputKey, (byte) 0);
 -                    context.write(sortableKey, outputValue);
 -                }
++            for (int i = 0; i < cuboidIds.length; i++) {
++                hll = cuboidsHLL[i];
++                tmpbuf.clear();
++                tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
++                tmpbuf.putLong(cuboidIds[i]);
++                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
++                hllBuf.clear();
++                hll.writeRegisters(hllBuf);
++                outputValue.set(hllBuf.array(), 0, hllBuf.position());
++                sortableKey.init(outputKey, (byte) 0);
++                context.write(sortableKey, outputValue);
+             }
+         }
+     }
+ 
+     private int countNewSize(int oldSize, int dataSize) {
+         int newSize = oldSize * 2;
+         while (newSize < dataSize) {
+             newSize = newSize * 2;
+         }
+         return newSize;
+     }
+ 
+     public static class CuboidStatCalculator implements Runnable {
+         private final int id;
+         private final int nRowKey;
+         private final int[] rowkeyColIndex;
+         private final Long[] cuboidIds;
+         private final Integer[][] cuboidsBitSet;
+         private volatile HLLCounter[] cuboidsHLL = null;
+ 
+         //about details of the new algorithm, please see KYLIN-2518
+         private final boolean isNewAlgorithm;
+         private final HashFunction hf;
+         private long[] rowHashCodesLong;
+ 
+         private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000);
+         private Thread workThread;
+         private volatile boolean stop;
+ 
+         public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+                 boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+             this.id = id;
+             this.nRowKey = rowkeyColIndex.length;
+             this.rowkeyColIndex = rowkeyColIndex;
+             this.cuboidIds = cuboidIds;
+             this.cuboidsBitSet = cuboidsBitSet;
+             this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+             if (!isNewAlgorithm) {
+                 this.hf = Hashing.murmur3_32();
              } else {
-                 rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                 rowHashCodesLong = new long[nRowKey];
+                 this.hf = Hashing.murmur3_128();
              }
+             this.cuboidsHLL = cuboidsHLL;
+             workThread = new Thread(this);
+         }
+ 
+         public void start() {
+             logger.info("cuboid stats calculator:" + id + " started, handle 
cuboids number:" + cuboidIds.length);
+             workThread.start();
          }
  
-         // user the row key column hash to get a consolidated hash for each 
cuboid
-         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-             Hasher hc = hf.newHasher();
-             for (int position = 0; position < allCuboidsBitSet[i].length; 
position++) {
-                 hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+         public void putRow(final String[] row) {
+             String[] copyRow = Arrays.copyOf(row, row.length);
+             try {
+                 queue.put(copyRow);
+             } catch (InterruptedException e) {
+                 logger.error("interrupt", e);
              }
+         }
  
-             allCuboidsHLL[i].add(hc.hash().asBytes());
+         public void waitForCompletion() {
+             stop = true;
+             try {
+                 workThread.join();
+             } catch (InterruptedException e) {
+                 logger.error("interrupt", e);
+             }
          }
-     }
  
-     private void putRowKeyToHLLNew(String[] row) {
-         //generate hash for each row key column
-         for (int i = 0; i < nRowKey; i++) {
-             Hasher hc = hf.newHasher();
-             String colValue = 
row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-             if (colValue == null)
-                 colValue = "0";
-             byte[] bytes = hc.putString(colValue).hash().asBytes();
-             rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column 
ordinal to the hash value to distinguish between (a,b) and (b,a)
+         private void putRowKeyToHLLOld(String[] row) {
+             //generate hash for each row key column
+             byte[][] rowHashCodes = new byte[nRowKey][];
+             for (int i = 0; i < nRowKey; i++) {
+                 Hasher hc = hf.newHasher();
+                 String colValue = row[rowkeyColIndex[i]];
+                 if (colValue != null) {
+                     rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                 } else {
+                     rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                 }
+             }
+ 
+             // use the row key column hash to get a consolidated hash for each cuboid
+             for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                 Hasher hc = hf.newHasher();
+                 for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                     hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                 }
+ 
+                 cuboidsHLL[i].add(hc.hash().asBytes());
+             }
          }
  
-         // user the row key column hash to get a consolidated hash for each 
cuboid
-         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-             long value = 0;
-             for (int position = 0; position < allCuboidsBitSet[i].length; 
position++) {
-                 value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+         private void putRowKeyToHLLNew(String[] row) {
+             //generate hash for each row key column
+             for (int i = 0; i < nRowKey; i++) {
+                 Hasher hc = hf.newHasher();
+                 String colValue = row[rowkeyColIndex[i]];
+                 if (colValue == null)
+                     colValue = "0";
+                 byte[] bytes = hc.putString(colValue).hash().asBytes();
+                 rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+             }
+ 
+             // use the row key column hash to get a consolidated hash for each cuboid
+             for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                 long value = 0;
+                 for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                     value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                 }
+                 cuboidsHLL[i].addHashDirectly(value);
              }
-             allCuboidsHLL[i].addHashDirectly(value);
          }
-     }
  
-     @Override
-     protected void doCleanup(Context context) throws IOException, 
InterruptedException {
-         ByteBuffer hllBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-         // output each cuboid's hll to reducer, key is 0 - cuboidId
-         HLLCounter hll;
-         for (int i = 0; i < cuboidIds.length; i++) {
-             hll = allCuboidsHLL[i];
- 
-             tmpbuf.clear();
-             tmpbuf.put(MARK_FOR_HLL); // one byte
-             tmpbuf.putLong(cuboidIds[i]);
-             outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-             hllBuf.clear();
-             hll.writeRegisters(hllBuf);
-             outputValue.set(hllBuf.array(), 0, hllBuf.position());
-             sortableKey.init(outputKey, (byte) 0);
-             context.write(sortableKey, outputValue);
+         public HLLCounter[] getHLLCounters() {
+             return cuboidsHLL;
          }
-     }
  
+         public Long[] getCuboidIds() {
+             return cuboidIds;
+         }
  
-     private int countNewSize(int oldSize, int dataSize) {
-         int newSize = oldSize * 2;
-         while (newSize < dataSize) {
-             newSize = newSize * 2;
+         @Override
+         public void run() {
+             while (true) {
+                 String[] row = queue.poll();
+                 if (row == null && stop) {
+                     logger.info("cuboid stats calculator:" + id + " 
completed.");
+                     break;
+                 } else if (row == null) {
+                     Thread.yield();
+                     continue;
+                 }
+                 if (isNewAlgorithm) {
+                     putRowKeyToHLLNew(row);
+                 } else {
+                     putRowKeyToHLLOld(row);
+                 }
+             }
          }
-         return newSize;
      }
- 
  }
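
The new sampling path above (KYLIN-2518) replaces the old per-cuboid byte-array hashing with one 64-bit hash per row key column that is simply summed per cuboid and fed to the HLL counter via addHashDirectly(). Below is a minimal, self-contained sketch of that idea; it uses an FNV-1a 64-bit hash as a stand-in for Guava's murmur3_128 and made-up row/cuboid inputs, so it illustrates the technique rather than reproducing the mapper code above.

    import java.nio.charset.StandardCharsets;

    public class CuboidHashSketch {
        // Stand-in 64-bit string hash (the real code uses Guava murmur3_128).
        static long fnv1a64(String s) {
            long h = 0xcbf29ce484222325L;
            for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
                h ^= (b & 0xffL);
                h *= 0x100000001b3L;
            }
            return h;
        }

        public static void main(String[] args) {
            String[] row = { "2012-01-01", "US", "seller-42" };           // hypothetical row key values
            Integer[][] cuboidsBitSet = { { 0, 1, 2 }, { 0, 1 }, { 1 } }; // column ordinals per cuboid

            // One 64-bit hash per row key column; adding the ordinal i keeps
            // (a,b) and (b,a) from producing the same per-cuboid sum.
            long[] colHash = new long[row.length];
            for (int i = 0; i < row.length; i++) {
                String v = row[i] == null ? "0" : row[i];
                colHash[i] = fnv1a64(v) + i;
            }

            // Per cuboid, the consolidated hash is the sum of its columns' hashes;
            // the mapper feeds such a long into HLLCounter.addHashDirectly(value).
            for (int c = 0; c < cuboidsBitSet.length; c++) {
                long value = 0;
                for (int ordinal : cuboidsBitSet[c]) {
                    value += colHash[ordinal];
                }
                System.out.println("cuboid " + c + " -> hash " + value);
            }
        }
    }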

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 00b831a,00b831a..c66042b
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@@ -20,9 -20,9 +20,7 @@@ package org.apache.kylin.engine.mr.step
  
  import java.io.IOException;
  import java.util.Arrays;
--import java.util.HashMap;
  import java.util.List;
--import java.util.Map;
  
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.Text;
@@@ -63,10 -63,10 +61,8 @@@ abstract public class FactDistinctColum
      protected CubeJoinedFlatTableEnrich intermediateTableDesc;
      protected int[] dictionaryColumnIndex;
  
--    protected int uhcReducerCount;
--    protected int[] uhcIndex;
--    protected Map<Integer, Integer> columnIndexToReducerBeginId = new 
HashMap<>();
--
++    protected FactDistinctColumnsReducerMapping reducerMapping;
++    
      @Override
      protected void doSetup(Context context) throws IOException {
          Configuration conf = context.getConfiguration();
@@@ -90,15 -90,15 +86,8 @@@
              dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
          }
  
--        uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
--        uhcReducerCount = cube.getConfig().getUHCReducerCount();
--        int count = 0;
--        for (int i = 0; i < uhcIndex.length; i++) {
--            columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) 
+ i);
--            if (uhcIndex[i] == 1) {
--                count++;
--            }
--        }
++        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
++                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
      }
  
      protected void handleErrorRecord(String[] record, Exception ex) throws 
IOException {
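
The old uhcIndex/columnIndexToReducerBeginId bookkeeping is replaced here by the FactDistinctColumnsReducerMapping introduced later in this commit. A tiny runnable sketch of how a dictionary-column value is routed to a reducer id, mirroring getReducerIdForDictCol; the begin/span numbers are hypothetical.

    public class DictColReducerRoutingSketch {
        // Mirrors FactDistinctColumnsReducerMapping.getReducerIdForDictCol:
        // a normal dict column owns one reducer (span 1), a UHC column spreads
        // its values over `span` consecutive reducers by value hash.
        static int reducerIdFor(int begin, int span, Object fieldValue) {
            if (span == 1)
                return begin;
            int hash = fieldValue == null ? 0 : fieldValue.hashCode();
            return begin + Math.abs(hash) % span;
        }

        public static void main(String[] args) {
            // Hypothetical layout: dict col 0 owns reducer 0, UHC col 1 owns reducers 1..3.
            System.out.println(reducerIdFor(0, 1, "CN"));        // always 0
            System.out.println(reducerIdFor(1, 3, "user-1001")); // one of 1, 2, 3
            System.out.println(reducerIdFor(1, 3, "user-1002")); // one of 1, 2, 3
        }
    }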

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 8273530,37972c0..cad947c
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@@ -22,7 -22,7 +22,6 @@@ import java.io.DataOutputStream
  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.Collections;
--import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  
@@@ -61,7 -61,7 +60,6 @@@ public class FactDistinctColumnsReduce
  
      private static final Logger logger = 
LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
  
--    private List<TblColRef> columnList;
      private List<Long> baseCuboidRowCountInMappers;
      protected Map<Long, HLLCounter> cuboidHLLMap = null;
      protected long baseCuboidId;
@@@ -71,11 -71,11 +69,10 @@@
      private TblColRef col = null;
      private boolean isStatistics = false;
      private KylinConfig cubeConfig;
--    private int uhcReducerCount;
--    private Map<Integer, Integer> reducerIdToColumnIndex = new HashMap<>();
      private int taskId;
      private boolean isPartitionCol = false;
      private int rowCount = 0;
++    private FactDistinctColumnsReducerMapping reducerMapping;
  
      //local build dict
      private boolean buildDictInReducer;
@@@ -98,33 -98,49 +95,36 @@@
          CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
          cubeConfig = cube.getConfig();
          cubeDesc = cube.getDescriptor();
--        columnList = 
Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
  
 -        boolean collectStatistics = 
Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
          int numberOfTasks = context.getNumReduceTasks();
          taskId = context.getTaskAttemptID().getTaskID().getId();
  
--        uhcReducerCount = cube.getConfig().getUHCReducerCount();
--        initReducerIdToColumnIndex(config);
 -
 -        boolean ifCol = true;
 -        if (collectStatistics) {
 -            int hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 
0);
 -            if (hllShardBase <= 0) {
 -                throw new IllegalArgumentException(
 -                        "In job configuration the value for property " + 
BatchConstants.CFG_HLL_SHARD_BASE
 -                        + " is " + hllShardBase + ". It should be set 
correctly!!!");
 -            }
 -            ifCol = false;
 -            if (taskId >= numberOfTasks - hllShardBase) {
 -                // hll
 -                isStatistics = true;
 -                baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
 -                baseCuboidRowCountInMappers = Lists.newArrayList();
 -                cuboidHLLMap = Maps.newHashMap();
 -                samplingPercentage = Integer
 -                        
.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
 -                logger.info("Reducer " + taskId + " handling stats");
 -            } else if (taskId == numberOfTasks - hllShardBase - 1) {
 -                // partition col
 -                isPartitionCol = true;
 -                col = 
cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
 -                if (col == null) {
 -                    logger.info("No partition col. This reducer will do 
nothing");
 -                } else {
 -                    logger.info("Reducer " + taskId + " handling partition 
col " + col.getIdentity());
 -                }
++        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
++                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
++        
++        logger.info("reducer no " + taskId + ", role play " + 
reducerMapping.getRolePlayOfReducer(taskId));
 +
-         if (taskId == numberOfTasks - 1) {
++        if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
 +            // hll
 +            isStatistics = true;
++            baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
 +            baseCuboidRowCountInMappers = Lists.newArrayList();
 +            cuboidHLLMap = Maps.newHashMap();
-             samplingPercentage = 
Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
++            samplingPercentage = Integer
++                    .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
 +            logger.info("Reducer " + taskId + " handling stats");
-         } else if (taskId == numberOfTasks - 2) {
++        } else if (reducerMapping.isPartitionColReducer(taskId)) {
 +            // partition col
 +            isPartitionCol = true;
 +            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
 +            if (col == null) {
 +                logger.info("No partition col. This reducer will do nothing");
              } else {
 -                ifCol = true;
 +                logger.info("Reducer " + taskId + " handling partition col " 
+ col.getIdentity());
              }
 -        }
 -        if (ifCol) {
 +        } else {
              // normal col
--            col = columnList.get(reducerIdToColumnIndex.get(taskId));
++            col = reducerMapping.getDictColForReducer(taskId);
              Preconditions.checkNotNull(col);
  
              // local build dict
@@@ -132,11 -148,11 +132,8 @@@
+             if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
                  buildDictInReducer = false;
              }
--            if(config.getUHCReducerCount() > 1) {
--                int[] uhcIndex = 
CubeManager.getInstance(config).getUHCIndex(cubeDesc);
--                int colIndex = reducerIdToColumnIndex.get(taskId);
--                if (uhcIndex[colIndex] == 1)
--                    buildDictInReducer = false; //for UHC columns, this 
feature should be disabled
++            if (reducerMapping.getReducerNumForDictCol(col) > 1) {
++                buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
              }
              if (buildDictInReducer) {
+                 builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
@@@ -146,20 -162,20 +143,6 @@@
          }
      }
  
--    private void initReducerIdToColumnIndex(KylinConfig config) throws 
IOException {
--        int[] uhcIndex = 
CubeManager.getInstance(config).getUHCIndex(cubeDesc);
--        int count = 0;
--        for (int i = 0; i < uhcIndex.length; i++) {
--            reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i);
--            if (uhcIndex[i] == 1) {
--                for (int j = 1; j < uhcReducerCount; j++) {
--                    reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) 
+ j + i, i);
--                }
--                count++;
--            }
--        }
--    }
--
      @Override
      public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, 
Context context) throws IOException, InterruptedException {
          Text key = skey.getText();
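
With the mapping in place, each reducer derives its role from a single role-play value instead of comparing its task id against the total reducer count. A small runnable sketch of that dispatch; the markers mirror the constants in the new FactDistinctColumnsReducerMapping, and the role array is made up.

    public class ReducerRoleDemo {
        // Same markers as FactDistinctColumnsReducerMapping.
        static final int MARK_FOR_PARTITION_COL = -2;
        static final int MARK_FOR_HLL_COUNTER = -1;

        static String describe(int role) {
            if (role == MARK_FOR_HLL_COUNTER)
                return "cuboid row-count (HLL) reducer";
            if (role == MARK_FOR_PARTITION_COL)
                return "partition column min/max reducer";
            return "dict value reducer for column " + role;
        }

        public static void main(String[] args) {
            // Hypothetical role array for 5 reducers: two dict columns (the second
            // spread over two reducers), then the partition-col reducer, then one HLL reducer.
            int[] rolePlay = { 0, 1, 1, MARK_FOR_PARTITION_COL, MARK_FOR_HLL_COUNTER };
            for (int taskId = 0; taskId < rolePlay.length; taskId++)
                System.out.println("reducer " + taskId + ": " + describe(rolePlay[taskId]));
        }
    }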

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
index 0000000,0000000..51594c3
new file mode 100644
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
@@@ -1,0 -1,0 +1,156 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ * 
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++*/
++
++package org.apache.kylin.engine.mr.steps;
++
++import java.util.ArrayList;
++import java.util.List;
++
++import org.apache.kylin.cube.CubeInstance;
++import org.apache.kylin.cube.model.CubeDesc;
++import org.apache.kylin.engine.mr.common.MapReduceUtil;
++import org.apache.kylin.metadata.model.TblColRef;
++
++/**
++ * Reducers play different roles based on reducer-id:
++ * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer
++ * - one reducer to get min/max of date partition column
++ * - (at the end) one or more reducers to collect row counts for cuboids using HLL
++ */
++public class FactDistinctColumnsReducerMapping {
++
++    public static final int MARK_FOR_PARTITION_COL = -2;
++    public static final int MARK_FOR_HLL_COUNTER = -1;
++
++    final private int nDictValueCollectors;
++    final private int datePartitionReducerId;
++    final private int nCuboidRowCounters;
++    final private int nTotalReducers;
++
++    final private List<TblColRef> allDictCols;
++    final private int[] dictColIdToReducerBeginId;
++    final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers)
++
++    public FactDistinctColumnsReducerMapping(CubeInstance cube) {
++        this(cube, 0);
++    }
++
++    public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
++        CubeDesc desc = cube.getDescriptor();
++
++        allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt());
++
++        dictColIdToReducerBeginId = new int[allDictCols.size() + 1];
++
++        int uhcReducerCount = cube.getConfig().getUHCReducerCount();
++        List<TblColRef> uhcList = desc.getAllUHCColumns();
++        int counter = 0;
++        for (int i = 0; i < allDictCols.size(); i++) {
++            dictColIdToReducerBeginId[i] = counter;
++            boolean isUHC = uhcList.contains(allDictCols.get(i));
++            counter += (isUHC) ? uhcReducerCount : 1;
++        }
++
++        dictColIdToReducerBeginId[allDictCols.size()] = counter;
++        nDictValueCollectors = counter;
++        datePartitionReducerId = counter;
++
++        nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? //
++                MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum;
++        nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters;
++
++        reducerRolePlay = new int[nTotalReducers];
++        for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
++            if (i > datePartitionReducerId) {
++                // cuboid HLL counter reducer
++                reducerRolePlay[i] = MARK_FOR_HLL_COUNTER;
++            } else if (i == datePartitionReducerId) {
++                // date partition min/max reducer
++                reducerRolePlay[i] = MARK_FOR_PARTITION_COL;
++            } else {
++                // dict value collector reducer
++                if (i == dictColIdToReducerBeginId[dictId + 1])
++                    dictId++;
++
++                reducerRolePlay[i] = dictId;
++            }
++        }
++    }
++    
++    public List<TblColRef> getAllDictCols() {
++        return allDictCols;
++    }
++    
++    public int getTotalReducerNum() {
++        return nTotalReducers;
++    }
++    
++    public int getCuboidRowCounterReducerNum() {
++        return nCuboidRowCounters;
++    }
++
++    public int getReducerIdForDictCol(int dictColId, Object fieldValue) {
++        int begin = dictColIdToReducerBeginId[dictColId];
++        int span = dictColIdToReducerBeginId[dictColId + 1] - begin;
++        
++        if (span == 1)
++            return begin;
++        
++        int hash = fieldValue == null ? 0 : fieldValue.hashCode();
++        return begin + Math.abs(hash) % span;
++    }
++    
++    public int[] getAllRolePlaysForReducers() {
++        return reducerRolePlay;
++    }
++
++    public int getRolePlayOfReducer(int reducerId) {
++        return reducerRolePlay[reducerId];
++    }
++    
++    public boolean isCuboidRowCounterReducer(int reducerId) {
++        return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER;
++    }
++    
++    public boolean isPartitionColReducer(int reducerId) {
++        return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL;
++    }
++
++    public TblColRef getDictColForReducer(int reducerId) {
++        int role = getRolePlayOfReducer(reducerId);
++        if (role < 0)
++            throw new IllegalStateException();
++        
++        return allDictCols.get(role);
++    }
++
++    public int getReducerNumForDictCol(TblColRef col) {
++        int dictColId = allDictCols.indexOf(col);
++        return dictColIdToReducerBeginId[dictColId + 1] - 
dictColIdToReducerBeginId[dictColId];
++    }
++
++    public int getReducerIdForDatePartitionColumn() {
++        return datePartitionReducerId;
++    }
++
++    public int getReducerIdForCuboidRowCount(long cuboidId) {
++        int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters);
++        return datePartitionReducerId + 1 + rowCounterId;
++    }
++    
++}
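
To make the resulting layout concrete, here is a standalone re-implementation of the constructor's role assignment with hypothetical inputs (three dict columns, one of them UHC with uhcReducerCount = 2, and one cuboid row-counter reducer). It is a simplified sketch for illustration, not the class above, since constructing a real CubeInstance is out of scope here.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;

    public class ReducerMappingLayoutDemo {
        public static void main(String[] args) {
            // Hypothetical inputs mirroring the constructor above.
            List<String> allDictCols = Arrays.asList("SELLER_ID", "SITE", "CATEGORY");
            Set<String> uhcCols = Set.of("SELLER_ID"); // SELLER_ID is ultra-high-cardinality
            int uhcReducerCount = 2;
            int nCuboidRowCounters = 1;

            int[] beginId = new int[allDictCols.size() + 1];
            int counter = 0;
            for (int i = 0; i < allDictCols.size(); i++) {
                beginId[i] = counter;
                counter += uhcCols.contains(allDictCols.get(i)) ? uhcReducerCount : 1;
            }
            beginId[allDictCols.size()] = counter;

            int datePartitionReducerId = counter;
            int nTotalReducers = counter + 1 + nCuboidRowCounters;

            int[] rolePlay = new int[nTotalReducers];
            for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
                if (i > datePartitionReducerId) {
                    rolePlay[i] = -1;     // MARK_FOR_HLL_COUNTER
                } else if (i == datePartitionReducerId) {
                    rolePlay[i] = -2;     // MARK_FOR_PARTITION_COL
                } else {
                    if (i == beginId[dictId + 1])
                        dictId++;
                    rolePlay[i] = dictId; // dict column id
                }
            }

            // Prints [0, 0, 1, 2, -2, -1]: SELLER_ID owns reducers 0-1, SITE owns 2,
            // CATEGORY owns 3, reducer 4 handles the partition column, reducer 5 the HLL counters.
            System.out.println(Arrays.toString(rolePlay));
        }
    }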

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 2a184b0,3a1f852..da1576f
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@@ -63,20 -63,16 +63,20 @@@ public class MergeDictionaryStep extend
          try {
              checkLookupSnapshotsMustIncremental(mergingSegments);
  
 -            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
 -            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
 +            // work on copy instead of cached objects
 +            CubeInstance cubeCopy = cube.latestCopyForWrite();
 +            CubeSegment newSegCopy = 
cubeCopy.getSegmentById(newSegment.getUuid());
 +            
 +            makeDictForNewSegment(conf, cubeCopy, newSegCopy, 
mergingSegments);
 +            makeSnapshotForNewSegment(cubeCopy, newSegCopy, mergingSegments);
  
 -            CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -            cubeBuilder.setToUpdateSegs(newSegment);
 -            mgr.updateCube(cubeBuilder);
 +            CubeUpdate update = new CubeUpdate(cubeCopy);
 +            update.setToUpdateSegs(newSegCopy);
 +            mgr.updateCube(update);
-             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+             return ExecuteResult.createSucceed();
          } catch (IOException e) {
              logger.error("fail to merge dictionary or lookup snapshots", e);
-             return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
+             return ExecuteResult.createError(e);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index f15819f,921494f..b532360
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@@ -63,11 -78,59 +78,62 @@@ public class SaveStatisticsStep extend
                  throw new IOException("fail to find the statistics file in 
base dir: " + statisticsDir);
              }
  
-             FSDataInputStream is = fs.open(statisticsFilePath);
+             Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+             long totalRowsBeforeMerge = 0;
+             long grantTotal = 0;
+             int samplingPercentage = -1;
+             int mapperNumber = -1;
+             for (Path item : statisticsFiles) {
+                 CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item,
+                         kylinConf.getCubeStatsHLLPrecision());
+                 cuboidHLLMap.putAll(cubeStatsResult.getCounterMap());
+                 long pGrantTotal = 0L;
+                 for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) {
+                     pGrantTotal += hll.getCountEstimate();
+                 }
+                 totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio();
+                 grantTotal += pGrantTotal;
+                 int pMapperNumber = cubeStatsResult.getMapperNumber();
+                 if (pMapperNumber > 0) {
+                     if (mapperNumber < 0) {
+                         mapperNumber = pMapperNumber;
+                     } else {
+                         throw new RuntimeException(
+                                 "Base cuboid has been distributed to multiple 
reducers at step FactDistinctColumnsReducer!!!");
+                     }
+                 }
+                 int pSamplingPercentage = cubeStatsResult.getPercentage();
+                 if (samplingPercentage < 0) {
+                     samplingPercentage = pSamplingPercentage;
+                 } else if (samplingPercentage != pSamplingPercentage) {
+                     throw new RuntimeException(
+                             "The sampling percentage should be same among all 
of the reducer of FactDistinctColumnsReducer!!!");
+                 }
+             }
+             if (samplingPercentage < 0) {
+                 logger.warn("The sampling percentage should be set!!!");
+             }
+             if (mapperNumber < 0) {
+                 logger.warn("The mapper number should be set!!!");
+             }
+ 
+             if (logger.isDebugEnabled()) {
+                 logMapperAndCuboidStatistics(cuboidHLLMap, samplingPercentage, mapperNumber, grantTotal,
+                         totalRowsBeforeMerge);
+             }
+             double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
+             CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,
+                     mapperNumber, mapperOverlapRatio);
+ 
+             Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
++            logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
++            
+             FSDataInputStream is = fs.open(statisticsFile);
              try {
                  // put the statistics to metadata store
--                String statisticsFileName = 
newSegment.getStatisticsResourcePath();
--                rs.putResource(statisticsFileName, is, 
System.currentTimeMillis());
++                String resPath = newSegment.getStatisticsResourcePath();
++                rs.putResource(resPath, is, System.currentTimeMillis());
++                logger.info(newSegment + " stats saved to resource " + 
resPath);
  
                  CubingJob cubingJob = (CubingJob) getManager()
                          
.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
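
The merge loop above effectively computes a weighted average: each mapper's overlap ratio is weighted by that mapper's estimated row total, and the merged ratio written out is totalRowsBeforeMerge / grantTotal. A toy calculation with made-up numbers, mirroring only the arithmetic of the step above:

    public class MapperOverlapRatioDemo {
        public static void main(String[] args) {
            // Hypothetical per-mapper stats: estimated row total and overlap ratio per stats file.
            double[] perMapperTotal = { 1_000_000, 1_200_000 };
            double[] perMapperOverlapRatio = { 1.0, 2.0 };

            double grantTotal = 0;
            double totalRowsBeforeMerge = 0;
            for (int i = 0; i < perMapperTotal.length; i++) {
                totalRowsBeforeMerge += perMapperTotal[i] * perMapperOverlapRatio[i];
                grantTotal += perMapperTotal[i];
            }
            double mapperOverlapRatio = grantTotal == 0 ? 0 : totalRowsBeforeMerge / grantTotal;
            System.out.println("merged mapperOverlapRatio = " + mapperOverlapRatio); // ~1.545
        }
    }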

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
index 0000000,485975a..1b1a7f0
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
@@@ -1,0 -1,154 +1,154 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.kylin.engine.mr.steps;
+ 
++import java.io.IOException;
++import java.util.List;
++import java.util.Map;
++
+ import org.apache.commons.cli.Options;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.mapreduce.Job;
+ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+ import org.apache.hadoop.util.ToolRunner;
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.HadoopUtil;
+ import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeManager;
+ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.apache.kylin.metadata.model.TblColRef;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import java.io.IOException;
 -import java.util.List;
 -import java.util.Map;
 -
+ public class UHCDictionaryJob extends AbstractHadoopJob {
+     protected static final Logger logger = LoggerFactory.getLogger(UHCDictionaryJob.class);
+ 
+     private boolean isSkipped = false;
+ 
+     @Override
+     public int run(String[] args) throws Exception {
+         Options options = new Options();
+ 
+         try {
+             options.addOption(OPTION_JOB_NAME);
+             options.addOption(OPTION_CUBE_NAME);
+             options.addOption(OPTION_CUBING_JOB_ID);
+             options.addOption(OPTION_OUTPUT_PATH);
+             options.addOption(OPTION_INPUT_PATH);
+             parseOptions(options, args);
+ 
+             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+             String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+             String cubeName = getOptionValue(OPTION_CUBE_NAME);
+             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+             Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+ 
+             //add metadata to distributed cache
+             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+             CubeInstance cube = cubeMgr.getCube(cubeName);
+             attachCubeMetadata(cube, job.getConfiguration());
+ 
 -            List<TblColRef> uhcColumns = cubeMgr.getAllUHCColumns(cube.getDescriptor());
++            List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns();
+             int reducerCount = uhcColumns.size();
+ 
+             //Note! handle the case that the uhc columns have no values
+             boolean hasUHCValue = false;
+             for (TblColRef tblColRef : uhcColumns) {
+                 Path path = new Path(input.toString() + "/" + 
tblColRef.getIdentity());
+                 if (HadoopUtil.getFileSystem(path).exists(path)) {
+                     FileInputFormat.addInputPath(job, path);
+                     hasUHCValue = true;
+                 }
+             }
+ 
+             if (!hasUHCValue) {
+                 isSkipped = true;
+                 return 0;
+             }
+ 
+             setJobClasspath(job, cube.getConfig());
+             setupMapper();
+             setupReducer(output, reducerCount);
+ 
+             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+             job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+             job.getConfiguration().set(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+             job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
+ 
+             //8G memory is enough for all global dict, because the input is sequential and we handle the global dict slice by slice
+             job.getConfiguration().set("mapreduce.reduce.memory.mb", "8500");
+             job.getConfiguration().set("mapred.reduce.child.java.opts", "-Xmx8g");
+             //Copying the global dict to the working dir in GlobalDictHDFSStore may take a long time (maybe we could improve it)
+             //Waiting for the global dict lock may also take a long time.
+             //So we set the timeout to 8 hours here
+             job.getConfiguration().set("mapreduce.task.timeout", "28800000");
+ 
+             //allow user specially set config for uhc step
+             for (Map.Entry<String, String> entry : cube.getConfig().getUHCMRConfigOverride().entrySet()) {
+                 job.getConfiguration().set(entry.getKey(), entry.getValue());
+             }
+ 
+             return waitForCompletion(job);
+         } finally {
+             if (job != null)
+                 cleanupTempConfFile(job.getConfiguration());
+         }
+     }
+ 
+     private void setupMapper() throws IOException {
+         job.setInputFormatClass(SequenceFileInputFormat.class);
+         job.setMapperClass(UHCDictionaryMapper.class);
+         job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+         job.setMapOutputValueClass(NullWritable.class);
+     }
+ 
+     private void setupReducer(Path output, int numberOfReducers) throws IOException {
+         job.setReducerClass(UHCDictionaryReducer.class);
+         job.setPartitionerClass(UHCDictionaryPartitioner.class);
+         job.setNumReduceTasks(numberOfReducers);
+ 
+         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+         FileOutputFormat.setOutputPath(job, output);
+         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+ 
+         //prevent to create zero-sized default output
+         LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+ 
+         deletePath(job.getConfiguration(), output);
+     }
+ 
+     @Override
+     public boolean isSkipped() {
+         return isSkipped;
+     }
+ 
+     public static void main(String[] args) throws Exception {
+         UHCDictionaryJob job = new UHCDictionaryJob();
+         int exitCode = ToolRunner.run(job, args);
+         System.exit(exitCode);
+     }
+ }
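
The job assigns one reducer per UHC column and a custom UHCDictionaryPartitioner whose source is not part of this diff. Since the mapper below prefixes every key with the column-index byte (Bytes.toBytes(index)[3]), a partitioner along the following lines would route each column to its own reducer; this is an illustrative guess under that assumption, not the committed implementation.

    package org.apache.kylin.engine.mr.steps; // assumed, so SelfDefineSortableKey resolves

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Illustrative sketch only: routes by the leading column-index byte of the key.
    public class ColumnIndexPartitionerSketch extends Partitioner<SelfDefineSortableKey, NullWritable> {
        @Override
        public int getPartition(SelfDefineSortableKey key, NullWritable value, int numReduceTasks) {
            int columnIndex = key.getText().getBytes()[0] & 0xFF;
            return columnIndex % numReduceTasks;
        }
    }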

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
index 0000000,d9d7b60..abec8fc
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
@@@ -1,0 -1,101 +1,101 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.kylin.engine.mr.steps;
+ 
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.List;
++
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.Bytes;
+ import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeManager;
+ import org.apache.kylin.engine.mr.KylinMapper;
+ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.apache.kylin.metadata.datatype.DataType;
+ import org.apache.kylin.metadata.model.TblColRef;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import java.io.IOException;
 -import java.nio.ByteBuffer;
 -import java.util.List;
 -
+ public class UHCDictionaryMapper extends KylinMapper<NullWritable, Text, 
SelfDefineSortableKey, NullWritable> {
+     private static final Logger logger = 
LoggerFactory.getLogger(UHCDictionaryMapper.class);
+ 
+     protected int index;
+     protected DataType type;
+ 
+     protected Text outputKey = new Text();
+     private ByteBuffer tmpBuf;
+     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+ 
+     @Override
+     protected void doSetup(Context context) throws IOException {
+         tmpBuf = ByteBuffer.allocate(4096);
+ 
+         Configuration conf = context.getConfiguration();
+         bindCurrentConfiguration(conf);
+         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+ 
+         CubeInstance cube = 
CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
 -        List<TblColRef> uhcColumns = 
CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
++        List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns();
+ 
+         FileSplit fileSplit = (FileSplit) context.getInputSplit();
+         String colName = fileSplit.getPath().getParent().getName();
+ 
+         for (int i = 0; i < uhcColumns.size(); i++) {
+             if (uhcColumns.get(i).getIdentity().equalsIgnoreCase(colName)) {
+                 index = i;
+                 break;
+             }
+         }
+         type = uhcColumns.get(index).getType();
+ 
+         //for debug
+         logger.info("column name: " + colName);
+         logger.info("index: " + index);
+         logger.info("type: " + type);
+     }
+ 
+     @Override
+     public void doMap(NullWritable key, Text value, Context context) throws 
IOException, InterruptedException {
+         tmpBuf.clear();
+         int size = value.getLength() + 1;
+         if (size >= tmpBuf.capacity()) {
+             tmpBuf = ByteBuffer.allocate(countNewSize(tmpBuf.capacity(), 
size));
+         }
+         tmpBuf.put(Bytes.toBytes(index)[3]);
+         tmpBuf.put(value.getBytes(), 0, value.getLength());
+         outputKey.set(tmpBuf.array(), 0, tmpBuf.position());
+ 
+         sortableKey.init(outputKey, type);
+         context.write(sortableKey, NullWritable.get());
+     }
+ 
+     private int countNewSize(int oldSize, int dataSize) {
+         int newSize = oldSize * 2;
+         while (newSize < dataSize) {
+             newSize = newSize * 2;
+         }
+         return newSize;
+     }
+ }
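
Note: the mapper key format above is one byte (the low-order byte of the UHC column index, Bytes.toBytes(index)[3]) followed by the raw value bytes, with the scratch buffer doubling until the frame fits. A self-contained sketch of the same framing and resize logic, using illustrative names rather than Kylin APIs:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class UhcKeyFramingSketch {
        // Doubles the buffer capacity until it can hold `needed` bytes, mirroring countNewSize().
        private static ByteBuffer ensureCapacity(ByteBuffer buf, int needed) {
            if (needed < buf.capacity())
                return buf;
            int newSize = buf.capacity();
            while (newSize < needed)
                newSize *= 2;
            return ByteBuffer.allocate(newSize);
        }

        // Builds the map output key: [column-index low byte][raw value bytes].
        public static byte[] frame(int columnIndex, byte[] value) {
            ByteBuffer buf = ensureCapacity(ByteBuffer.allocate(4096), value.length + 1);
            buf.put((byte) (columnIndex & 0xFF));
            buf.put(value);
            byte[] key = new byte[buf.position()];
            buf.flip();
            buf.get(key);
            return key;
        }

        public static void main(String[] args) {
            byte[] key = frame(1, "some-uhc-value".getBytes(StandardCharsets.UTF_8));
            System.out.println("framed key length = " + key.length);  // prints 15
        }
    }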

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
index 0000000,ce9b792..6da198d
mode 000000,100644..100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
@@@ -1,0 -1,113 +1,113 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.kylin.engine.mr.steps;
+ 
++import static 
org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX;
++
++import java.io.DataOutputStream;
++import java.io.IOException;
++import java.util.List;
++
+ import org.apache.commons.io.output.ByteArrayOutputStream;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.Bytes;
+ import org.apache.kylin.common.util.ClassUtil;
+ import org.apache.kylin.common.util.Dictionary;
+ import org.apache.kylin.cube.CubeInstance;
+ import org.apache.kylin.cube.CubeManager;
+ import org.apache.kylin.cube.model.CubeDesc;
+ import org.apache.kylin.dict.DictionaryGenerator;
+ import org.apache.kylin.dict.DictionaryInfo;
+ import org.apache.kylin.dict.IDictionaryBuilder;
+ import org.apache.kylin.engine.mr.KylinReducer;
+ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+ import org.apache.kylin.engine.mr.common.BatchConstants;
+ import org.apache.kylin.metadata.model.TblColRef;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import java.io.DataOutputStream;
 -import java.io.IOException;
 -import java.util.List;
 -
 -import static 
org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX;
 -
+ public class UHCDictionaryReducer extends KylinReducer<SelfDefineSortableKey, 
NullWritable, NullWritable, BytesWritable> {
+     private static final Logger logger = 
LoggerFactory.getLogger(UHCDictionaryReducer.class);
+ 
+     private IDictionaryBuilder builder;
+     private TblColRef col;
+ 
+     private MultipleOutputs mos;
+ 
+     @Override
+     protected void doSetup(Context context) throws IOException {
+         super.bindCurrentConfiguration(context.getConfiguration());
+         Configuration conf = context.getConfiguration();
+         mos = new MultipleOutputs(context);
+ 
+         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+         CubeDesc cubeDesc = cube.getDescriptor();
 -        List<TblColRef> uhcColumns = 
CubeManager.getInstance(config).getAllUHCColumns(cubeDesc);
++        List<TblColRef> uhcColumns = cubeDesc.getAllUHCColumns();
+ 
+         int taskId = context.getTaskAttemptID().getTaskID().getId();
+         col = uhcColumns.get(taskId);
+         logger.info("column name: " + col.getIdentity());
+ 
+         if (cube.getDescriptor().getShardByColumns().contains(col)) {
+             //for ShardByColumns
+             builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+             builder.init(null, 0, null);
+         } else {
+             //for GlobalDictionaryColumns
+             String hdfsDir = 
conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);
+             DictionaryInfo dictionaryInfo = new 
DictionaryInfo(col.getColumnDesc(), col.getDatatype());
+             String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+             builder = (IDictionaryBuilder) 
ClassUtil.newInstance(builderClass);
+             builder.init(dictionaryInfo, 0, hdfsDir);
+         }
+     }
+ 
+     @Override
+     public void doReduce(SelfDefineSortableKey skey, Iterable<NullWritable> 
values, Context context) throws IOException, InterruptedException {
+         Text key = skey.getText();
+         String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+         builder.addValue(value);
+     }
+ 
+     @Override
+     protected void doCleanup(Context context) throws IOException, 
InterruptedException {
+         Dictionary<String> dict = builder.build();
+         outputDict(col, dict);
+     }
+ 
+     private void outputDict(TblColRef col, Dictionary<String> dict) throws 
IOException, InterruptedException {
+         // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+         String dictFileName = col.getIdentity() + "/" + col.getName() + 
DICT_FILE_POSTFIX;
+ 
+         try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
DataOutputStream outputStream = new DataOutputStream(baos);) {
+             outputStream.writeUTF(dict.getClass().getName());
+             dict.write(outputStream);
+ 
+             mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new 
BytesWritable(baos.toByteArray()), dictFileName);
+         }
+         mos.close();
+     }
+ }
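
Note: outputDict() above frames each dictionary as a UTF-encoded class name followed by the dictionary's own serialized bytes, wrapped in a BytesWritable. A minimal sketch of reading such a payload back, assuming Dictionary exposes a readFields(DataInput) counterpart to the write() call used above:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.kylin.common.util.ClassUtil;
    import org.apache.kylin.common.util.Dictionary;

    public class DictPayloadReaderSketch {
        // Reads one payload as written by outputDict(): writeUTF(className) then dict.write(out).
        @SuppressWarnings("unchecked")
        public static Dictionary<String> read(InputStream rawBytes) throws IOException {
            DataInputStream in = new DataInputStream(rawBytes);
            String dictClassName = in.readUTF();                          // concrete dictionary class recorded first
            Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
            dict.readFields(in);                                          // assumed mirror of dict.write(outputStream)
            return dict;
        }
    }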

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
----------------------------------------------------------------------
diff --cc 
engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index 0000000,0000000..a6bc019
new file mode 100644
--- /dev/null
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@@ -1,0 -1,0 +1,93 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ * 
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++*/
++
++package org.apache.kylin.engine.mr.steps;
++
++import org.apache.kylin.common.util.LocalFileMetadataTestCase;
++import org.apache.kylin.cube.CubeInstance;
++import org.apache.kylin.cube.CubeManager;
++import org.apache.kylin.metadata.model.TblColRef;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++
++/**
++ */
++public class FactDistinctColumnsReducerMappingTest extends 
LocalFileMetadataTestCase {
++
++    @Before
++    public void setUp() throws Exception {
++        createTestMetadata();
++        System.setProperty("kylin.engine.mr.uhc-reducer-count", "2");
++        System.setProperty("kylin.engine.mr.per-reducer-hll-cuboid-number", 
"1");
++        System.setProperty("kylin.engine.mr.hll-max-reducer-number", "2");
++    }
++
++    @After
++    public void after() throws Exception {
++        cleanupTestMetadata();
++        System.clearProperty("kylin.engine.mr.uhc-reducer-count");
++        System.clearProperty("kylin.engine.mr.per-reducer-hll-cuboid-number");
++        System.clearProperty("kylin.engine.mr.hll-max-reducer-number");
++    }
++
++    @Test
++    public void testBasics() {
++        CubeManager mgr = CubeManager.getInstance(getTestConfig());
++        CubeInstance cube = mgr.getCube("ci_left_join_cube");
++        TblColRef aUHC = 
cube.getModel().findColumn("TEST_COUNT_DISTINCT_BITMAP");
++
++        FactDistinctColumnsReducerMapping mapping = new 
FactDistinctColumnsReducerMapping(cube);
++        //System.out.println(mapping.getAllDictCols());
++        
//System.out.println(Arrays.toString(mapping.getAllRolePlaysForReducers()));
++
++        int totalReducerNum = mapping.getTotalReducerNum();
++        Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
++        
++        // check partition column reducer & cuboid row count reducers
++        
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
++                mapping.getRolePlayOfReducer(totalReducerNum - 1));
++        
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
++                mapping.getRolePlayOfReducer(totalReducerNum - 2));
++        
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL,
++                mapping.getRolePlayOfReducer(totalReducerNum - 3));
++        
++        // check all dict column reducers
++        int dictEnd = totalReducerNum - 3;
++        for (int i = 0; i < dictEnd; i++)
++            Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0);
++        
++        // check a UHC dict column
++        Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC));
++        int uhcReducerBegin = -1;
++        for (int i = 0; i < dictEnd; i++) {
++            if (mapping.getDictColForReducer(i).equals(aUHC)) {
++                uhcReducerBegin = i;
++                break;
++            }
++        }
++        
++        int[] allRolePlay = mapping.getAllRolePlaysForReducers();
++        Assert.assertEquals(allRolePlay[uhcReducerBegin], 
allRolePlay[uhcReducerBegin + 1]);
++        for (int i = 0; i < 5; i++) {
++            int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, 
i);
++            Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= 
uhcReducerBegin + 1);
++        }
++    }
++}
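
Note: the assertions above pin down the reducer layout produced by FactDistinctColumnsReducerMapping under the three overridden properties; summarized as a sketch derived from the test (not an API):

    // reducer ids 0 .. totalReducerNum - 4   -> dictionary columns (a UHC column owns 2 consecutive ids here)
    // reducer id  totalReducerNum - 3        -> partition column      (MARK_FOR_PARTITION_COL)
    // reducer ids totalReducerNum - 2 and -1 -> cuboid row-count HLLs (MARK_FOR_HLL_COUNTER)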

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --cc examples/test_case_data/sandbox/kylin.properties
index a6d4adc,7271e90..13d62a2
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@@ -153,10 -165,9 +165,13 @@@ kylin.web.help.1=odbc|ODBC Driver
  kylin.web.help.2=tableau|Tableau Guide|
  kylin.web.help.3=onboard|Cube Design Tutorial|
  
 +#allow users to export query results
 +kylin.web.export-allow-admin=true
 +kylin.web.export-allow-other=true
 +
+ # Hide measures in the measure list of the cube designer, separated by commas
+ kylin.web.hide-measures=RAW
+ 
  ### OTHER ###
  
  # kylin query metrics percentiles intervals default=60, 300, 3600

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITAclTableMigrationToolTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/QueryConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --cc 
server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 132f373,0e9f4ba..f53189b
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@@ -33,12 -37,17 +37,18 @@@ import org.apache.kylin.common.util.Jso
  import org.apache.kylin.cube.CubeInstance;
  import org.apache.kylin.cube.CubeManager;
  import org.apache.kylin.cube.CubeSegment;
+ import org.apache.kylin.cube.cuboid.CuboidScheduler;
+ import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
  import org.apache.kylin.cube.model.CubeBuildTypeEnum;
  import org.apache.kylin.cube.model.CubeDesc;
 +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+ import org.apache.kylin.cube.model.RowKeyColDesc;
  import org.apache.kylin.dimension.DimensionEncodingFactory;
+ import org.apache.kylin.engine.EngineFactory;
+ import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
  import org.apache.kylin.job.JobInstance;
  import org.apache.kylin.job.JoinedFlatTable;
+ import org.apache.kylin.job.exception.JobException;
  import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
  import org.apache.kylin.metadata.model.ISourceAware;
  import org.apache.kylin.metadata.model.SegmentRange;
@@@ -52,7 -63,9 +64,10 @@@ import org.apache.kylin.rest.exception.
  import org.apache.kylin.rest.request.CubeRequest;
  import org.apache.kylin.rest.request.JobBuildRequest;
  import org.apache.kylin.rest.request.JobBuildRequest2;
+ import org.apache.kylin.rest.request.JobOptimizeRequest;
+ import org.apache.kylin.rest.request.SQLRequest;
 +import org.apache.kylin.rest.response.CubeInstanceResponse;
+ import org.apache.kylin.rest.response.CuboidTreeResponse;
  import org.apache.kylin.rest.response.EnvelopeResponse;
  import org.apache.kylin.rest.response.GeneralResponse;
  import org.apache.kylin.rest.response.HBaseResponse;
@@@ -684,6 -787,170 +791,170 @@@ public class CubeController extends Bas
  
      }
  
+     @RequestMapping(value = "/{cubeName}/cuboids/export", method = 
RequestMethod.GET)
+     @ResponseBody
+     public void cuboidsExport(@PathVariable String cubeName, 
@RequestParam(value = "top") Integer top,
+             HttpServletResponse response) throws IOException {
+         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+         if (cube == null) {
+             logger.error("Get cube: [" + cubeName + "] failed when get 
recommend cuboids");
+             throw new BadRequestException("Get cube: [" + cubeName + "] 
failed when get recommend cuboids");
+         }
+         Map<Long, Long> cuboidList = getRecommendCuboidList(cube);
+         if (cuboidList == null || cuboidList.isEmpty()) {
+             logger.warn("Cannot get the recommended cuboid list for cube " + cubeName);
+             return;
+         }
+         if (cuboidList.size() < top) {
+             logger.info("Only " + cuboidList.size() + " cuboids are recommended, less than the requested top " + top);
+         }
+         Iterator<Long> cuboidIterator = cuboidList.keySet().iterator();
+         RowKeyColDesc[] rowKeyColDescList = 
cube.getDescriptor().getRowkey().getRowKeyColumns();
+ 
+         List<Set<String>> dimensionSetList = Lists.newLinkedList();
+         while (top-- > 0 && cuboidIterator.hasNext()) {
+             Set<String> dimensionSet = Sets.newHashSet();
+             dimensionSetList.add(dimensionSet);
+             long cuboid = cuboidIterator.next();
+             for (int i = 0; i < rowKeyColDescList.length; i++) {
+                 if ((cuboid & (1L << rowKeyColDescList[i].getBitIndex())) > 
0) {
+                     dimensionSet.add(rowKeyColDescList[i].getColumn());
+                 }
+             }
+         }
+ 
+         response.setContentType("text/json;charset=utf-8");
+         response.setHeader("Content-Disposition", "attachment; filename=\"" + 
cubeName + ".json\"");
+         try (PrintWriter writer = response.getWriter()) {
+             writer.write(JsonUtil.writeValueAsString(dimensionSetList));
+         } catch (IOException e) {
+             logger.error("", e);
+             throw new InternalErrorException("Failed to write: " + 
e.getLocalizedMessage());
+         }
+     }
+ 
+     @RequestMapping(value = "/{cubeName}/cuboids/current", method = 
RequestMethod.GET)
+     @ResponseBody
+     public CuboidTreeResponse getCurrentCuboids(@PathVariable String 
cubeName) {
+         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+         if (cube == null) {
+             logger.error("Get cube: [" + cubeName + "] failed when get 
current cuboids");
+             throw new BadRequestException("Get cube: [" + cubeName + "] 
failed when get current cuboids");
+         }
+         // The cuboid tree displayed should be consistent with the current one
+         CuboidScheduler cuboidScheduler = cube.getCuboidScheduler();
+         Map<Long, Long> cuboidStatsMap = cube.getCuboids();
+         if (cuboidStatsMap == null) {
+             cuboidStatsMap = 
CuboidStatsReaderUtil.readCuboidStatsFromCube(cuboidScheduler.getAllCuboidIds(),
 cube);
+         }
+ 
+         Map<Long, Long> hitFrequencyMap = null;
+         Map<Long, Long> queryMatchMap = null;
+         try {
+             hitFrequencyMap = getTargetCuboidHitFrequency(cubeName);
+             queryMatchMap = getCuboidQueryMatchCount(cubeName);
+         } catch (Exception e) {
+             logger.warn("Fail to query on system cube due to " + e);
+         }
+ 
+         Set<Long> currentCuboidSet = 
cube.getCuboidScheduler().getAllCuboidIds();
+         return cubeService.getCuboidTreeResponse(cuboidScheduler, 
cuboidStatsMap, hitFrequencyMap, queryMatchMap,
+                 currentCuboidSet);
+     }
+ 
+     @RequestMapping(value = "/{cubeName}/cuboids/recommend", method = 
RequestMethod.GET)
+     @ResponseBody
+     public CuboidTreeResponse getRecommendCuboids(@PathVariable String 
cubeName) throws IOException {
+         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+         if (cube == null) {
+             logger.error("Get cube: [" + cubeName + "] failed when getting recommended cuboids");
+             throw new BadRequestException("Get cube: [" + cubeName + "] failed when getting recommended cuboids");
+         }
+         Map<Long, Long> recommendCuboidStatsMap = 
getRecommendCuboidList(cube);
+         if (recommendCuboidStatsMap == null || 
recommendCuboidStatsMap.isEmpty()) {
+             return new CuboidTreeResponse();
+         }
+         CuboidScheduler cuboidScheduler = new 
TreeCuboidScheduler(cube.getDescriptor(),
+                 Lists.newArrayList(recommendCuboidStatsMap.keySet()),
+                 new 
TreeCuboidScheduler.CuboidCostComparator(recommendCuboidStatsMap));
+ 
+         // Get cuboid target info for displaying the heat map of cuboid hits
+         Map<Long, Long> displayHitFrequencyMap = 
getTargetCuboidHitFrequency(cubeName);
+         // Get exactly matched cuboid query count
+         Map<Long, Long> queryMatchMap = getCuboidQueryMatchCount(cubeName);
+ 
+         Set<Long> currentCuboidSet = 
cube.getCuboidScheduler().getAllCuboidIds();
+         return cubeService.getCuboidTreeResponse(cuboidScheduler, 
recommendCuboidStatsMap, displayHitFrequencyMap,
+                 queryMatchMap, currentCuboidSet);
+     }
+ 
+     private Map<Long, Long> getRecommendCuboidList(CubeInstance cube) throws 
IOException {
+         // Get cuboid source info
+         Map<Long, Long> optimizeHitFrequencyMap = 
getSourceCuboidHitFrequency(cube.getName());
+         Map<Long, Map<Long, Long>> rollingUpCountSourceMap = 
getCuboidRollingUpCount(cube.getName());
+         return cubeService.getRecommendCuboidStatistics(cube, 
optimizeHitFrequencyMap, rollingUpCountSourceMap);
+     }
+ 
+     private Map<Long, Long> getSourceCuboidHitFrequency(String cubeName) {
+         return getCuboidHitFrequency(cubeName, true);
+     }
+ 
+     private Map<Long, Long> getTargetCuboidHitFrequency(String cubeName) {
+         return getCuboidHitFrequency(cubeName, false);
+     }
+ 
+     private Map<Long, Long> getCuboidHitFrequency(String cubeName, boolean 
isCuboidSource) {
+         SQLRequest sqlRequest = new SQLRequest();
+         sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT);
+         String cuboidColumn = QueryCubePropertyEnum.CUBOID_SOURCE.toString();
+         if (!isCuboidSource) {
+             cuboidColumn = QueryCubePropertyEnum.CUBOID_TARGET.toString();
+         }
+         String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString();
+         String table = cubeService.getMetricsManager()
+                 
.getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube());
+         String sql = "select " + cuboidColumn + ", sum(" + hitMeasure + ") " 
//
+                 + "from " + table//
+                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" 
+ cubeName + "' " //
+                 + "group by " + cuboidColumn;
+         sqlRequest.setSql(sql);
 -        List<List<String>> orgHitFrequency = 
queryService.queryWithoutSecure(sqlRequest).getResults();
++        List<List<String>> orgHitFrequency = 
queryService.doQueryWithCache(sqlRequest).getResults();
+         return cubeService.formatQueryCount(orgHitFrequency);
+     }
+ 
+     private Map<Long, Map<Long, Long>> getCuboidRollingUpCount(String 
cubeName) {
+         SQLRequest sqlRequest = new SQLRequest();
+         sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT);
+         String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString();
+         String cuboidTarget = QueryCubePropertyEnum.CUBOID_TARGET.toString();
+         String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString();
+         String table = cubeService.getMetricsManager()
+                 
.getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube());
+         String sql = "select " + cuboidSource + ", " + cuboidTarget + ", 
sum(" + aggCount + ")/count(*) " //
+                 + "from " + table //
+                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" 
+ cubeName + "' " //
+                 + "group by " + cuboidSource + ", " + cuboidTarget;
+         sqlRequest.setSql(sql);
 -        List<List<String>> orgRollingUpCount = 
queryService.queryWithoutSecure(sqlRequest).getResults();
++        List<List<String>> orgRollingUpCount = 
queryService.doQueryWithCache(sqlRequest).getResults();
+         return cubeService.formatRollingUpCount(orgRollingUpCount);
+     }
+ 
+     private Map<Long, Long> getCuboidQueryMatchCount(String cubeName) {
+         SQLRequest sqlRequest = new SQLRequest();
+         sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT);
+         String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString();
+         String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString();
+         String table = cubeService.getMetricsManager()
+                 
.getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube());
+         String sql = "select " + cuboidSource + ", sum(" + hitMeasure + ") " 
//
+                 + "from " + table //
+                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" 
+ cubeName + "' and "
+                 + QueryCubePropertyEnum.IF_MATCH.toString() + " = true " //
+                 + "group by " + cuboidSource;
+         sqlRequest.setSql(sql);
 -        List<List<String>> orgMatchHitFrequency = 
queryService.queryWithoutSecure(sqlRequest).getResults();
++        List<List<String>> orgMatchHitFrequency = 
queryService.doQueryWithCache(sqlRequest).getResults();
+         return cubeService.formatQueryCount(orgMatchHitFrequency);
+     }
+ 
      /**
       * Initiate the very beginning of a streaming cube. Will seek the latest offsets of each partition from the streaming
       * source (kafka) and record them in the cube descriptor; in the first build job, it will use these offsets as the start point.
