This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b428f1e  KYLIN-4893 Optimize query performance when using shard by column
b428f1e is described below

commit b428f1e1ab6a48101bec03e515d135c57af32878
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Wed Feb 3 15:20:01 2021 +0800

    KYLIN-4893 Optimize query performance when using shard by column
    
    (cherry picked from commit bd5ab5e61ca4dc0c5ccabb66b17b4be1642ce13d)
    (cherry picked from commit 8fa9d8d210b2755325999ed3e7496a320e3bd7f9)
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 24 +++++++++++---
 .../sql/execution/datasource/FilePruner.scala      | 38 +++++++++++++++-------
 .../datasource/ResetShufflePartition.scala         |  6 ++--
 3 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4941595..0fd24e2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2883,22 +2883,36 @@ public abstract class KylinConfigBase implements Serializable {
 
     private String getLogPropertyFile(String filename) {
         if (isDevEnv()) {
-            return Paths.get(getKylinHomeWithoutWarn(), "build", 
"conf").toString() + File.separator + filename;
+            return Paths.get(getKylinHomeWithoutWarn(),
+                    "build", "conf").toString() + File.separator + filename;
         } else {
-            return Paths.get(getKylinHomeWithoutWarn(), "conf").toString() + 
File.separator + filename;
+            return Paths.get(getKylinHomeWithoutWarn(),
+                    "conf").toString() + File.separator + filename;
         }
     }
 
     public int getQueryPartitionSplitSizeMB() {
-        return 
Integer.parseInt(getOptional("kylin.query.spark-engine.partition-split-size-mb",
 "64"));
+        return 
Integer.parseInt(getOptional("kylin.query.spark-engine.partition-split-size-mb",
+                "64"));
+    }
+
+    /**
+     * The max size in mb handled per task when using shard by column,
+     * if the sharding size exceeds this value, it will fall back to 
non-sharding read RDD
+     */
+    public int getMaxShardingSizeMBPerTask() {
+        return 
Integer.parseInt(getOptional("kylin.query.spark-engine.max-sharding-size-mb",
+                "64"));
     }
 
     public boolean isShardingJoinOptEnabled() {
-        return 
Boolean.parseBoolean(getOptional("kylin.query.spark-engine.expose-sharding-trait",
 "true"));
+        return 
Boolean.parseBoolean(getOptional("kylin.query.spark-engine.expose-sharding-trait",
+                "true"));
     }
 
     public int getSparkSqlShufflePartitions() {
-        return 
Integer.parseInt(getOptional("kylin.query.spark-engine.spark-sql-shuffle-partitions",
 "-1"));
+        return 
Integer.parseInt(getOptional("kylin.query.spark-engine.spark-sql-shuffle-partitions",
+                "-1"));
     }
 
     public Map<String, String> getQuerySparkConf() {
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index f0f7916..513b47f 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasource
 import java.sql.{Date, Timestamp}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.cube.cuboid.Cuboid
 import org.apache.kylin.cube.CubeInstance
@@ -169,28 +170,39 @@ class FilePruner(cubeInstance: CubeInstance,
   }
 
   private def genShardSpec(selected: Seq[SegmentDirectory]): Option[ShardSpec] 
= {
-    if (selected.isEmpty) {
+    if (!KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled || 
selected.isEmpty) {
       None
     } else {
       val segments = selected.par.map { segDir =>
         cubeInstance.getSegment(segDir.segmentName, SegmentStatusEnum.READY);
-      }.toIterator.toSeq
+      }.seq
       val shardNum = segments.head.getCuboidShardNum(layoutEntity.getId).toInt
 
       // the shard num of all layout in segments must be the same
       if (layoutEntity.getShardByColumns.isEmpty || segments.exists(
         _.getCuboidShardNum(layoutEntity.getId).toInt != shardNum)) {
-        logInfo("Shard by column is empty or segments have the different 
number of shard, skip " +
-          "shard join.")
+        logInfo("Shard by column is empty or segments have the different 
number of shard, " +
+          "skip shard join.")
         None
       } else {
-        val sortColumns = if (segments.length == 1) {
-          
layoutEntity.getOrderedDimensions.keySet().asScala.map(_.toString).toSeq
+        // calculate the file size for each partition
+        val partitionSizePerId = selected.flatMap(_.files).map( f =>
+          (FilePruner.getPartitionId(f.getPath), f.getLen)
+        ).groupBy(_._1).mapValues(_.map(_._2).sum)
+        // if there are some partition ids which the file size exceeds the 
threshold
+        if (partitionSizePerId.exists(_._2 > 
FilePruner.MAX_SHARDING_SIZE_PER_TASK)) {
+          logInfo(s"There are some partition ids which the file size exceeds 
the " +
+            s"threshold size ${FilePruner.MAX_SHARDING_SIZE_PER_TASK}, skip 
shard join.")
+          None
         } else {
-          logInfo("Sort order will lost in multiple segments.")
-          Seq.empty[String]
+          val sortColumns = if (segments.length == 1) {
+            
layoutEntity.getOrderedDimensions.keySet().asScala.map(_.toString).toSeq
+          } else {
+            logInfo("Sort order will lost in multiple segments.")
+            Seq.empty[String]
+          }
+          Some(ShardSpec(shardNum, shardBySchema.fieldNames.toSeq, 
sortColumns))
         }
-        Some(ShardSpec(shardNum, shardBySchema.fieldNames.toSeq, sortColumns))
       }
     }
   }
@@ -245,8 +257,8 @@ class FilePruner(cubeInstance: CubeInstance,
     // generate the ShardSpec
     shardSpec = genShardSpec(selected)
     //    QueryContextFacade.current().record("shard_pruning")
-    val totalFileSize = selected.flatMap(partition => 
partition.files).map(_.getLen).sum
-    logInfo(s"totalFileSize is ${totalFileSize}")
+    val totalFileSize = selected.flatMap(_.files).map(_.getLen).sum
+    logInfo(s"After files pruning, total file size is ${totalFileSize}")
     setShufflePartitions(totalFileSize, session)
     logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble / 
1000000} ms")
     if (selected.isEmpty) {
@@ -434,6 +446,10 @@ class FilePruner(cubeInstance: CubeInstance,
 }
 
 object FilePruner {
+
+  val MAX_SHARDING_SIZE_PER_TASK: Long = KylinConfig.getInstanceFromEnv
+    .getMaxShardingSizeMBPerTask * 1024 * 1024
+
   def getPartitionId(p: Path): Int = {
     // path like: 
part-00001-91f13932-3d5e-4f85-9a56-d1e2b47d0ccb-c000.snappy.parquet
     // we need to get 00001.
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index b048283..1549c45 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.utils.SparderUtils
 
 trait ResetShufflePartition extends Logging {
-  val PARTITION_SPLIT_BYTES: Long = 
KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 
64MB
+  val PARTITION_SPLIT_BYTES: Long =
+    KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 
// 64MB
 
   def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
     QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
@@ -37,6 +38,7 @@ trait ResetShufflePartition extends Logging {
     }
     // when hitting cube, this will override the value of 
'spark.sql.shuffle.partitions'
     sparkSession.conf.set("spark.sql.shuffle.partitions", 
partitionsNum.toString)
-    logInfo(s"Set partition to $partitionsNum, total bytes 
${QueryContextFacade.current().getSourceScanBytes}")
+    logInfo(s"Set partition to $partitionsNum, " +
+      s"total bytes ${QueryContextFacade.current().getSourceScanBytes}")
   }
 }

Reply via email to