This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 276f906  KYLIN-4747 Use the first dimension column as sort column within a partition
276f906 is described below

commit 276f906a35ce9f868b444b0a95c4d57648e07b23
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Fri Sep 4 09:28:10 2020 +0800

    KYLIN-4747 Use the first dimension column as sort column within a partition
---
 .../apache/kylin/engine/spark/job/NSparkCubingUtil.java    | 14 ++++----------
 .../org/apache/kylin/engine/spark/job/CubeBuildJob.java    |  4 ++--
 .../org/apache/kylin/engine/spark/job/CubeMergeJob.java    | 13 ++++++++-----
 .../org/apache/kylin/engine/spark/utils/Repartitioner.java |  4 ++--
 4 files changed, 16 insertions(+), 19 deletions(-)
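
For readers skimming the patch: the change swaps per-partition sorts over
every dimension/rowkey column for a sort on the leading column only. Below is
a minimal, hypothetical Spark sketch of the before/after behavior; the session
setup, column names, and paths are illustrative and not taken from Kylin.

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class FirstColumnSortSketch {
        public static void main(String[] args) {
            SparkSession ss = SparkSession.builder()
                    .appName("first-column-sort-sketch")
                    .master("local[*]") // local mode, for illustration only
                    .getOrCreate();

            // Kylin addresses cuboid columns by integer index; "1", "2", "3"
            // stand in for those index-named columns here.
            Dataset<Row> cuboid = ss.read().parquet("/tmp/cuboid"); // hypothetical input

            // Before KYLIN-4747: sort each partition by every dimension column.
            Dataset<Row> allDims = cuboid.sortWithinPartitions(
                    new Column("1"), new Column("2"), new Column("3"));

            // After KYLIN-4747: sorting by the first dimension alone is cheaper
            // and still clusters rows on the leading rowkey for the writer.
            Dataset<Row> firstDimOnly = cuboid.sortWithinPartitions(new Column("1"));

            allDims.write().mode("overwrite").parquet("/tmp/cuboid_sorted_all");
            firstDimOnly.write().mode("overwrite").parquet("/tmp/cuboid_sorted_first");
            ss.stop();
        }
    }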

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
index 5438896..6a1f9f1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
@@ -23,8 +23,8 @@ import org.apache.kylin.metadata.model.Segments;
 import org.apache.spark.sql.Column;
 import org.spark_project.guava.collect.Sets;
 
+import java.util.Collection;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -51,7 +51,7 @@ public class NSparkCubingUtil {
         return getColumns(ret);
     }
 
-    public static Column[] getColumns(Set<Integer> indices) {
+    public static Column[] getColumns(Collection<Integer> indices) {
         Column[] ret = new Column[indices.size()];
         int index = 0;
         for (Integer i : indices) {
@@ -61,14 +61,8 @@ public class NSparkCubingUtil {
         return ret;
     }
 
-    public static Column[] getColumns(List<Integer> indices) {
-        Column[] ret = new Column[indices.size()];
-        int index = 0;
-        for (Integer i : indices) {
-            ret[index] = new Column(String.valueOf(i));
-            index++;
-        }
-        return ret;
+    public static Column getFirstColumn(Collection<Integer> indices) {
+        return getColumns(indices)[0];
     }
 
     private static final Pattern DOT_PATTERN = Pattern.compile("(\\S+)\\.(\\D+)");
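
A note on the helper change above: getColumns now accepts any
Collection<Integer>, which makes the old List overload (a line-for-line
duplicate of the Set version) redundant, and getFirstColumn just returns
element zero of the resulting array. A hedged usage sketch follows; the index
values are invented, and callers must pass a non-empty, insertion-ordered
collection (e.g. LinkedHashSet), since an empty one would make getFirstColumn
throw ArrayIndexOutOfBoundsException.

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
    import org.apache.spark.sql.Column;

    public class GetFirstColumnSketch {
        public static void main(String[] args) {
            // Insertion-ordered set, so column "3" is deterministically first.
            Set<Integer> rowKeys = new LinkedHashSet<>(Arrays.asList(3, 1, 7));

            Column[] all = NSparkCubingUtil.getColumns(rowKeys);     // columns "3", "1", "7"
            Column first = NSparkCubingUtil.getFirstColumn(rowKeys); // column "3"

            System.out.println(all.length + " columns, first = " + first);
        }
    }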
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index bbf50e8..954434d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -310,7 +310,7 @@ public class CubeBuildJob extends SparkApplication {
             ss.sparkContext().setJobDescription("build " + layoutEntity.getId() + " from parent " + parentName);
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
-                    .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
+                    .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(orderedDims));
             saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
@@ -321,7 +321,7 @@ public class CubeBuildJob extends SparkApplication {
 
             Dataset<Row> afterSort = afterAgg
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
-                    .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
+                    .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(rowKeys));
 
             saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 3d54492..2772118 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.job;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kylin.cube.CubeInstance;
@@ -91,17 +92,19 @@ public class CubeMergeJob extends SparkApplication {
 
             Dataset<Row> afterSort;
             if (layout.isTableIndex()) {
-                afterSort = afterMerge.sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()));
+                afterSort =
+                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet()));
             } else {
-                Column[] dimsCols = NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet());
-                Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, layout.getOrderedDimensions().keySet(),
+                Set<Integer> dimColumns = layout.getOrderedDimensions().keySet();
+                Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, dimColumns,
                         layout.getOrderedMeasures(), spanningTree, false);
-                afterSort = afterAgg.sortWithinPartitions(dimsCols);
+                afterSort = afterAgg.sortWithinPartitions(
+                        NSparkCubingUtil.getFirstColumn(dimColumns));
             }
             buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                 @Override
                 public String getName() {
-                    return "merge-layout-" + layout.getId();
+                    return "merge-cuboid-" + layout.getId();
                 }
 
                 @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index efaa7d0..453704e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -153,12 +153,12 @@ public class Repartitioner {
                 //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
                 data = storage.getFrom(tempPath, ss).repartition(repartitionNum,
                         NSparkCubingUtil.getColumns(getShardByColumns()))
-                        .sortWithinPartitions(sortCols);
+                        .sortWithinPartitions(sortCols[0]);
             } else {
                 // repartition for single file size is too small
                 logger.info("repartition to {}", repartitionNum);
                 data = storage.getFrom(tempPath, ss).repartition(repartitionNum)
-                        .sortWithinPartitions(sortCols);
+                        .sortWithinPartitions(sortCols[0]);
             }
 
             storage.saveTo(path, data, ss);
