This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 276f906 KYLIN-4747 Use the first dimension column as sort column within a partition 276f906 is described below commit 276f906a35ce9f868b444b0a95c4d57648e07b23 Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Fri Sep 4 09:28:10 2020 +0800 KYLIN-4747 Use the first dimension column as sort column within a partition --- .../apache/kylin/engine/spark/job/NSparkCubingUtil.java | 14 ++++---------- .../org/apache/kylin/engine/spark/job/CubeBuildJob.java | 4 ++-- .../org/apache/kylin/engine/spark/job/CubeMergeJob.java | 13 ++++++++----- .../org/apache/kylin/engine/spark/utils/Repartitioner.java | 4 ++-- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java index 5438896..6a1f9f1 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java @@ -23,8 +23,8 @@ import org.apache.kylin.metadata.model.Segments; import org.apache.spark.sql.Column; import org.spark_project.guava.collect.Sets; +import java.util.Collection; import java.util.LinkedHashSet; -import java.util.List; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -51,7 +51,7 @@ public class NSparkCubingUtil { return getColumns(ret); } - public static Column[] getColumns(Set<Integer> indices) { + public static Column[] getColumns(Collection<Integer> indices) { Column[] ret = new Column[indices.size()]; int index = 0; for (Integer i : indices) { @@ -61,14 +61,8 @@ public class NSparkCubingUtil { return ret; } - public static Column[] getColumns(List<Integer> indices) { - Column[] ret = new 
Column[indices.size()]; - int index = 0; - for (Integer i : indices) { - ret[index] = new Column(String.valueOf(i)); - index++; - } - return ret; + public static Column getFirstColumn(Collection<Integer> indices) { + return getColumns(indices)[0]; } private static final Pattern DOT_PATTERN = Pattern.compile("(\\S+)\\.(\\D+)"); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java index bbf50e8..954434d 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java @@ -310,7 +310,7 @@ public class CubeBuildJob extends SparkApplication { ss.sparkContext().setJobDescription("build " + layoutEntity.getId() + " from parent " + parentName); Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet(); Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims)) - .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims)); + .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(orderedDims)); saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId); } else { Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(), @@ -321,7 +321,7 @@ public class CubeBuildJob extends SparkApplication { Dataset<Row> afterSort = afterAgg .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet())) - .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys)); + .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(rowKeys)); saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId); } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java index 3d54492..2772118 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java @@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.job; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.kylin.cube.CubeInstance; @@ -91,17 +92,19 @@ public class CubeMergeJob extends SparkApplication { Dataset<Row> afterSort; if (layout.isTableIndex()) { - afterSort = afterMerge.sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet())); + afterSort = + afterMerge.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet())); } else { - Column[] dimsCols = NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()); - Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, layout.getOrderedDimensions().keySet(), + Set<Integer> dimColumns = layout.getOrderedDimensions().keySet(); + Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, dimColumns, layout.getOrderedMeasures(), spanningTree, false); - afterSort = afterAgg.sortWithinPartitions(dimsCols); + afterSort = afterAgg.sortWithinPartitions( + NSparkCubingUtil.getFirstColumn(dimColumns)); } buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() { @Override public String getName() { - return "merge-layout-" + layout.getId(); + return "merge-cuboid-" + layout.getId(); } @Override diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java index efaa7d0..453704e 100644 --- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java @@ -153,12 +153,12 @@ public class Repartitioner { //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false"); data = storage.getFrom(tempPath, ss).repartition(repartitionNum, NSparkCubingUtil.getColumns(getShardByColumns())) - .sortWithinPartitions(sortCols); + .sortWithinPartitions(sortCols[0]); } else { // repartition for single file size is too small logger.info("repartition to {}", repartitionNum); data = storage.getFrom(tempPath, ss).repartition(repartitionNum) - .sortWithinPartitions(sortCols); + .sortWithinPartitions(sortCols[0]); } storage.saveTo(path, data, ss);