This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5d1ed563bb1c9cf1dd8185335f8ed14cce6a19ac
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Dec 9 13:58:32 2022 +0800

    KYLIN-5441 fix reset spark.sql.shuffle.partitions
    
    * KE-39271 fix reset spark.sql.shuffle.partitions
---
 .../java/org/apache/kylin/common/QueryContext.java |  3 +
 .../datasource/ResetShufflePartition.scala         | 13 +++-
 .../datasource/ResetShufflePartitionSuite.scala    | 82 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 0c717c11bc..551b66d90f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -79,6 +79,9 @@ public class QueryContext implements Closeable {
     private int shufflePartitions;
     @Getter
     @Setter
+    private int shufflePartitionsReset;
+    @Getter
+    @Setter
     // Spark execution ID
     private String executionID = "";
     @Getter
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index 24c6c9cf56..04cc4804f5 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasource
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
 
 trait ResetShufflePartition extends Logging {
 
@@ -35,7 +36,15 @@ trait ResetShufflePartition extends Logging {
        KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024) + 1,
         defaultParallelism).toInt
     }
-    sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitionsNum.toString)
-    logInfo(s"Set partition to $partitionsNum, total bytes ${QueryContext.current().getMetrics.getSourceScanBytes}")
+    val originPartitionsNum = QueryContext.current().getShufflePartitionsReset
+    if (partitionsNum > originPartitionsNum) {
+      sparkSession.sessionState.conf.setLocalProperty(SQLConf.SHUFFLE_PARTITIONS.key, partitionsNum.toString)
+      QueryContext.current().setShufflePartitionsReset(partitionsNum)
+      logInfo(s"Set partition from $originPartitionsNum to $partitionsNum, " +
+        s"total bytes ${QueryContext.current().getMetrics.getSourceScanBytes}")
+    } else {
+      logInfo(s"Origin partition is $originPartitionsNum, new partition is 
$partitionsNum, total bytes " +
+        s"${QueryContext.current().getMetrics.getSourceScanBytes}, will not 
reset the ${SQLConf.SHUFFLE_PARTITIONS.key}")
+    }
   }
 }
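
For context, a standalone sketch (not part of the patch) of the guard this hunk adds: within one query, the session-local spark.sql.shuffle.partitions value is only ever raised, never lowered again by a later, smaller scan. The sketch stands in a plain var for the new QueryContext.shufflePartitionsReset field and uses the public SparkSession.conf API instead of the session-local setter used above; the names ShufflePartitionGuardSketch and maybeRaise are illustrative only.

// Sketch only: mimics the "only grow" behaviour of the patched setShufflePartitions.
import org.apache.spark.sql.SparkSession

object ShufflePartitionGuardSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    // Stand-in for QueryContext.current().getShufflePartitionsReset
    var lastReset = 0

    def maybeRaise(partitionsNum: Int): Unit = {
      if (partitionsNum > lastReset) {
        // Raise the setting only when the newly computed value is larger.
        spark.conf.set("spark.sql.shuffle.partitions", partitionsNum.toString)
        lastReset = partitionsNum
      }
    }

    maybeRaise(6) // raised: 6 > 0
    maybeRaise(3) // ignored: 3 <= 6, the larger value is kept
    println(spark.conf.get("spark.sql.shuffle.partitions")) // prints 6
    spark.stop()
  }
}
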
diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartitionSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartitionSuite.scala
new file mode 100644
index 0000000000..3477c43b52
--- /dev/null
+++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartitionSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasource
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.common.{LocalMetadata, SparderBaseFunSuite}
+import org.apache.spark.sql.internal.SQLConf
+
+class ResetShufflePartitionSuite extends SparderBaseFunSuite with LocalMetadata {
+
+  private val testResetShufflePartition = new ResetShufflePartition {}
+
+  test("KE-39271: test shuffle partition reset without 
kylin.query.engine.spark-sql-shuffle-partitions") {
+    overwriteSystemProp("kylin.storage.columnar.partition-split-size-mb", "1")
+    val workerThread = 10
+    val sparkSession = SparkSession.builder()
+      .master(s"local[$workerThread]")
+      .config("spark.sql.shuffle.partitions", workerThread.toString)
+      .getOrCreate()
+
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == workerThread)
+
+    val sourceRows = 0
+    var totalFileSize = 1024 * 1024 * 5
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 6)
+
+    totalFileSize = 1024 * 1024 * 6
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 7)
+
+    totalFileSize = 1024 * 1024 * 3
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 7)
+
+    totalFileSize = 1024 * 1024 * 12
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == workerThread)
+  }
+
+  test("KE-39271: test shuffle partition reset with 
kylin.query.engine.spark-sql-shuffle-partitions") {
+    overwriteSystemProp("kylin.storage.columnar.partition-split-size-mb", "1")
+    overwriteSystemProp("kylin.query.engine.spark-sql-shuffle-partitions", 
"100")
+
+    val workerThread = 10
+    val sparkSession = SparkSession.builder()
+      .master(s"local[$workerThread]")
+      .config("spark.sql.shuffle.partitions", workerThread.toString)
+      .getOrCreate()
+
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == workerThread)
+
+    val sourceRows = 0
+    var totalFileSize = 1024 * 1024 * 5
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 100)
+
+    totalFileSize = 1024 * 1024 * 15
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 100)
+
+    totalFileSize = 1024 * 1024 * 105
+    testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession)
+    assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 100)
+  }
+}
