This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 0da41e2  KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
0da41e2 is described below

commit 0da41e20a652794d9328c819977754ec8c4f9941
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Thu Sep 3 23:19:04 2020 +0800

    KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
---
 .../kylin/engine/spark/job/CubeBuildJob.java       | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 3a44d84..bbf50e8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -77,7 +77,8 @@ public class CubeBuildJob extends SparkApplication {
     private CubeManager cubeManager;
     private CubeInstance cubeInstance;
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
-    private Map<Long, Short> cuboidShardNum = Maps.newHashMap();
+    private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
     public static void main(String[] args) {
         CubeBuildJob cubeBuildJob = new CubeBuildJob();
         cubeBuildJob.execute(args);
@@ -217,7 +218,10 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
-            long parentDSCnt = parentDS.count();
+            // record the source count of flat table
+            if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
+                cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count());
+            }
 
             for (LayoutEntity index : toBuildCuboids) {
                 Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
@@ -229,8 +233,7 @@ public class CubeBuildJob extends SparkApplication {
 
                     @Override
                     public LayoutEntity build() throws IOException {
-                        return buildCuboid(seg, index, parentDS, st, info.getLayoutId(),
-                                parentDSCnt);
+                        return buildCuboid(seg, index, parentDS, st, info.getLayoutId());
                     }
                 }, config);
                 allIndexesInCurrentLayer.add(index);
@@ -292,7 +295,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private LayoutEntity buildCuboid(SegmentInfo seg, LayoutEntity cuboid, Dataset<Row> parent,
-                                    SpanningTree spanningTree, long parentId, long parentDSCnt) throws IOException {
+                                    SpanningTree spanningTree, long parentId) throws IOException {
         String parentName = String.valueOf(parentId);
         if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
             parentName = "flat table";
@@ -308,7 +311,7 @@ public class CubeBuildJob extends SparkApplication {
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
-            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
                     spanningTree, false);
@@ -320,7 +323,7 @@ public class CubeBuildJob extends SparkApplication {
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
 
-            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         }
         ss.sparkContext().setJobDescription(null);
         logger.info("Finished Build index :{}, in segment:{}", cuboid.getId(), seg.id());
@@ -328,7 +331,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout,
-                                     long parentDSCnt) throws IOException {
+                                     long parentId) throws IOException {
         long layoutId = layout.getId();
 
         // for spark metrics
@@ -349,8 +352,11 @@ public class CubeBuildJob extends SparkApplication {
         if (rowCount == -1) {
             infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
             logger.debug("Can not get cuboid row cnt, use count() to collect cuboid rows.");
-            layout.setRows(dataset.count());
-            layout.setSourceRows(parentDSCnt);
+            long cuboidRowCnt = dataset.count();
+            layout.setRows(cuboidRowCnt);
+            // record the row count of cuboid
+            cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
+            layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));