KYLIN-2736 code refine
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/79374047 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/79374047 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/79374047 Branch: refs/heads/master Commit: 79374047d3bcd8468bee9ac56f68f4f191edad7b Parents: fd59b51 Author: lidongsjtu <[email protected]> Authored: Wed Dec 13 16:44:44 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 13 16:44:44 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 6 ++-- .../mr/steps/FactDistinctColumnsMapper.java | 30 +++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/79374047/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 4ccf1d1..d59c98d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1027,12 +1027,12 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000")); } - public int getCuboidStatisticsCalculatorMaxNumber() { - // default multi-thread statistics calculation is disabled + public int getCuboidStatsCalculatorMaxNumber() { + // set 1 to disable multi-thread statistics calculation return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1")); } - public int getCuboidNumberPerStatisticsCalculator() { + public int getCuboidNumberPerStatsCalculator() { return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/79374047/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 272894f..575c7b3 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -46,8 +46,6 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; - - /** */ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> { @@ -84,7 +82,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB tmpbuf = ByteBuffer.allocate(4096); collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); if (collectStatistics) { - samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + samplingPercentage = Integer + .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds()); @@ -101,7 +100,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); } - TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); if (partitionColRef != null) { partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); @@ -121,7 +119,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion()); } else { isUsePutRowKeyToHllNewAlgorithm = true; - logger.info("Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", cubeDesc.getVersion()); + logger.info( + "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518", + cubeDesc.getVersion()); } int calculatorNum = getStatsThreadNum(cuboidIds.length); @@ -135,7 +135,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB Integer[][] cuboidsBitSetSplit; Long[] cuboidIdSplit; int start = i * splitSize; - if (start > cuboidIds.length) { + if (start >= cuboidIds.length) { break; } int end = (i + 1) * splitSize; @@ -156,14 +156,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } private int getStatsThreadNum(int cuboidNum) { - int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatisticsCalculator(); + int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator(); if (unitNum <= 0) { - logger.warn("config from getCuboidNumberPerStatisticsCalculator() " + unitNum + " is should larger than 0"); + logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " is should larger than 0"); logger.info("Will use single thread for cuboid statistics calculation"); return 1; } - int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatisticsCalculatorMaxNumber(); + int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber(); int calculatorNum = (cuboidNum - 1) / unitNum + 1; if (calculatorNum > maxCalculatorNum) { calculatorNum = maxCalculatorNum; @@ -175,7 +175,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); - for (String[] row: rowCollection) { + for (String[] row : rowCollection) { context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); for (int i = 0; i < dictCols.size(); i++) { String fieldValue = row[dictionaryColumnIndex[i]]; @@ -188,7 +188,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB reducerIndex = columnIndexToReducerBeginId.get(i); } else { //for the uhc - reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; + reducerIndex = columnIndexToReducerBeginId.get(i) + + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; } tmpbuf.clear(); @@ -207,7 +208,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB // log a few rows for troubleshooting if (rowCount < 10) { - logger.info("Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); + logger.info( + "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); } } @@ -258,7 +260,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); // output each cuboid's hll to reducer, key is 0 - cuboidId for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { - cuboidStatCalculator.waitComplete(); + cuboidStatCalculator.waitForCompletion(); } for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { Long[] cuboidIds = cuboidStatCalculator.getCuboidIds(); @@ -338,7 +340,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } } - public void waitComplete() { + public void waitForCompletion() { stop = true; try { workThread.join();
