This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 291ff35722 [GLUTEN-8018][VL] Support adjusting stage resource profile
dynamically (#8209)
291ff35722 is described below
commit 291ff35722430ebcab98f957a3daa0b787d111d6
Author: Terry Wang <[email protected]>
AuthorDate: Fri Jan 24 19:35:54 2025 +0800
[GLUTEN-8018][VL] Support adjusting stage resource profile dynamically
(#8209)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../AutoAdjustStageResourceProfileSuite.scala | 163 ++++++++++++++++++++
.../GlutenAutoAdjustStageResourceProfile.scala | 167 +++++++++++++++++++++
.../org/apache/gluten/config/GlutenConfig.scala | 29 ++++
4 files changed, 361 insertions(+)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 14227dd7d0..acf50383e2 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -113,6 +113,7 @@ object VeloxRuleApi {
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
+ injector.injectFinal(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}
@@ -187,6 +188,7 @@ object VeloxRuleApi {
injector.injectPostTransform(c =>
ColumnarCollapseTransformStages(c.glutenConf))
injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c =>
RemoveGlutenTableCacheColumnarToRow(c.session))
+ injector.injectPostTransform(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf,
c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala
new file mode 100644
index 0000000000..a6b25e5d64
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.{ApplyResourceProfileExec,
ColumnarShuffleExchangeExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+@Experimental
+class AutoAdjustStageResourceProfileSuite
+ extends VeloxWholeStageTransformerSuite
+ with AdaptiveSparkPlanHelper {
+ protected val rootPath: String = getClass.getResource("/").getPath
+ override protected val resourcePath: String = "/tpch-data-parquet"
+ override protected val fileFormat: String = "parquet"
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.memory.offHeap.size", "2g")
+ .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.gluten.auto.adjustStageResource.enabled", "true")
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ spark
+ .range(100)
+ .selectExpr("cast(id % 3 as int) as c1", "id as c2")
+ .write
+ .format("parquet")
+ .saveAsTable("tmp1")
+ spark
+ .range(100)
+ .selectExpr("cast(id % 9 as int) as c1")
+ .write
+ .format("parquet")
+ .saveAsTable("tmp2")
+ spark
+ .range(100)
+ .selectExpr("cast(id % 3 as int) as c1", "cast(id % 9 as int) as c2")
+ .write
+ .format("parquet")
+ .saveAsTable("tmp3")
+ }
+
+ override protected def afterAll(): Unit = {
+ spark.sql("drop table tmp1")
+ spark.sql("drop table tmp2")
+ spark.sql("drop table tmp3")
+
+ super.afterAll()
+ }
+
+ private def collectColumnarToRow(plan: SparkPlan): Int = {
+ collect(plan) { case v: VeloxColumnarToRowExec => v }.size
+ }
+
+ private def collectColumnarShuffleExchange(plan: SparkPlan): Int = {
+ collect(plan) { case c: ColumnarShuffleExchangeExec => c }.size
+ }
+
+ private def collectShuffleExchange(plan: SparkPlan): Int = {
+ collect(plan) { case c: ShuffleExchangeExec => c }.size
+ }
+
+ private def collectApplyResourceProfileExec(plan: SparkPlan): Int = {
+ collect(plan) { case c: ApplyResourceProfileExec => c }.size
+ }
+
+ test("stage contains fallback nodes and apply new resource profile") {
+ withSQLConf(
+ GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+ GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD.key
-> "0.1") {
+ runQueryAndCompare("select c1, count(*) from tmp1 group by c1") {
+ df =>
+ val plan = df.queryExecution.executedPlan
+ // scalastyle:off
+ // format: off
+ /*
+ VeloxColumnarToRow
+ +- ^(7) HashAggregateTransformer(keys=[c1#22],
functions=[count(1)], isStreamingAgg=false, output=[c1#22, count(1)#33L])
+ +- ^(7) InputIteratorTransformer[c1#22, count#37L]
+ +- RowToVeloxColumnar
+ +- AQEShuffleRead coalesced
+ +- ShuffleQueryStage 0
+ +- Exchange hashpartitioning(c1#22, 5),
ENSURE_REQUIREMENTS, [plan_id=615]
+ +- ApplyResourceProfile Profile: id = 0,
executor resources: cores -> name: cores, amount: 1, script: , vendor: ,memory
-> name: memory, amount: 1024, script: , vendor: ,offHeap -> name: offHeap,
amount: 2048, script: , vendor: , task resources: cpus -> name: cpus, amount:
1.0
+ +- VeloxColumnarToRow
+ +- ^(6)
FlushableHashAggregateTransformer(keys=[c1#22], functions=[partial_count(1)],
isStreamingAgg=false, output=[c1#22, count#37L])
+ +- ^(6) FileScanTransformer parquet
default.tmp1[c1#22] Batched: true, DataFilters: [],
+ */
+ // format: on
+ // scalastyle:on
+ assert(collectColumnarShuffleExchange(plan) == 0)
+ assert(collectShuffleExchange(plan) == 1)
+
+ val wholeQueryColumnarToRow = collectColumnarToRow(plan)
+ assert(wholeQueryColumnarToRow == 2)
+
+ val applyResourceProfileExec = collectApplyResourceProfileExec(plan)
+            // Here we can't check the applied resource profile, since
+            // ResourceProfiles are only supported on YARN and Kubernetes
+            // with dynamic allocation enabled. In testing mode, we apply
+            // the default resource profile to make sure the unit test works.
+ assert(applyResourceProfileExec == 1)
+ }
+ }
+ }
+
+ test("Apply new resource profile when whole stage fallback") {
+ withSQLConf(
+ GlutenConfig.COLUMNAR_FALLBACK_PREFER_COLUMNAR.key -> "false",
+ GlutenConfig.COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR.key -> "false",
+ GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
+ GlutenConfig.RAS_ENABLED.key -> "false"
+ ) {
+ runQueryAndCompare(
+ "select " +
+ "java_method('java.lang.Integer', 'signum', tmp1.c1), count(*) " +
+ "from tmp1 group by java_method('java.lang.Integer', 'signum',
tmp1.c1)") {
+ // scalastyle:off
+ // format: off
+ /*
+ DeserializeToObject createexternalrow(java_method(java.lang.Integer,
signum, c1)#35.toString, count(1)#36L,
StructField(java_method(java.lang.Integer, signum, c1),StringType,true),
StructField(count(1),LongType,false)), obj#42: org.apache.spark.sql.Row
+ +- *(3) HashAggregate(keys=[_nondeterministic#37],
functions=[count(1)], output=[java_method(java.lang.Integer, signum, c1)#35,
count(1)#36L])
+ +- AQEShuffleRead coalesced
+ +- ShuffleQueryStage 0
+ +- Exchange hashpartitioning(_nondeterministic#37, 5),
ENSURE_REQUIREMENTS, [plan_id=607]
+ +- ApplyResourceProfile Profile: id = 0, executor
resources: cores -> name: cores, amount: 1, script: , vendor: ,memory -> name:
memory, amount: 1024, script: , vendor: ,offHeap -> name: offHeap, amount:
2048, script: , vendor: , task resources: cpus -> name: cpus, amount: 1.0
+ +- *(2) HashAggregate(keys=[_nondeterministic#37],
functions=[partial_count(1)], output=[_nondeterministic#37, count#41L])
+ +- Project [java_method(java.lang.Integer, signum,
c1#22) AS _nondeterministic#37]
+ +- *(1) ColumnarToRow
+ +- FileScan parquet default.tmp1[c1#22]
Batched: true, DataFilters: [], Format: Parquet
+ */
+ // format: on
+ // scalastyle:on
+ df =>
assert(collectApplyResourceProfileExec(df.queryExecution.executedPlan) == 1)
+ }
+ }
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala
new file mode 100644
index 0000000000..1e9197b610
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
+import org.apache.gluten.logging.LogLevelUtil
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile,
ResourceProfileManager, TaskResourceRequest}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import
org.apache.spark.sql.execution.GlutenAutoAdjustStageResourceProfile.{applyNewResourceProfileIfPossible,
collectStagePlan}
+import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec,
ExecutedCommandExec}
+import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.SparkTestUtil
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * This rule is used to dynamically adjust the stage resource profile for the following
purposes:
+ *   1. Swap off-heap and on-heap memory sizes when a whole-stage fallback happens.
+ *   2. Increase executor heap memory if the stage contains both Gluten operators and
Spark operators at the same time.
+ *
+ * Note: we don't support setting a resource profile for the final stage now. Todo:
support setting a resource profile for the final stage.
+ */
+@Experimental
+case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig,
spark: SparkSession)
+ extends Rule[SparkPlan]
+ with LogLevelUtil {
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (!glutenConf.enableAutoAdjustStageResourceProfile) {
+ return plan
+ }
+ if (!SQLConf.get.adaptiveExecutionEnabled) {
+ return plan
+ }
+ if (!plan.isInstanceOf[Exchange]) {
+ // todo: support set resource profile for final stage
+ return plan
+ }
+ val planNodes = collectStagePlan(plan)
+ if (planNodes.isEmpty) {
+ return plan
+ }
+ log.info(s"detailPlanNodes ${planNodes.map(_.nodeName).mkString("Array(",
", ", ")")}")
+
+    // A stage is considered fallen back if no node is a GlutenPlan,
+    // or every GlutenPlan node is a C2R node.
+ val wholeStageFallback = planNodes
+ .filter(_.isInstanceOf[GlutenPlan])
+ .count(!_.isInstanceOf[ColumnarToRowExecBase]) == 0
+
+ val rpManager = spark.sparkContext.resourceProfileManager
+ val defaultRP = rpManager.defaultResourceProfile
+
+ // initial resource profile config as default resource profile
+ val taskResource = mutable.Map.empty[String, TaskResourceRequest] ++=
defaultRP.taskResources
+ val executorResource =
+ mutable.Map.empty[String, ExecutorResourceRequest] ++=
defaultRP.executorResources
+ val memoryRequest = executorResource.get(ResourceProfile.MEMORY)
+ val offheapRequest = executorResource.get(ResourceProfile.OFFHEAP_MEM)
+ logInfo(s"default memory request $memoryRequest")
+ logInfo(s"default offheap request $offheapRequest")
+
+    // Case 1: the whole stage falls back to vanilla Spark; in such a case we increase
the heap memory.
+ if (wholeStageFallback) {
+ val newMemoryAmount = memoryRequest.get.amount *
glutenConf.autoAdjustStageRPHeapRatio
+ val newExecutorMemory =
+ new ExecutorResourceRequest(ResourceProfile.MEMORY,
newMemoryAmount.toLong)
+ executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
+ val newRP = new ResourceProfile(executorResource.toMap,
taskResource.toMap)
+ return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
+ }
+
+    // Case 2: check whether any fallback exists and decide whether to increase the heap
memory.
+ val fallenNodeCnt = planNodes.count(p => !p.isInstanceOf[GlutenPlan])
+ val totalCount = planNodes.size
+
+ if (1.0 * fallenNodeCnt / totalCount >=
glutenConf.autoAdjustStageFallenNodeThreshold) {
+ val newMemoryAmount = memoryRequest.get.amount *
glutenConf.autoAdjustStageRPHeapRatio;
+ val newExecutorMemory =
+ new ExecutorResourceRequest(ResourceProfile.MEMORY,
newMemoryAmount.toLong)
+ executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
+ val newRP = new ResourceProfile(executorResource.toMap,
taskResource.toMap)
+ return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
+ }
+ plan
+ }
+}
+
+object GlutenAutoAdjustStageResourceProfile extends Logging {
+  // Collect all plan nodes belonging to this stage, including child query stages,
+  // but excluding the children of those query stages.
+ def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = {
+
+ def collectStagePlan(plan: SparkPlan, planNodes: ArrayBuffer[SparkPlan]):
Unit = {
+ if (plan.isInstanceOf[DataWritingCommandExec] ||
plan.isInstanceOf[ExecutedCommandExec]) {
+ // todo: support set final stage's resource profile
+ return
+ }
+ planNodes += plan
+ if (plan.isInstanceOf[QueryStageExec]) {
+ return
+ }
+ plan.children.foreach(collectStagePlan(_, planNodes))
+ }
+
+ val planNodes = new ArrayBuffer[SparkPlan]()
+ collectStagePlan(plan, planNodes)
+ planNodes
+ }
+
+ private def getFinalResourceProfile(
+ rpManager: ResourceProfileManager,
+ newRP: ResourceProfile): ResourceProfile = {
+ // Just for test
+ // ResourceProfiles are only supported on YARN and Kubernetes with dynamic
allocation enabled
+ if (SparkTestUtil.isTesting) {
+ return rpManager.defaultResourceProfile
+ }
+ val maybeEqProfile = rpManager.getEquivalentProfile(newRP)
+ if (maybeEqProfile.isDefined) {
+ maybeEqProfile.get
+ } else {
+ // register new resource profile here
+ rpManager.addResourceProfile(newRP)
+ newRP
+ }
+ }
+
+ def applyNewResourceProfileIfPossible(
+ plan: SparkPlan,
+ rp: ResourceProfile,
+ rpManager: ResourceProfileManager): SparkPlan = {
+ val finalRP = getFinalResourceProfile(rpManager, rp)
+ if (plan.isInstanceOf[Exchange]) {
+ // Wrap the plan with ApplyResourceProfileExec so that we can apply new
ResourceProfile
+ val wrapperPlan = ApplyResourceProfileExec(plan.children.head, finalRP)
+ logInfo(s"Apply resource profile $finalRP for plan
${wrapperPlan.nodeName}")
+ plan.withNewChildren(IndexedSeq(wrapperPlan))
+ } else {
+ logInfo(s"Ignore apply resource profile for plan ${plan.nodeName}")
+ // todo: support set final stage's resource profile
+ plan
+ }
+ }
+}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 7de5f92420..0c5d39ec76 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -496,6 +496,13 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def parquetEncryptionValidationEnabled: Boolean =
getConf(ENCRYPTED_PARQUET_FALLBACK_ENABLED)
+ def enableAutoAdjustStageResourceProfile: Boolean =
+ getConf(AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED)
+
+ def autoAdjustStageRPHeapRatio: Double =
getConf(AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO)
+
+ def autoAdjustStageFallenNodeThreshold: Double =
+ getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD)
}
object GlutenConfig {
@@ -2274,4 +2281,26 @@ object GlutenConfig {
.stringConf
.createWithDefault("")
+ val AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED =
+ buildStaticConf("spark.gluten.auto.adjustStageResource.enabled")
+ .internal()
+ .doc("Experimental: If enabled, gluten will try to set the stage
resource according " +
+ "to stage execution plan. Only worked when aqe is enabled at the same
time!!")
+ .booleanConf
+ .createWithDefault(false)
+
+ val AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO =
+ buildConf("spark.gluten.auto.adjustStageResources.heap.ratio")
+ .internal()
+ .doc("Experimental: Increase executor heap memory when match adjust
stage resource rule.")
+ .doubleConf
+ .createWithDefault(2.0d)
+
+ val AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD =
+
buildConf("spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold")
+ .internal()
+ .doc("Experimental: Increase executor heap memory when stage contains
fallen node " +
+ "count exceeds the total node count ratio.")
+ .doubleConf
+ .createWithDefault(0.5d)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]