This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new a979a49 KYLIN-4185: optimize CuboidSizeMap by using historical segments a979a49 is described below commit a979a49932ef0b4bcd48d518559f6df6840e6a22 Author: Zhou Kang <zhouka...@xiaomi.com> AuthorDate: Wed Dec 18 21:12:47 2019 +0800 KYLIN-4185: optimize CuboidSizeMap by using historical segments --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../java/org/apache/kylin/cube/CubeManager.java | 1 + .../java/org/apache/kylin/cube/CubeSegment.java | 20 +++++ .../java/org/apache/kylin/engine/mr/CubingJob.java | 66 +++++++++++++++ .../kylin/engine/mr/common/CubeStatsReader.java | 99 +++++++++++++++++++++- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 5 ++ .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 5 ++ 7 files changed, 199 insertions(+), 1 deletion(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 44629e6..a65058c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -613,6 +613,10 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.cube.segment-advisor", "org.apache.kylin.cube.CubeSegmentAdvisor"); } + public boolean enableJobCuboidSizeOptimize() { + return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false")); + } + public double getJobCuboidSizeRatio() { return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25")); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 189f738..d057982 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -837,6 +837,7 @@ public class CubeManager implements IRealizationProvider { } CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange); + newSegment.setMerged(true); Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment); if (mergingSegments.size() <= 1) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 178fd39..70fb501 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -82,6 +82,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable { private SegmentStatusEnum status; @JsonProperty("size_kb") private long sizeKB; + @JsonProperty("is_merged") + private boolean isMerged; + @JsonProperty("estimate_ratio") + private List<Double> estimateRatio; @JsonProperty("input_records") private long inputRecords; @JsonProperty("input_records_size") @@ -224,6 +228,22 @@ public class CubeSegment implements IBuildable, ISegment, Serializable { this.sizeKB = sizeKB; } + public boolean isMerged() { + return isMerged; + } + + public void setMerged(boolean isMerged) { + this.isMerged = isMerged; + } + + public List<Double> getEstimateRatio() { + return estimateRatio; + } + + public void setEstimateRatio(List<Double> estimateRatio) { + this.estimateRatio = estimateRatio; + } + public long getInputRecords() { return inputRecords; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 568392e..5456d55 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -18,6 +18,7 @@ package org.apache.kylin.engine.mr; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @@ -26,12 +27,18 @@ import java.util.Map; import java.util.TimeZone; import java.util.regex.Matcher; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.constant.ExecutableConstants; @@ -361,4 +368,63 @@ public class CubingJob extends DefaultChainedExecutable { return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0")); } + public List<Double> findEstimateRatio(CubeSegment seg, KylinConfig config) { + CubeInstance cubeInstance = seg.getCubeInstance(); + CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler(); + List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer(); + int totalLevels = cuboidScheduler.getBuildLevel(); + + List<Double> result = Lists.newArrayList(); + + Map<Long, Double> estimatedSizeMap; + + String cuboidRootPath = getCuboidRootPath(seg, config); + + try { + estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true); + } catch (IOException e) { + logger.warn("Cannot get segment {} estimated size map", seg.getName()); + + return null; + } + + for (int level = 0; level <= totalLevels; level++) { + double levelEstimatedSize = 0; + for (Long cuboidId : layeredCuboids.get(level)) { + levelEstimatedSize += estimatedSizeMap.get(cuboidId) == null ? 0.0 : estimatedSizeMap.get(cuboidId); + } + + double levelRealSize = getRealSizeByLevel(cuboidRootPath, level); + + if (levelEstimatedSize == 0.0 || levelRealSize == 0.0){ + result.add(level, -1.0); + } else { + result.add(level, levelRealSize / levelEstimatedSize); + } + } + + return result; + } + + + private double getRealSizeByLevel(String rootPath, int level) { + try { + String levelPath = JobBuilderSupport.getCuboidOutputPathsByLevel(rootPath, level); + FileSystem fs = HadoopUtil.getFileSystem(levelPath); + return fs.getContentSummary(new Path(levelPath)).getLength() / (1024L * 1024L); + } catch (Exception e) { + logger.warn("get level real size failed." + e); + return 0L; + } + } + + private String getCuboidRootPath(CubeSegment seg, KylinConfig kylinConfig) { + String rootDir = kylinConfig.getHdfsWorkingDirectory(); + if (!rootDir.endsWith("/")) { + rootDir = rootDir + "/"; + } + String jobID = this.getId(); + return rootDir + "kylin-" + jobID + "/" + seg.getRealization().getName() + "/cuboid/"; + } + } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index e935173..3c93d05 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -64,6 +64,7 @@ import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,7 +172,11 @@ public class CubeStatsReader { // return map of Cuboid ID => MB public Map<Long, Double> getCuboidSizeMap() { - return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount); + return getCuboidSizeMap(false); + } + + public Map<Long, Double> getCuboidSizeMap(boolean origin) { + return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount, origin); } public double estimateCubeSize() { @@ -199,6 +204,11 @@ public class CubeStatsReader { public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap, long sourceRowCount) { + return getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap, sourceRowCount, true); + } + + private static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap, + long sourceRowCount, boolean origin) { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); final List<Integer> rowkeyColumnSize = Lists.newArrayList(); final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); @@ -215,9 +225,96 @@ public class CubeStatsReader { sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount)); } + + if (origin == false && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) { + optimizeSizeMap(sizeMap, cubeSegment); + } + return sizeMap; } + private static Double harmonicMean(List<Double> data) { + if (data == null || data.size() == 0) { + return 1.0; + } + Double sum = 0.0; + for (Double item : data) { + sum += 1.0 / item; + } + return data.size() / sum; + } + + private static List<Double> getHistoricalRating(CubeSegment cubeSegment, + CubeInstance cubeInstance, + int totalLevels) { + boolean isMerged = cubeSegment.isMerged(); + + Map<Integer, List<Double>> layerRatio = Maps.newHashMap(); + List<Double> result = Lists.newArrayList(); + + for (CubeSegment seg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { + if (seg.isMerged() != isMerged || seg.getEstimateRatio() == null) { + continue; + } + + logger.info("get ratio from {} with: {}", seg.getName(), StringUtils.join(seg.getEstimateRatio(), ",")); + + for(int level = 0; level <= totalLevels; level++) { + if (seg.getEstimateRatio().get(level) <= 0) { + continue; + } + + List<Double> temp = layerRatio.get(level) == null ? Lists.newArrayList() : layerRatio.get(level); + + temp.add(seg.getEstimateRatio().get(level)); + layerRatio.put(level, temp); + } + } + + if (layerRatio.size() == 0) { + logger.info("Fail to get historical rating."); + return null; + } else { + for(int level = 0; level <= totalLevels; level++) { + logger.debug("level {}: {}", level, StringUtils.join(layerRatio.get(level), ",")); + result.add(level, harmonicMean(layerRatio.get(level))); + } + + logger.info("Finally estimate ratio is {}", StringUtils.join(result, ",")); + + return result; + } + } + + private static void optimizeSizeMap(Map<Long, Double> sizeMap, CubeSegment cubeSegment) { + CubeInstance cubeInstance = cubeSegment.getCubeInstance(); + int totalLevels = cubeInstance.getCuboidScheduler().getBuildLevel(); + List<List<Long>> layeredCuboids = cubeInstance.getCuboidScheduler().getCuboidsByLayer(); + + logger.info("cube size is {} before optimize", SumHelper.sumDouble(sizeMap.values())); + + List<Double> levelRating = getHistoricalRating(cubeSegment, cubeInstance, totalLevels); + + if (levelRating == null) { + logger.info("Fail to optimize, use origin."); + return; + } + + for (int level = 0; level <= totalLevels; level++) { + Double rate = levelRating.get(level); + + for (Long cuboidId : layeredCuboids.get(level)) { + double oriValue = (sizeMap.get(cuboidId) == null ? 0.0 : sizeMap.get(cuboidId)); + sizeMap.put(cuboidId, oriValue * rate); + } + } + + logger.info("cube size is {} after optimize", SumHelper.sumDouble(sizeMap.values())); + + return; + } + + /** * Estimate the cuboid's size * diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index c2031da..2f13fdb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; @@ -76,11 +77,15 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { long sourceSizeBytes = cubingJob.findSourceSizeBytes(); long cubeSizeBytes = cubingJob.findCubeSizeBytes(); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(segment, config); + segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams())); segment.setLastBuildTime(System.currentTimeMillis()); segment.setSizeKB(cubeSizeBytes / 1024); segment.setInputRecords(sourceCount); segment.setInputRecordsSize(sourceSizeBytes); + segment.setEstimateRatio(cuboidEstimateRatio); try { saveExtSnapshotIfNeeded(cubeManager, cube, segment); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index e7b127e..0a8cd1e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -92,6 +93,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { } } + KylinConfig config = KylinConfig.getInstanceFromEnv(); + List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(mergedSegment, config); + // update segment info mergedSegment.setSizeKB(cubeSizeBytes / 1024); mergedSegment.setInputRecords(sourceCount); @@ -100,6 +104,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { mergedSegment.setLastBuildTime(System.currentTimeMillis()); mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap); mergedSegment.setStreamSourceCheckpoint(lastMergedSegment != null ? lastMergedSegment.getStreamSourceCheckpoint() : null); + mergedSegment.setEstimateRatio(cuboidEstimateRatio); if (isOffsetCube) { SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);