http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 89bb11e,575c7b3..569b810 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@@ -54,20 -56,20 +56,17 @@@ public class FactDistinctColumnsMapper< BYTES } - protected CuboidScheduler cuboidScheduler = null; - protected boolean collectStatistics = false; protected int nRowKey; private Integer[][] allCuboidsBitSet = null; private HLLCounter[] allCuboidsHLL = null; private Long[] cuboidIds; - private HashFunction hf = null; private int rowCount = 0; private int samplingPercentage; - //private ByteArray[] row_hashcodes = null; - private long[] rowHashCodesLong = null; private ByteBuffer tmpbuf; + + private CuboidStatCalculator[] cuboidStatCalculators; + private static final Text EMPTY_TEXT = new Text(); -- public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; -- public static final byte MARK_FOR_HLL = (byte) 0xFF; private int partitionColumnIndex = -1; private boolean needFetchPartitionCol = true; @@@ -81,70 -80,95 +77,93 @@@ protected void doSetup(Context context) throws IOException { super.doSetup(context); tmpbuf = ByteBuffer.allocate(4096); - collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); - if (collectStatistics) { - samplingPercentage = Integer - .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); - nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; - - Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds()); - if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) { - // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated - // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids. 
- cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids()); - } - cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]); - allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey); + - allCuboidsHLL = new HLLCounter[cuboidIds.length]; - for (int i = 0; i < cuboidIds.length; i++) { - allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); - } + samplingPercentage = Integer + .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); - cuboidScheduler = cubeDesc.getInitialCuboidScheduler(); + nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; - List<Long> cuboidIdList = Lists.newArrayList(); - List<Integer[]> allCuboidsBitSetList = Lists.newArrayList(); - addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList); - - allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]); - cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]); - TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); - if (partitionColRef != null) { - partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); - } ++ Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds()); ++ if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) { ++ // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated ++ // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids. ++ cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids()); ++ } ++ cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]); ++ allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey); - // check whether need fetch the partition col values - if (partitionColumnIndex < 0) { - // if partition col not on cube, no need - needFetchPartitionCol = false; - } else { - needFetchPartitionCol = true; - } - //for KYLIN-2518 backward compatibility - boolean isUsePutRowKeyToHllNewAlgorithm; - if (KylinVersion.isBefore200(cubeDesc.getVersion())) { - isUsePutRowKeyToHllNewAlgorithm = false; - logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion()); - } else { - isUsePutRowKeyToHllNewAlgorithm = true; - logger.info( - "Found KylinVersion : {}. Use new algorithm for cuboid sampling. 
About the details of the new algorithm, please refer to KYLIN-2518", - cubeDesc.getVersion()); - } + allCuboidsHLL = new HLLCounter[cuboidIds.length]; + for (int i = 0; i < cuboidIds.length; i++) { + allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); + } - int calculatorNum = getStatsThreadNum(cuboidIds.length); - cuboidStatCalculators = new CuboidStatCalculator[calculatorNum]; - int splitSize = cuboidIds.length / calculatorNum; - if (splitSize <= 0) { - splitSize = 1; - } - for (int i = 0; i < calculatorNum; i++) { - HLLCounter[] cuboidsHLLSplit; - Integer[][] cuboidsBitSetSplit; - Long[] cuboidIdSplit; - int start = i * splitSize; - if (start >= cuboidIds.length) { - break; - } - int end = (i + 1) * splitSize; - if (i == calculatorNum - 1) {// last split - end = cuboidIds.length; - } + TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); + if (partitionColRef != null) { + partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); + } - cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end); - cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end); - cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end); - CuboidStatCalculator calculator = new CuboidStatCalculator(i, - intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit, - isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit); - cuboidStatCalculators[i] = calculator; - calculator.start(); + // check whether need fetch the partition col values + if (partitionColumnIndex < 0) { + // if partition col not on cube, no need + needFetchPartitionCol = false; + } else { + needFetchPartitionCol = true; + } + //for KYLIN-2518 backward compatibility ++ boolean isUsePutRowKeyToHllNewAlgorithm; + if (KylinVersion.isBefore200(cubeDesc.getVersion())) { + isUsePutRowKeyToHllNewAlgorithm = false; - hf = Hashing.murmur3_32(); + logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion()); + } else { + isUsePutRowKeyToHllNewAlgorithm = true; - rowHashCodesLong = new long[nRowKey]; - hf = Hashing.murmur3_128(); + logger.info( + "Found KylinVersion : {}. Use new algorithm for cuboid sampling. 
About the details of the new algorithm, please refer to KYLIN-2518", + cubeDesc.getVersion()); + } + - } ++ int calculatorNum = getStatsThreadNum(cuboidIds.length); ++ cuboidStatCalculators = new CuboidStatCalculator[calculatorNum]; ++ int splitSize = cuboidIds.length / calculatorNum; ++ if (splitSize <= 0) { ++ splitSize = 1; ++ } ++ for (int i = 0; i < calculatorNum; i++) { ++ HLLCounter[] cuboidsHLLSplit; ++ Integer[][] cuboidsBitSetSplit; ++ Long[] cuboidIdSplit; ++ int start = i * splitSize; ++ if (start >= cuboidIds.length) { ++ break; + } ++ int end = (i + 1) * splitSize; ++ if (i == calculatorNum - 1) {// last split ++ end = cuboidIds.length; ++ } + - private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) { - allCuboids.add(cuboidId); - Integer[] indice = new Integer[Long.bitCount(cuboidId)]; ++ cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end); ++ cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end); ++ cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end); ++ CuboidStatCalculator calculator = new CuboidStatCalculator(i, ++ intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit, ++ isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit); ++ cuboidStatCalculators[i] = calculator; ++ calculator.start(); + } + } - long mask = Long.highestOneBit(baseCuboidId); - int position = 0; - for (int i = 0; i < nRowKey; i++) { - if ((mask & cuboidId) > 0) { - indice[position] = i; - position++; - } - mask = mask >> 1; + private int getStatsThreadNum(int cuboidNum) { + int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator(); + if (unitNum <= 0) { + logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " is should larger than 0"); + logger.info("Will use single thread for cuboid statistics calculation"); + return 1; } - allCuboidsBitSet.add(indice); - Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId); - for (Long childId : children) { - addCuboidBitSet(childId, allCuboidsBitSet, allCuboids); + int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber(); + int calculatorNum = (cuboidNum - 1) / unitNum + 1; + if (calculatorNum > maxCalculatorNum) { + calculatorNum = maxCalculatorNum; } + return calculatorNum; } @Override @@@ -158,15 -182,16 +177,8 @@@ if (fieldValue == null) continue; -- int reducerIndex; -- if (uhcIndex[i] == 0) { -- //for the normal dictionary column -- reducerIndex = columnIndexToReducerBeginId.get(i); -- } else { -- //for the uhc - reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; - reducerIndex = columnIndexToReducerBeginId.get(i) - + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; -- } -- ++ int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue); ++ tmpbuf.clear(); byte[] valueBytes = Bytes.toBytes(fieldValue); int size = valueBytes.length + 1; @@@ -187,28 -213,26 +200,24 @@@ } } - if (collectStatistics) { - if (rowCount % 100 < samplingPercentage) { - putRowKeyToHLL(row); - } + if (rowCount % 100 < samplingPercentage) { - if (isUsePutRowKeyToHllNewAlgorithm) { - putRowKeyToHLLNew(row); - } else { - putRowKeyToHLLOld(row); - } ++ putRowKeyToHLL(row); + } - if (needFetchPartitionCol == true) { - String fieldValue = row[partitionColumnIndex]; - if (fieldValue != null) { - tmpbuf.clear(); - byte[] valueBytes = Bytes.toBytes(fieldValue); - int size = valueBytes.length + 1; - if (size >= tmpbuf.capacity()) { - tmpbuf = 
ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); - } - tmpbuf.put(MARK_FOR_PARTITION_COL); - tmpbuf.put(valueBytes); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - sortableKey.init(outputKey, (byte) 0); - context.write(sortableKey, EMPTY_TEXT); + if (needFetchPartitionCol == true) { + String fieldValue = row[partitionColumnIndex]; + if (fieldValue != null) { + tmpbuf.clear(); + byte[] valueBytes = Bytes.toBytes(fieldValue); + int size = valueBytes.length + 1; + if (size >= tmpbuf.capacity()) { + tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); } - tmpbuf.put(MARK_FOR_PARTITION_COL); ++ tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL); + tmpbuf.put(valueBytes); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); + sortableKey.init(outputKey, (byte) 0); + context.write(sortableKey, EMPTY_TEXT); } } rowCount++; @@@ -224,78 -254,171 +239,169 @@@ return size; } - private void putRowKeyToHLLOld(String[] row) { - //generate hash for each row key column - byte[][] rowHashCodes = new byte[nRowKey][]; - for (int i = 0; i < nRowKey; i++) { - Hasher hc = hf.newHasher(); - String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]; - if (colValue != null) { - rowHashCodes[i] = hc.putString(colValue).hash().asBytes(); + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { - if (collectStatistics) { - ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - // output each cuboid's hll to reducer, key is 0 - cuboidId - for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { - cuboidStatCalculator.waitForCompletion(); - } - for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { - Long[] cuboidIds = cuboidStatCalculator.getCuboidIds(); - HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters(); - HLLCounter hll; ++ ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); ++ // output each cuboid's hll to reducer, key is 0 - cuboidId ++ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { ++ cuboidStatCalculator.waitForCompletion(); ++ } ++ for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { ++ Long[] cuboidIds = cuboidStatCalculator.getCuboidIds(); ++ HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters(); ++ HLLCounter hll; + - for (int i = 0; i < cuboidIds.length; i++) { - hll = cuboidsHLL[i]; - tmpbuf.clear(); - tmpbuf.put(MARK_FOR_HLL); // one byte - tmpbuf.putLong(cuboidIds[i]); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - hllBuf.clear(); - hll.writeRegisters(hllBuf); - outputValue.set(hllBuf.array(), 0, hllBuf.position()); - sortableKey.init(outputKey, (byte) 0); - context.write(sortableKey, outputValue); - } ++ for (int i = 0; i < cuboidIds.length; i++) { ++ hll = cuboidsHLL[i]; ++ tmpbuf.clear(); ++ tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte ++ tmpbuf.putLong(cuboidIds[i]); ++ outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); ++ hllBuf.clear(); ++ hll.writeRegisters(hllBuf); ++ outputValue.set(hllBuf.array(), 0, hllBuf.position()); ++ sortableKey.init(outputKey, (byte) 0); ++ context.write(sortableKey, outputValue); + } + } + } + + private int countNewSize(int oldSize, int dataSize) { + int newSize = oldSize * 2; + while (newSize < dataSize) { + newSize = newSize * 2; + } + return newSize; + } + + public static class CuboidStatCalculator implements Runnable { + 
private final int id; + private final int nRowKey; + private final int[] rowkeyColIndex; + private final Long[] cuboidIds; + private final Integer[][] cuboidsBitSet; + private volatile HLLCounter[] cuboidsHLL = null; + + //about details of the new algorithm, please see KYLIN-2518 + private final boolean isNewAlgorithm; + private final HashFunction hf; + private long[] rowHashCodesLong; + + private BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>(2000); + private Thread workThread; + private volatile boolean stop; + + public CuboidStatCalculator(int id, int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet, + boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) { + this.id = id; + this.nRowKey = rowkeyColIndex.length; + this.rowkeyColIndex = rowkeyColIndex; + this.cuboidIds = cuboidIds; + this.cuboidsBitSet = cuboidsBitSet; + this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm; + if (!isNewAlgorithm) { + this.hf = Hashing.murmur3_32(); } else { - rowHashCodes[i] = hc.putInt(0).hash().asBytes(); + rowHashCodesLong = new long[nRowKey]; + this.hf = Hashing.murmur3_128(); } + this.cuboidsHLL = cuboidsHLL; + workThread = new Thread(this); + } + + public void start() { + logger.info("cuboid stats calculator:" + id + " started, handle cuboids number:" + cuboidIds.length); + workThread.start(); } - // user the row key column hash to get a consolidated hash for each cuboid - for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) { - Hasher hc = hf.newHasher(); - for (int position = 0; position < allCuboidsBitSet[i].length; position++) { - hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]); + public void putRow(final String[] row) { + String[] copyRow = Arrays.copyOf(row, row.length); + try { + queue.put(copyRow); + } catch (InterruptedException e) { + logger.error("interrupt", e); } + } - allCuboidsHLL[i].add(hc.hash().asBytes()); + public void waitForCompletion() { + stop = true; + try { + workThread.join(); + } catch (InterruptedException e) { + logger.error("interrupt", e); + } } - } - private void putRowKeyToHLLNew(String[] row) { - //generate hash for each row key column - for (int i = 0; i < nRowKey; i++) { - Hasher hc = hf.newHasher(); - String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]; - if (colValue == null) - colValue = "0"; - byte[] bytes = hc.putString(colValue).hash().asBytes(); - rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a) + private void putRowKeyToHLLOld(String[] row) { + //generate hash for each row key column + byte[][] rowHashCodes = new byte[nRowKey][]; + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[rowkeyColIndex[i]]; + if (colValue != null) { + rowHashCodes[i] = hc.putString(colValue).hash().asBytes(); + } else { + rowHashCodes[i] = hc.putInt(0).hash().asBytes(); + } + } + + // user the row key column hash to get a consolidated hash for each cuboid + for (int i = 0, n = cuboidsBitSet.length; i < n; i++) { + Hasher hc = hf.newHasher(); + for (int position = 0; position < cuboidsBitSet[i].length; position++) { + hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]); + } + + cuboidsHLL[i].add(hc.hash().asBytes()); + } } - // user the row key column hash to get a consolidated hash for each cuboid - for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) { - long value = 0; - for (int position = 0; position < allCuboidsBitSet[i].length; position++) { - value += 
rowHashCodesLong[allCuboidsBitSet[i][position]]; + private void putRowKeyToHLLNew(String[] row) { + //generate hash for each row key column + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[rowkeyColIndex[i]]; + if (colValue == null) + colValue = "0"; + byte[] bytes = hc.putString(colValue).hash().asBytes(); + rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a) + } + + // user the row key column hash to get a consolidated hash for each cuboid + for (int i = 0, n = cuboidsBitSet.length; i < n; i++) { + long value = 0; + for (int position = 0; position < cuboidsBitSet[i].length; position++) { + value += rowHashCodesLong[cuboidsBitSet[i][position]]; + } + cuboidsHLL[i].addHashDirectly(value); } - allCuboidsHLL[i].addHashDirectly(value); } - } - @Override - protected void doCleanup(Context context) throws IOException, InterruptedException { - ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - // output each cuboid's hll to reducer, key is 0 - cuboidId - HLLCounter hll; - for (int i = 0; i < cuboidIds.length; i++) { - hll = allCuboidsHLL[i]; - - tmpbuf.clear(); - tmpbuf.put(MARK_FOR_HLL); // one byte - tmpbuf.putLong(cuboidIds[i]); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - hllBuf.clear(); - hll.writeRegisters(hllBuf); - outputValue.set(hllBuf.array(), 0, hllBuf.position()); - sortableKey.init(outputKey, (byte) 0); - context.write(sortableKey, outputValue); + public HLLCounter[] getHLLCounters() { + return cuboidsHLL; } - } + public Long[] getCuboidIds() { + return cuboidIds; + } - private int countNewSize(int oldSize, int dataSize) { - int newSize = oldSize * 2; - while (newSize < dataSize) { - newSize = newSize * 2; + @Override + public void run() { + while (true) { + String[] row = queue.poll(); + if (row == null && stop) { + logger.info("cuboid stats calculator:" + id + " completed."); + break; + } else if (row == null) { + Thread.yield(); + continue; + } + if (isNewAlgorithm) { + putRowKeyToHLLNew(row); + } else { + putRowKeyToHLLOld(row); + } + } } - return newSize; } - }
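The mapper change above moves cuboid sampling off the map path and into CuboidStatCalculator worker threads, each owning a contiguous slice of the cuboid id / bitset / HLL arrays and draining rows from its own BlockingQueue. Below is a minimal, self-contained sketch (not part of the commit) of just the slice arithmetic; unitNum and maxCalculators are hypothetical values standing in for getCuboidNumberPerStatsCalculator() and getCuboidStatsCalculatorMaxNumber():

    import java.util.Arrays;

    public class CuboidSplitSketch {
        public static void main(String[] args) {
            int cuboidNum = 23;          // hypothetical number of cuboids to sample
            int unitNum = 5;             // stands in for getCuboidNumberPerStatsCalculator()
            int maxCalculators = 4;      // stands in for getCuboidStatsCalculatorMaxNumber()

            // same arithmetic as getStatsThreadNum(): ceil(cuboidNum / unitNum), capped at the max
            int calculatorNum = Math.min((cuboidNum - 1) / unitNum + 1, maxCalculators);
            int splitSize = Math.max(cuboidNum / calculatorNum, 1);

            Long[] cuboidIds = new Long[cuboidNum];
            for (int i = 0; i < cuboidNum; i++)
                cuboidIds[i] = (long) i;

            for (int i = 0; i < calculatorNum; i++) {
                int start = i * splitSize;
                if (start >= cuboidNum)
                    break;
                // the last calculator absorbs the remainder, as in the mapper's doSetup()
                int end = (i == calculatorNum - 1) ? cuboidNum : (i + 1) * splitSize;
                Long[] slice = Arrays.copyOfRange(cuboidIds, start, end);
                System.out.println("calculator " + i + " handles cuboids " + Arrays.toString(slice));
            }
        }
    }

With these sample numbers the slices come out as [0..4], [5..9], [10..14], [15..22], i.e. every cuboid is assigned to exactly one calculator thread.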
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 00b831a,00b831a..c66042b --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@@ -20,9 -20,9 +20,7 @@@ package org.apache.kylin.engine.mr.step import java.io.IOException; import java.util.Arrays; --import java.util.HashMap; import java.util.List; --import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@@ -63,10 -63,10 +61,8 @@@ abstract public class FactDistinctColum protected CubeJoinedFlatTableEnrich intermediateTableDesc; protected int[] dictionaryColumnIndex; -- protected int uhcReducerCount; -- protected int[] uhcIndex; -- protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>(); -- ++ protected FactDistinctColumnsReducerMapping reducerMapping; ++ @Override protected void doSetup(Context context) throws IOException { Configuration conf = context.getConfiguration(); @@@ -90,15 -90,15 +86,8 @@@ dictionaryColumnIndex[i] = columnIndexOnFlatTbl; } -- uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); -- uhcReducerCount = cube.getConfig().getUHCReducerCount(); -- int count = 0; -- for (int i = 0; i < uhcIndex.length; i++) { -- columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i); -- if (uhcIndex[i] == 1) { -- count++; -- } -- } ++ reducerMapping = new FactDistinctColumnsReducerMapping(cube, ++ conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1)); } protected void handleErrorRecord(String[] record, Exception ex) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 8273530,37972c0..cad947c --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@@ -22,7 -22,7 +22,6 @@@ import java.io.DataOutputStream import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; --import java.util.HashMap; import java.util.List; import java.util.Map; @@@ -61,7 -61,7 +60,6 @@@ public class FactDistinctColumnsReduce private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); -- private List<TblColRef> columnList; private List<Long> baseCuboidRowCountInMappers; protected Map<Long, HLLCounter> cuboidHLLMap = null; protected long baseCuboidId; @@@ -71,11 -71,11 +69,10 @@@ private TblColRef col = null; private boolean isStatistics = false; private KylinConfig cubeConfig; -- private int uhcReducerCount; -- private Map<Integer, Integer> reducerIdToColumnIndex = new HashMap<>(); private int taskId; private boolean isPartitionCol = false; private int rowCount = 0; ++ private FactDistinctColumnsReducerMapping reducerMapping; //local build dict private boolean buildDictInReducer; @@@ -98,33 -98,49 +95,36 @@@ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); 
cubeConfig = cube.getConfig(); cubeDesc = cube.getDescriptor(); -- columnList = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); - boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED)); int numberOfTasks = context.getNumReduceTasks(); taskId = context.getTaskAttemptID().getTaskID().getId(); -- uhcReducerCount = cube.getConfig().getUHCReducerCount(); -- initReducerIdToColumnIndex(config); - - boolean ifCol = true; - if (collectStatistics) { - int hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 0); - if (hllShardBase <= 0) { - throw new IllegalArgumentException( - "In job configuration the value for property " + BatchConstants.CFG_HLL_SHARD_BASE - + " is " + hllShardBase + ". It should be set correctly!!!"); - } - ifCol = false; - if (taskId >= numberOfTasks - hllShardBase) { - // hll - isStatistics = true; - baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); - baseCuboidRowCountInMappers = Lists.newArrayList(); - cuboidHLLMap = Maps.newHashMap(); - samplingPercentage = Integer - .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); - logger.info("Reducer " + taskId + " handling stats"); - } else if (taskId == numberOfTasks - hllShardBase - 1) { - // partition col - isPartitionCol = true; - col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); - if (col == null) { - logger.info("No partition col. This reducer will do nothing"); - } else { - logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity()); - } ++ reducerMapping = new FactDistinctColumnsReducerMapping(cube, ++ conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1)); ++ ++ logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId)); + - if (taskId == numberOfTasks - 1) { ++ if (reducerMapping.isCuboidRowCounterReducer(taskId)) { + // hll + isStatistics = true; ++ baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); + baseCuboidRowCountInMappers = Lists.newArrayList(); + cuboidHLLMap = Maps.newHashMap(); - samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); ++ samplingPercentage = Integer ++ .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + logger.info("Reducer " + taskId + " handling stats"); - } else if (taskId == numberOfTasks - 2) { ++ } else if (reducerMapping.isPartitionColReducer(taskId)) { + // partition col + isPartitionCol = true; + col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); + if (col == null) { + logger.info("No partition col. 
This reducer will do nothing"); } else { - ifCol = true; + logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity()); } - } - if (ifCol) { + } else { // normal col -- col = columnList.get(reducerIdToColumnIndex.get(taskId)); ++ col = reducerMapping.getDictColForReducer(taskId); Preconditions.checkNotNull(col); // local build dict @@@ -132,11 -148,11 +132,8 @@@ if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder buildDictInReducer = false; } -- if(config.getUHCReducerCount() > 1) { -- int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); -- int colIndex = reducerIdToColumnIndex.get(taskId); -- if (uhcIndex[colIndex] == 1) -- buildDictInReducer = false; //for UHC columns, this feature should be disabled ++ if (reducerMapping.getReducerNumForDictCol(col) > 1) { ++ buildDictInReducer = false; // only works if this is the only reducer of a dictionary column } if (buildDictInReducer) { builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); @@@ -146,20 -162,20 +143,6 @@@ } } -- private void initReducerIdToColumnIndex(KylinConfig config) throws IOException { -- int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); -- int count = 0; -- for (int i = 0; i < uhcIndex.length; i++) { -- reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i); -- if (uhcIndex[i] == 1) { -- for (int j = 1; j < uhcReducerCount; j++) { -- reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i); -- } -- count++; -- } -- } -- } -- @Override public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text key = skey.getText(); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java index 0000000,0000000..51594c3 new file mode 100644 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java @@@ -1,0 -1,0 +1,156 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++*/ ++ ++package org.apache.kylin.engine.mr.steps; ++ ++import java.util.ArrayList; ++import java.util.List; ++ ++import org.apache.kylin.cube.CubeInstance; ++import org.apache.kylin.cube.model.CubeDesc; ++import org.apache.kylin.engine.mr.common.MapReduceUtil; ++import org.apache.kylin.metadata.model.TblColRef; ++ ++/** ++ * Reducers play different roles based on reducer-id: ++ * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer ++ * - one reducer to get min/max of date partition column ++ * - (at the end) one or more reducers to collect row counts for cuboids using HLL ++ */ ++public class FactDistinctColumnsReducerMapping { ++ ++ public static final int MARK_FOR_PARTITION_COL = -2; ++ public static final int MARK_FOR_HLL_COUNTER = -1; ++ ++ final private int nDictValueCollectors; ++ final private int datePartitionReducerId; ++ final private int nCuboidRowCounters; ++ final private int nTotalReducers; ++ ++ final private List<TblColRef> allDictCols; ++ final private int[] dictColIdToReducerBeginId; ++ final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers) ++ ++ public FactDistinctColumnsReducerMapping(CubeInstance cube) { ++ this(cube, 0); ++ } ++ ++ public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) { ++ CubeDesc desc = cube.getDescriptor(); ++ ++ allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt()); ++ ++ dictColIdToReducerBeginId = new int[allDictCols.size() + 1]; ++ ++ int uhcReducerCount = cube.getConfig().getUHCReducerCount(); ++ List<TblColRef> uhcList = desc.getAllUHCColumns(); ++ int counter = 0; ++ for (int i = 0; i < allDictCols.size(); i++) { ++ dictColIdToReducerBeginId[i] = counter; ++ boolean isUHC = uhcList.contains(allDictCols.get(i)); ++ counter += (isUHC) ? uhcReducerCount : 1; ++ } ++ ++ dictColIdToReducerBeginId[allDictCols.size()] = counter; ++ nDictValueCollectors = counter; ++ datePartitionReducerId = counter; ++ ++ nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? // ++ MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum; ++ nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters; ++ ++ reducerRolePlay = new int[nTotalReducers]; ++ for (int i = 0, dictId = 0; i < nTotalReducers; i++) { ++ if (i > datePartitionReducerId) { ++ // cuboid HLL counter reducer ++ reducerRolePlay[i] = MARK_FOR_HLL_COUNTER; ++ } else if (i == datePartitionReducerId) { ++ // date partition min/max reducer ++ reducerRolePlay[i] = MARK_FOR_PARTITION_COL; ++ } else { ++ // dict value collector reducer ++ if (i == dictColIdToReducerBeginId[dictId + 1]) ++ dictId++; ++ ++ reducerRolePlay[i] = dictId; ++ } ++ } ++ } ++ ++ public List<TblColRef> getAllDictCols() { ++ return allDictCols; ++ } ++ ++ public int getTotalReducerNum() { ++ return nTotalReducers; ++ } ++ ++ public int getCuboidRowCounterReducerNum() { ++ return nCuboidRowCounters; ++ } ++ ++ public int getReducerIdForDictCol(int dictColId, Object fieldValue) { ++ int begin = dictColIdToReducerBeginId[dictColId]; ++ int span = dictColIdToReducerBeginId[dictColId + 1] - begin; ++ ++ if (span == 1) ++ return begin; ++ ++ int hash = fieldValue == null ? 
0 : fieldValue.hashCode(); ++ return begin + Math.abs(hash) % span; ++ } ++ ++ public int[] getAllRolePlaysForReducers() { ++ return reducerRolePlay; ++ } ++ ++ public int getRolePlayOfReducer(int reducerId) { ++ return reducerRolePlay[reducerId]; ++ } ++ ++ public boolean isCuboidRowCounterReducer(int reducerId) { ++ return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER; ++ } ++ ++ public boolean isPartitionColReducer(int reducerId) { ++ return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL; ++ } ++ ++ public TblColRef getDictColForReducer(int reducerId) { ++ int role = getRolePlayOfReducer(reducerId); ++ if (role < 0) ++ throw new IllegalStateException(); ++ ++ return allDictCols.get(role); ++ } ++ ++ public int getReducerNumForDictCol(TblColRef col) { ++ int dictColId = allDictCols.indexOf(col); ++ return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId]; ++ } ++ ++ public int getReducerIdForDatePartitionColumn() { ++ return datePartitionReducerId; ++ } ++ ++ public int getReducerIdForCuboidRowCount(long cuboidId) { ++ int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters); ++ return datePartitionReducerId + 1 + rowCounterId; ++ } ++ ++} http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index 2a184b0,3a1f852..da1576f --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@@ -63,20 -63,16 +63,20 @@@ public class MergeDictionaryStep extend try { checkLookupSnapshotsMustIncremental(mergingSegments); - makeDictForNewSegment(conf, cube, newSegment, mergingSegments); - makeSnapshotForNewSegment(cube, newSegment, mergingSegments); + // work on copy instead of cached objects + CubeInstance cubeCopy = cube.latestCopyForWrite(); + CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid()); + + makeDictForNewSegment(conf, cubeCopy, newSegCopy, mergingSegments); + makeSnapshotForNewSegment(cubeCopy, newSegCopy, mergingSegments); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(newSegment); - mgr.updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(newSegCopy); + mgr.updateCube(update); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return ExecuteResult.createSucceed(); } catch (IOException e) { logger.error("fail to merge dictionary or lookup snapshots", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + return ExecuteResult.createError(e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index f15819f,921494f..b532360 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@@ -63,11 -78,59 +78,62 @@@ public class SaveStatisticsStep extend throw new IOException("fail to find the statistics file in base dir: " + statisticsDir); } - FSDataInputStream is = 
fs.open(statisticsFilePath); + Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); + long totalRowsBeforeMerge = 0; + long grantTotal = 0; + int samplingPercentage = -1; + int mapperNumber = -1; + for (Path item : statisticsFiles) { + CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item, + kylinConf.getCubeStatsHLLPrecision()); + cuboidHLLMap.putAll(cubeStatsResult.getCounterMap()); + long pGrantTotal = 0L; + for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) { + pGrantTotal += hll.getCountEstimate(); + } + totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio(); + grantTotal += pGrantTotal; + int pMapperNumber = cubeStatsResult.getMapperNumber(); + if (pMapperNumber > 0) { + if (mapperNumber < 0) { + mapperNumber = pMapperNumber; + } else { + throw new RuntimeException( + "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!"); + } + } + int pSamplingPercentage = cubeStatsResult.getPercentage(); + if (samplingPercentage < 0) { + samplingPercentage = pSamplingPercentage; + } else if (samplingPercentage != pSamplingPercentage) { + throw new RuntimeException( + "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!"); + } + } + if (samplingPercentage < 0) { + logger.warn("The sampling percentage should be set!!!"); + } + if (mapperNumber < 0) { + logger.warn("The mapper number should be set!!!"); + } + + if (logger.isDebugEnabled()) { + logMapperAndCuboidStatistics(cuboidHLLMap, samplingPercentage, mapperNumber, grantTotal, + totalRowsBeforeMerge); + } + double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal; + CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage, + mapperNumber, mapperOverlapRatio); + + Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); ++ logger.info(newSegment + " stats saved to hdfs " + statisticsFile); ++ + FSDataInputStream is = fs.open(statisticsFile); try { // put the statistics to metadata store -- String statisticsFileName = newSegment.getStatisticsResourcePath(); -- rs.putResource(statisticsFileName, is, System.currentTimeMillis()); ++ String resPath = newSegment.getStatisticsResourcePath(); ++ rs.putResource(resPath, is, System.currentTimeMillis()); ++ logger.info(newSegment + " stats saved to resource " + resPath); CubingJob cubingJob = (CubingJob) getManager() .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java index 0000000,485975a..1b1a7f0 mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java @@@ -1,0 -1,154 +1,154 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.steps; + ++import java.io.IOException; ++import java.util.List; ++import java.util.Map; ++ + import org.apache.commons.cli.Options; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.io.BytesWritable; + import org.apache.hadoop.io.NullWritable; + import org.apache.hadoop.mapreduce.Job; + import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; + import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; + import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; + import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; + import org.apache.hadoop.util.ToolRunner; + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.HadoopUtil; + import org.apache.kylin.cube.CubeInstance; + import org.apache.kylin.cube.CubeManager; + import org.apache.kylin.engine.mr.common.AbstractHadoopJob; + import org.apache.kylin.engine.mr.common.BatchConstants; + import org.apache.kylin.metadata.model.TblColRef; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import java.io.IOException; -import java.util.List; -import java.util.Map; - + public class UHCDictionaryJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(UHCDictionaryJob.class); + + private boolean isSkipped = false; + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_INPUT_PATH); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String job_id = getOptionValue(OPTION_CUBING_JOB_ID); + String cubeName = getOptionValue(OPTION_CUBE_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); + + //add metadata to distributed cache + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + attachCubeMetadata(cube, job.getConfiguration()); + - List<TblColRef> uhcColumns = cubeMgr.getAllUHCColumns(cube.getDescriptor()); ++ List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns(); + int reducerCount = uhcColumns.size(); + + //Note! handle uhc columns is null. 
+ boolean hasUHCValue = false; + for (TblColRef tblColRef : uhcColumns) { + Path path = new Path(input.toString() + "/" + tblColRef.getIdentity()); + if (HadoopUtil.getFileSystem(path).exists(path)) { + FileInputFormat.addInputPath(job, path); + hasUHCValue = true; + } + } + + if (!hasUHCValue) { + isSkipped = true; + return 0; + } + + setJobClasspath(job, cube.getConfig()); + setupMapper(); + setupReducer(output, reducerCount); + + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id); + job.getConfiguration().set(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()); + job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false"); + + //8G memory is enough for all global dict, because the input is sequential and we handle global dict slice by slice + job.getConfiguration().set("mapreduce.reduce.memory.mb", "8500"); + job.getConfiguration().set("mapred.reduce.child.java.opts", "-Xmx8g"); + //Copying global dict to working dir in GlobalDictHDFSStore maybe elapsed a long time (Maybe we could improve it) + //Waiting the global dict lock maybe also take a long time. + //So we set 8 hours here + job.getConfiguration().set("mapreduce.task.timeout", "28800000"); + + //allow user specially set config for uhc step + for (Map.Entry<String, String> entry : cube.getConfig().getUHCMRConfigOverride().entrySet()) { + job.getConfiguration().set(entry.getKey(), entry.getValue()); + } + + return waitForCompletion(job); + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } + + private void setupMapper() throws IOException { + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(UHCDictionaryMapper.class); + job.setMapOutputKeyClass(SelfDefineSortableKey.class); + job.setMapOutputValueClass(NullWritable.class); + } + + private void setupReducer(Path output, int numberOfReducers) throws IOException { + job.setReducerClass(UHCDictionaryReducer.class); + job.setPartitionerClass(UHCDictionaryPartitioner.class); + job.setNumReduceTasks(numberOfReducers); + + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class); + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); + + //prevent to create zero-sized default output + LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); + + deletePath(job.getConfiguration(), output); + } + + @Override + public boolean isSkipped() { + return isSkipped; + } + + public static void main(String[] args) throws Exception { + UHCDictionaryJob job = new UHCDictionaryJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java index 0000000,d9d7b60..abec8fc mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java @@@ -1,0 -1,101 +1,101 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.steps; + ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.util.List; ++ + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.NullWritable; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.mapreduce.lib.input.FileSplit; + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.Bytes; + import org.apache.kylin.cube.CubeInstance; + import org.apache.kylin.cube.CubeManager; + import org.apache.kylin.engine.mr.KylinMapper; + import org.apache.kylin.engine.mr.common.AbstractHadoopJob; + import org.apache.kylin.engine.mr.common.BatchConstants; + import org.apache.kylin.metadata.datatype.DataType; + import org.apache.kylin.metadata.model.TblColRef; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - + public class UHCDictionaryMapper extends KylinMapper<NullWritable, Text, SelfDefineSortableKey, NullWritable> { + private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryMapper.class); + + protected int index; + protected DataType type; + + protected Text outputKey = new Text(); + private ByteBuffer tmpBuf; + private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); + + @Override + protected void doSetup(Context context) throws IOException { + tmpBuf = ByteBuffer.allocate(4096); + + Configuration conf = context.getConfiguration(); + bindCurrentConfiguration(conf); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + CubeInstance cube = CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME)); - List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor()); ++ List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns(); + + FileSplit fileSplit = (FileSplit) context.getInputSplit(); + String colName = fileSplit.getPath().getParent().getName(); + + for (int i = 0; i < uhcColumns.size(); i++) { + if (uhcColumns.get(i).getIdentity().equalsIgnoreCase(colName)) { + index = i; + break; + } + } + type = uhcColumns.get(index).getType(); + + //for debug + logger.info("column name: " + colName); + logger.info("index: " + index); + logger.info("type: " + type); + } + + @Override + public void doMap(NullWritable key, Text value, Context context) throws IOException, InterruptedException { + tmpBuf.clear(); + int size = value.getLength()+ 1; + if (size >= tmpBuf.capacity()) { + tmpBuf = ByteBuffer.allocate(countNewSize(tmpBuf.capacity(), size)); + } + tmpBuf.put(Bytes.toBytes(index)[3]); + tmpBuf.put(value.getBytes(), 0, value.getLength()); + outputKey.set(tmpBuf.array(), 0, tmpBuf.position()); + + sortableKey.init(outputKey, type); + context.write(sortableKey, 
NullWritable.get()); + } + + private int countNewSize(int oldSize, int dataSize) { + int newSize = oldSize * 2; + while (newSize < dataSize) { + newSize = newSize * 2; + } + return newSize; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java index 0000000,ce9b792..6da198d mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java @@@ -1,0 -1,113 +1,113 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.steps; + ++import static org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX; ++ ++import java.io.DataOutputStream; ++import java.io.IOException; ++import java.util.List; ++ + import org.apache.commons.io.output.ByteArrayOutputStream; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.BytesWritable; + import org.apache.hadoop.io.NullWritable; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.Bytes; + import org.apache.kylin.common.util.ClassUtil; + import org.apache.kylin.common.util.Dictionary; + import org.apache.kylin.cube.CubeInstance; + import org.apache.kylin.cube.CubeManager; + import org.apache.kylin.cube.model.CubeDesc; + import org.apache.kylin.dict.DictionaryGenerator; + import org.apache.kylin.dict.DictionaryInfo; + import org.apache.kylin.dict.IDictionaryBuilder; + import org.apache.kylin.engine.mr.KylinReducer; + import org.apache.kylin.engine.mr.common.AbstractHadoopJob; + import org.apache.kylin.engine.mr.common.BatchConstants; + import org.apache.kylin.metadata.model.TblColRef; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.List; - -import static org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX; - + public class UHCDictionaryReducer extends KylinReducer<SelfDefineSortableKey, NullWritable, NullWritable, BytesWritable> { + private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryReducer.class); + + private IDictionaryBuilder builder; + private TblColRef col; + + private MultipleOutputs mos; + + @Override + protected void doSetup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + Configuration conf = 
context.getConfiguration(); + mos = new MultipleOutputs(context); + + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + CubeDesc cubeDesc = cube.getDescriptor(); - List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cubeDesc); ++ List<TblColRef> uhcColumns = cubeDesc.getAllUHCColumns(); + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + col = uhcColumns.get(taskId); + logger.info("column name: " + col.getIdentity()); + + if (cube.getDescriptor().getShardByColumns().contains(col)) { + //for ShardByColumns + builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); + builder.init(null, 0, null); + } else { + //for GlobalDictionaryColumns + String hdfsDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR); + DictionaryInfo dictionaryInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype()); + String builderClass = cubeDesc.getDictionaryBuilderClass(col); + builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass); + builder.init(dictionaryInfo, 0, hdfsDir); + } + } + + @Override + public void doReduce(SelfDefineSortableKey skey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { + Text key = skey.getText(); + String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + builder.addValue(value); + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + Dictionary<String> dict = builder.build(); + outputDict(col, dict); + } + + private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException { + // output written to baseDir/colName/colName.rldict-r-00000 (etc) + String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX; + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) { + outputStream.writeUTF(dict.getClass().getName()); + dict.write(outputStream); + + mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName); + } + mos.close(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java ---------------------------------------------------------------------- diff --cc engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java index 0000000,0000000..a6bc019 new file mode 100644 --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java @@@ -1,0 -1,0 +1,93 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++*/ ++ ++package org.apache.kylin.engine.mr.steps; ++ ++import org.apache.kylin.common.util.LocalFileMetadataTestCase; ++import org.apache.kylin.cube.CubeInstance; ++import org.apache.kylin.cube.CubeManager; ++import org.apache.kylin.metadata.model.TblColRef; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++/** ++ */ ++public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTestCase { ++ ++ @Before ++ public void setUp() throws Exception { ++ createTestMetadata(); ++ System.setProperty("kylin.engine.mr.uhc-reducer-count", "2"); ++ System.setProperty("kylin.engine.mr.per-reducer-hll-cuboid-number", "1"); ++ System.setProperty("kylin.engine.mr.hll-max-reducer-number", "2"); ++ } ++ ++ @After ++ public void after() throws Exception { ++ cleanupTestMetadata(); ++ System.clearProperty("kylin.engine.mr.uhc-reducer-count"); ++ System.clearProperty("kylin.engine.mr.per-reducer-hll-cuboid-number"); ++ System.clearProperty("kylin.engine.mr.hll-max-reducer-number"); ++ } ++ ++ @Test ++ public void testBasics() { ++ CubeManager mgr = CubeManager.getInstance(getTestConfig()); ++ CubeInstance cube = mgr.getCube("ci_left_join_cube"); ++ TblColRef aUHC = cube.getModel().findColumn("TEST_COUNT_DISTINCT_BITMAP"); ++ ++ FactDistinctColumnsReducerMapping mapping = new FactDistinctColumnsReducerMapping(cube); ++ //System.out.println(mapping.getAllDictCols()); ++ //System.out.println(Arrays.toString(mapping.getAllRolePlaysForReducers())); ++ ++ int totalReducerNum = mapping.getTotalReducerNum(); ++ Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum()); ++ ++ // check partition column reducer & cuboid row count reducers ++ Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER, ++ mapping.getRolePlayOfReducer(totalReducerNum - 1)); ++ Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER, ++ mapping.getRolePlayOfReducer(totalReducerNum - 2)); ++ Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL, ++ mapping.getRolePlayOfReducer(totalReducerNum - 3)); ++ ++ // check all dict column reducers ++ int dictEnd = totalReducerNum - 3; ++ for (int i = 0; i < dictEnd; i++) ++ Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0); ++ ++ // check a UHC dict column ++ Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC)); ++ int uhcReducerBegin = -1; ++ for (int i = 0; i < dictEnd; i++) { ++ if (mapping.getDictColForReducer(i).equals(aUHC)) { ++ uhcReducerBegin = i; ++ break; ++ } ++ } ++ ++ int[] allRolePlay = mapping.getAllRolePlaysForReducers(); ++ Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]); ++ for (int i = 0; i < 5; i++) { ++ int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i); ++ Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1); ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/examples/test_case_data/localmeta/kylin.properties ---------------------------------------------------------------------- 
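The new FactDistinctColumnsReducerMapping assigns reducer roles in the order its class comment describes and the test above asserts: dictionary-value reducers first (a UHC column may span several), then one reducer for the date partition column, then the cuboid row-count (HLL) reducers at the end. The standalone sketch below (not part of the commit) illustrates that layout and the hash-based spread that getReducerIdForDictCol performs for a UHC column; the column names, UHC reducer count and HLL reducer count are hypothetical stand-ins for what the real class reads from the cube descriptor and config:

    import java.util.Arrays;
    import java.util.List;

    public class ReducerMappingSketch {
        public static void main(String[] args) {
            List<String> dictCols = Arrays.asList("SELLER_ID", "ITEM_NAME", "BUYER_COUNTRY"); // hypothetical
            List<String> uhcCols = Arrays.asList("ITEM_NAME");  // hypothetical UHC column
            int uhcReducerCount = 2;                             // reducers per UHC column
            int hllReducerCount = 2;                             // cuboid row-count (HLL) reducers

            // first reducer id for each dict column, plus a sentinel at the end
            int[] beginId = new int[dictCols.size() + 1];
            int counter = 0;
            for (int i = 0; i < dictCols.size(); i++) {
                beginId[i] = counter;
                counter += uhcCols.contains(dictCols.get(i)) ? uhcReducerCount : 1;
            }
            beginId[dictCols.size()] = counter;

            int partitionColReducer = counter;                   // one reducer for the date partition col
            int totalReducers = counter + 1 + hllReducerCount;

            System.out.println("dict value reducers  : 0.." + (counter - 1));
            System.out.println("partition col reducer: " + partitionColReducer);
            System.out.println("HLL counter reducers : " + (partitionColReducer + 1) + ".." + (totalReducers - 1));

            // a UHC column spreads its values over its reducer span by hash, like getReducerIdForDictCol()
            int uhcIdx = dictCols.indexOf("ITEM_NAME");
            int span = beginId[uhcIdx + 1] - beginId[uhcIdx];
            for (String value : Arrays.asList("apple", "banana", "cherry")) {
                int reducerId = beginId[uhcIdx] + Math.abs(value.hashCode()) % span;
                System.out.println("value '" + value + "' -> reducer " + reducerId);
            }
        }
    }

With this sample layout the mapping yields four dictionary reducers (the UHC column taking two of them), reducer 4 for the partition column, and reducers 5 and 6 for the HLL counters, which mirrors the role ordering checked in FactDistinctColumnsReducerMappingTest.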
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --cc examples/test_case_data/sandbox/kylin.properties index a6d4adc,7271e90..13d62a2 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@@ -153,10 -165,9 +165,13 @@@ kylin.web.help.1=odbc|ODBC Driver kylin.web.help.2=tableau|Tableau Guide| kylin.web.help.3=onboard|Cube Design Tutorial| +#allow user to export query result +kylin.web.export-allow-admin=true +kylin.web.export-allow-other=true + + # Hide measures in measure list of cube designer, separate by comma + kylin.web.hide-measures=RAW + ### OTHER ### # kylin query metrics percentiles intervals default=60, 300, 3600 http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITAclTableMigrationToolTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/QueryConnection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 132f373,0e9f4ba..f53189b --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@@ -33,12 -37,17 +37,18 @@@ import org.apache.kylin.common.util.Jso import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; + import org.apache.kylin.cube.cuboid.CuboidScheduler; + import org.apache.kylin.cube.cuboid.TreeCuboidScheduler; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; + import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.dimension.DimensionEncodingFactory; + import org.apache.kylin.engine.EngineFactory; + import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JoinedFlatTable; + import org.apache.kylin.job.exception.JobException; import 
org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.SegmentRange; @@@ -52,7 -63,9 +64,10 @@@ import org.apache.kylin.rest.exception. import org.apache.kylin.rest.request.CubeRequest; import org.apache.kylin.rest.request.JobBuildRequest; import org.apache.kylin.rest.request.JobBuildRequest2; + import org.apache.kylin.rest.request.JobOptimizeRequest; + import org.apache.kylin.rest.request.SQLRequest; +import org.apache.kylin.rest.response.CubeInstanceResponse; + import org.apache.kylin.rest.response.CuboidTreeResponse; import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.rest.response.GeneralResponse; import org.apache.kylin.rest.response.HBaseResponse; @@@ -684,6 -787,170 +791,170 @@@ public class CubeController extends Bas } + @RequestMapping(value = "/{cubeName}/cuboids/export", method = RequestMethod.GET) + @ResponseBody + public void cuboidsExport(@PathVariable String cubeName, @RequestParam(value = "top") Integer top, + HttpServletResponse response) throws IOException { + CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); + if (cube == null) { + logger.error("Get cube: [" + cubeName + "] failed when get recommend cuboids"); + throw new BadRequestException("Get cube: [" + cubeName + "] failed when get recommend cuboids"); + } + Map<Long, Long> cuboidList = getRecommendCuboidList(cube); + if (cuboidList == null || cuboidList.isEmpty()) { + logger.warn("Cannot get recommend cuboid list for cube " + cubeName); + } + if (cuboidList.size() < top) { + logger.info("Only recommend " + cuboidList.size() + " cuboids less than topn " + top); + } + Iterator<Long> cuboidIterator = cuboidList.keySet().iterator(); + RowKeyColDesc[] rowKeyColDescList = cube.getDescriptor().getRowkey().getRowKeyColumns(); + + List<Set<String>> dimensionSetList = Lists.newLinkedList(); + while (top-- > 0 && cuboidIterator.hasNext()) { + Set<String> dimensionSet = Sets.newHashSet(); + dimensionSetList.add(dimensionSet); + long cuboid = cuboidIterator.next(); + for (int i = 0; i < rowKeyColDescList.length; i++) { + if ((cuboid & (1L << rowKeyColDescList[i].getBitIndex())) > 0) { + dimensionSet.add(rowKeyColDescList[i].getColumn()); + } + } + } + + response.setContentType("text/json;charset=utf-8"); + response.setHeader("Content-Disposition", "attachment; filename=\"" + cubeName + ".json\""); + try (PrintWriter writer = response.getWriter()) { + writer.write(JsonUtil.writeValueAsString(dimensionSetList)); + } catch (IOException e) { + logger.error("", e); + throw new InternalErrorException("Failed to write: " + e.getLocalizedMessage()); + } + } + + @RequestMapping(value = "/{cubeName}/cuboids/current", method = RequestMethod.GET) + @ResponseBody + public CuboidTreeResponse getCurrentCuboids(@PathVariable String cubeName) { + CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); + if (cube == null) { + logger.error("Get cube: [" + cubeName + "] failed when get current cuboids"); + throw new BadRequestException("Get cube: [" + cubeName + "] failed when get current cuboids"); + } + // The cuboid tree displayed should be consistent with the current one + CuboidScheduler cuboidScheduler = cube.getCuboidScheduler(); + Map<Long, Long> cuboidStatsMap = cube.getCuboids(); + if (cuboidStatsMap == null) { + cuboidStatsMap = CuboidStatsReaderUtil.readCuboidStatsFromCube(cuboidScheduler.getAllCuboidIds(), cube); + } + + Map<Long, Long> hitFrequencyMap = null; + Map<Long, Long> 
queryMatchMap = null; + try { + hitFrequencyMap = getTargetCuboidHitFrequency(cubeName); + queryMatchMap = getCuboidQueryMatchCount(cubeName); + } catch (Exception e) { + logger.warn("Fail to query on system cube due to " + e); + } + + Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds(); + return cubeService.getCuboidTreeResponse(cuboidScheduler, cuboidStatsMap, hitFrequencyMap, queryMatchMap, + currentCuboidSet); + } + + @RequestMapping(value = "/{cubeName}/cuboids/recommend", method = RequestMethod.GET) + @ResponseBody + public CuboidTreeResponse getRecommendCuboids(@PathVariable String cubeName) throws IOException { + CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); + if (cube == null) { + logger.error("Get cube: [" + cubeName + "] failed when get recommend cuboids"); + throw new BadRequestException("Get cube: [" + cubeName + "] failed when get recommend cuboids"); + } + Map<Long, Long> recommendCuboidStatsMap = getRecommendCuboidList(cube); + if (recommendCuboidStatsMap == null || recommendCuboidStatsMap.isEmpty()) { + return new CuboidTreeResponse(); + } + CuboidScheduler cuboidScheduler = new TreeCuboidScheduler(cube.getDescriptor(), + Lists.newArrayList(recommendCuboidStatsMap.keySet()), + new TreeCuboidScheduler.CuboidCostComparator(recommendCuboidStatsMap)); + + // Get cuboid target info for displaying heat map of cuboid hit + Map<Long, Long> displayHitFrequencyMap = getTargetCuboidHitFrequency(cubeName); + // Get exactly matched cuboid query count + Map<Long, Long> queryMatchMap = getCuboidQueryMatchCount(cubeName); + + Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds(); + return cubeService.getCuboidTreeResponse(cuboidScheduler, recommendCuboidStatsMap, displayHitFrequencyMap, + queryMatchMap, currentCuboidSet); + } + + private Map<Long, Long> getRecommendCuboidList(CubeInstance cube) throws IOException { + // Get cuboid source info + Map<Long, Long> optimizeHitFrequencyMap = getSourceCuboidHitFrequency(cube.getName()); + Map<Long, Map<Long, Long>> rollingUpCountSourceMap = getCuboidRollingUpCount(cube.getName()); + return cubeService.getRecommendCuboidStatistics(cube, optimizeHitFrequencyMap, rollingUpCountSourceMap); + } + + private Map<Long, Long> getSourceCuboidHitFrequency(String cubeName) { + return getCuboidHitFrequency(cubeName, true); + } + + private Map<Long, Long> getTargetCuboidHitFrequency(String cubeName) { + return getCuboidHitFrequency(cubeName, false); + } + + private Map<Long, Long> getCuboidHitFrequency(String cubeName, boolean isCuboidSource) { + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT); + String cuboidColumn = QueryCubePropertyEnum.CUBOID_SOURCE.toString(); + if (!isCuboidSource) { + cuboidColumn = QueryCubePropertyEnum.CUBOID_TARGET.toString(); + } + String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(); + String table = cubeService.getMetricsManager() + .getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube()); + String sql = "select " + cuboidColumn + ", sum(" + hitMeasure + ") " // + + "from " + table// + + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" + cubeName + "' " // + + "group by " + cuboidColumn; + sqlRequest.setSql(sql); - List<List<String>> orgHitFrequency = queryService.queryWithoutSecure(sqlRequest).getResults(); ++ List<List<String>> orgHitFrequency = queryService.doQueryWithCache(sqlRequest).getResults(); + return cubeService.formatQueryCount(orgHitFrequency); + } + + 
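The metrics helpers above and below all follow the same pattern: concatenate a small aggregate SQL statement against the system query-cube metrics table, run it through the query service (this diff switches them from queryService.queryWithoutSecure to queryService.doQueryWithCache), and fold the flat string rows into maps keyed by cuboid id. A minimal standalone sketch of that pattern follows; the table and column names and the formatQueryCount body are assumptions for illustration, not the real MetricsManager schema or CubeService code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough illustration of the SQL getCuboidHitFrequency() assembles and of how the flat
// result rows could be folded into Map<cuboidId, hitCount>. Placeholder names only.
public class CuboidHitFrequencySketch {

    static String buildSql(String table, String cuboidColumn, String hitMeasure, String cubeName) {
        return "select " + cuboidColumn + ", sum(" + hitMeasure + ") "
                + "from " + table
                + " where CUBE_NAME = '" + cubeName + "' "    // CUBE_NAME is a placeholder column
                + "group by " + cuboidColumn;
    }

    // fold rows of [cuboidId, hitSum] into a map; approximates what the service layer does
    static Map<Long, Long> formatQueryCount(List<List<String>> rows) {
        Map<Long, Long> result = new HashMap<>();
        for (List<String> row : rows) {
            result.put(Long.parseLong(row.get(0)), (long) Double.parseDouble(row.get(1)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(buildSql("METRICS_QUERY_CUBE", "CUBOID_SOURCE", "WEIGHT_PER_HIT", "ci_left_join_cube"));
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("255", "12.0"),
                Arrays.asList("1023", "3.0"));
        System.out.println(formatQueryCount(rows)); // e.g. {255=12, 1023=3}, order may vary
    }
}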
private Map<Long, Map<Long, Long>> getCuboidRollingUpCount(String cubeName) { + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT); + String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString(); + String cuboidTarget = QueryCubePropertyEnum.CUBOID_TARGET.toString(); + String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString(); + String table = cubeService.getMetricsManager() + .getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube()); + String sql = "select " + cuboidSource + ", " + cuboidTarget + ", sum(" + aggCount + ")/count(*) " // + + "from " + table // + + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" + cubeName + "' " // + + "group by " + cuboidSource + ", " + cuboidTarget; + sqlRequest.setSql(sql); - List<List<String>> orgRollingUpCount = queryService.queryWithoutSecure(sqlRequest).getResults(); ++ List<List<String>> orgRollingUpCount = queryService.doQueryWithCache(sqlRequest).getResults(); + return cubeService.formatRollingUpCount(orgRollingUpCount); + } + + private Map<Long, Long> getCuboidQueryMatchCount(String cubeName) { + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT); + String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString(); + String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(); + String table = cubeService.getMetricsManager() + .getSystemTableFromSubject(cubeService.getConfig().getKylinMetricsSubjectQueryCube()); + String sql = "select " + cuboidSource + ", sum(" + hitMeasure + ") " // + + "from " + table // + + " where " + QueryCubePropertyEnum.CUBE.toString() + " = '" + cubeName + "' and " + + QueryCubePropertyEnum.IF_MATCH.toString() + " = true " // + + "group by " + cuboidSource; + sqlRequest.setSql(sql); - List<List<String>> orgMatchHitFrequency = queryService.queryWithoutSecure(sqlRequest).getResults(); ++ List<List<String>> orgMatchHitFrequency = queryService.doQueryWithCache(sqlRequest).getResults(); + return cubeService.formatQueryCount(orgMatchHitFrequency); + } + /** * Initiate the very beginning of a streaming cube. Will seek the latest offsets of each partition from streaming * source (kafka) and record in the cube descriptor; In the first build job, it will use these offsets as the start point.