This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5d1ed563bb1c9cf1dd8185335f8ed14cce6a19ac Author: Yaguang Jia <jiayagu...@foxmail.com> AuthorDate: Fri Dec 9 13:58:32 2022 +0800 KYLIN-5441 fix reset spark.sql.shuffle.partitions * KE-39271 fix reset spark.sql.shuffle.partitions --- .../java/org/apache/kylin/common/QueryContext.java | 3 + .../datasource/ResetShufflePartition.scala | 13 +++- .../datasource/ResetShufflePartitionSuite.scala | 82 ++++++++++++++++++++++ 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 0c717c11bc..551b66d90f 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -79,6 +79,9 @@ public class QueryContext implements Closeable { private int shufflePartitions; @Getter @Setter + private int shufflePartitionsReset; + @Getter + @Setter // Spark execution ID private String executionID = ""; @Getter diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala index 24c6c9cf56..04cc4804f5 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasource import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf trait ResetShufflePartition extends Logging { @@ -35,7 +36,15 @@ trait ResetShufflePartition extends Logging { KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024) + 1, defaultParallelism).toInt } - sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitionsNum.toString) - logInfo(s"Set partition to $partitionsNum, total bytes ${QueryContext.current().getMetrics.getSourceScanBytes}") + val originPartitionsNum = QueryContext.current().getShufflePartitionsReset + if (partitionsNum > originPartitionsNum) { + sparkSession.sessionState.conf.setLocalProperty(SQLConf.SHUFFLE_PARTITIONS.key, partitionsNum.toString) + QueryContext.current().setShufflePartitionsReset(partitionsNum) + logInfo(s"Set partition from $originPartitionsNum to $partitionsNum, " + + s"total bytes ${QueryContext.current().getMetrics.getSourceScanBytes}") + } else { + logInfo(s"Origin partition is $originPartitionsNum, new partition is $partitionsNum, total bytes " + + s"${QueryContext.current().getMetrics.getSourceScanBytes}, will not reset the ${SQLConf.SHUFFLE_PARTITIONS.key}") + } } } diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartitionSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartitionSuite.scala new file mode 100644 index 0000000000..3477c43b52 --- /dev/null +++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartitionSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasource + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.common.{LocalMetadata, SparderBaseFunSuite} +import org.apache.spark.sql.internal.SQLConf + +class ResetShufflePartitionSuite extends SparderBaseFunSuite with LocalMetadata { + + private val testResetShufflePartition = new ResetShufflePartition {} + + test("KE-39271: test shuffle partition reset without kylin.query.engine.spark-sql-shuffle-partitions") { + overwriteSystemProp("kylin.storage.columnar.partition-split-size-mb", "1") + val workerThread = 10 + val sparkSession = SparkSession.builder() + .master(s"local[$workerThread]") + .config("spark.sql.shuffle.partitions", workerThread.toString) + .getOrCreate() + + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == workerThread) + + val sourceRows = 0 + var totalFileSize = 1024 * 1024 * 5 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 6) + + totalFileSize = 1024 * 1024 * 6 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 7) + + totalFileSize = 1024 * 1024 * 3 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 7) + + totalFileSize = 1024 * 1024 * 12 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == workerThread) + } + + test("KE-39271: test shuffle partition reset with kylin.query.engine.spark-sql-shuffle-partitions") { + overwriteSystemProp("kylin.storage.columnar.partition-split-size-mb", "1") + overwriteSystemProp("kylin.query.engine.spark-sql-shuffle-partitions", "100") + + val workerThread = 10 + val sparkSession = SparkSession.builder() + .master(s"local[$workerThread]") + .config("spark.sql.shuffle.partitions", workerThread.toString) + .getOrCreate() + + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == workerThread) + + val sourceRows = 0 + var totalFileSize = 1024 * 1024 * 5 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 100) + + totalFileSize = 1024 * 1024 * 15 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 100) + + totalFileSize = 1024 * 1024 * 105 + testResetShufflePartition.setShufflePartitions(totalFileSize, sourceRows, sparkSession) + assert(sparkSession.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 100) + } +}