This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 291ff35722 [GLUTEN-8018][VL] Support adjusting stage resource profile 
dynamically (#8209)
291ff35722 is described below

commit 291ff35722430ebcab98f957a3daa0b787d111d6
Author: Terry Wang <[email protected]>
AuthorDate: Fri Jan 24 19:35:54 2025 +0800

    [GLUTEN-8018][VL] Support adjusting stage resource profile dynamically 
(#8209)
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   2 +
 .../AutoAdjustStageResourceProfileSuite.scala      | 163 ++++++++++++++++++++
 .../GlutenAutoAdjustStageResourceProfile.scala     | 167 +++++++++++++++++++++
 .../org/apache/gluten/config/GlutenConfig.scala    |  29 ++++
 4 files changed, 361 insertions(+)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 14227dd7d0..acf50383e2 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -113,6 +113,7 @@ object VeloxRuleApi {
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
+    injector.injectFinal(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
     injector.injectFinal(_ => RemoveFallbackTagRule())
   }
@@ -187,6 +188,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
ColumnarCollapseTransformStages(c.glutenConf))
     injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
     injector.injectPostTransform(c => 
RemoveGlutenTableCacheColumnarToRow(c.session))
+    injector.injectPostTransform(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, 
c.session))
     injector.injectPostTransform(_ => RemoveFallbackTagRule())
   }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala
new file mode 100644
index 0000000000..a6b25e5d64
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/AutoAdjustStageResourceProfileSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.{ApplyResourceProfileExec, 
ColumnarShuffleExchangeExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+@Experimental
+class AutoAdjustStageResourceProfileSuite
+  extends VeloxWholeStageTransformerSuite
+  with AdaptiveSparkPlanHelper {
+  protected val rootPath: String = getClass.getResource("/").getPath
+  override protected val resourcePath: String = "/tpch-data-parquet"
+  override protected val fileFormat: String = "parquet"
+
+  /** Base suite configuration plus the settings this suite needs (AQE and the adjust rule on). */
+  override protected def sparkConf: SparkConf = {
+    val suiteSettings = Seq(
+      "spark.shuffle.manager" -> "org.apache.spark.shuffle.sort.ColumnarShuffleManager",
+      "spark.sql.shuffle.partitions" -> "5",
+      "spark.memory.offHeap.size" -> "2g",
+      "spark.unsafe.exceptionOnMemoryLeak" -> "true",
+      "spark.sql.adaptive.enabled" -> "true",
+      "spark.gluten.auto.adjustStageResource.enabled" -> "true"
+    )
+    // Apply the settings in declaration order on top of the parent configuration.
+    suiteSettings.foldLeft(super.sparkConf) {
+      case (conf, (key, value)) => conf.set(key, value)
+    }
+  }
+
+  /** Creates the three parquet fixture tables (`tmp1`, `tmp2`, `tmp3`) used by the tests. */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // Every fixture is derived from the same 100-row range; only the projected columns differ.
+    def createFixture(table: String, columns: String*): Unit = {
+      spark
+        .range(100)
+        .selectExpr(columns: _*)
+        .write
+        .format("parquet")
+        .saveAsTable(table)
+    }
+
+    createFixture("tmp1", "cast(id % 3 as int) as c1", "id as c2")
+    createFixture("tmp2", "cast(id % 9 as int) as c1")
+    createFixture("tmp3", "cast(id % 3 as int) as c1", "cast(id % 9 as int) as c2")
+  }
+
+  /**
+   * Drops the fixture tables created in `beforeAll` and then runs the parent teardown.
+   *
+   * `if exists` keeps the teardown from throwing when `beforeAll` aborted before every table was
+   * created (which would otherwise hide the original failure), and the `finally` guarantees that
+   * `super.afterAll()` still runs if dropping a table fails.
+   */
+  override protected def afterAll(): Unit = {
+    try {
+      Seq("tmp1", "tmp2", "tmp3").foreach(table => spark.sql(s"drop table if exists $table"))
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /** Number of `VeloxColumnarToRowExec` nodes found in `plan`. */
+  private def collectColumnarToRow(plan: SparkPlan): Int =
+    collect(plan) { case _: VeloxColumnarToRowExec => 1 }.sum
+
+  /** Number of `ColumnarShuffleExchangeExec` nodes found in `plan`. */
+  private def collectColumnarShuffleExchange(plan: SparkPlan): Int =
+    collect(plan) { case _: ColumnarShuffleExchangeExec => 1 }.sum
+
+  /** Number of row-based `ShuffleExchangeExec` nodes found in `plan`. */
+  private def collectShuffleExchange(plan: SparkPlan): Int =
+    collect(plan) { case _: ShuffleExchangeExec => 1 }.sum
+
+  /** Number of `ApplyResourceProfileExec` wrapper nodes found in `plan`. */
+  private def collectApplyResourceProfileExec(plan: SparkPlan): Int =
+    collect(plan) { case _: ApplyResourceProfileExec => 1 }.sum
+
+  // Mixed stage: columnar shuffle is disabled and the fallen-node ratio threshold is lowered to
+  // 0.1. The assertions expect one row-based ShuffleExchangeExec, no
+  // ColumnarShuffleExchangeExec, two VeloxColumnarToRowExec nodes and exactly one
+  // ApplyResourceProfileExec in the executed plan (see the sample plan in the comment below).
+  test("stage contains fallback nodes and apply new resource profile") {
+    withSQLConf(
+      GlutenConfig.COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+      GlutenConfig.AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD.key 
-> "0.1") {
+      runQueryAndCompare("select c1, count(*) from tmp1 group by c1") {
+        df =>
+          val plan = df.queryExecution.executedPlan
+          // scalastyle:off
+          // format: off
+          /*
+            VeloxColumnarToRow
+            +- ^(7) HashAggregateTransformer(keys=[c1#22], 
functions=[count(1)], isStreamingAgg=false, output=[c1#22, count(1)#33L])
+               +- ^(7) InputIteratorTransformer[c1#22, count#37L]
+                  +- RowToVeloxColumnar
+                     +- AQEShuffleRead coalesced
+                        +- ShuffleQueryStage 0
+                           +- Exchange hashpartitioning(c1#22, 5), 
ENSURE_REQUIREMENTS, [plan_id=615]
+                              +- ApplyResourceProfile Profile: id = 0, 
executor resources: cores -> name: cores, amount: 1, script: , vendor: ,memory 
-> name: memory, amount: 1024, script: , vendor: ,offHeap -> name: offHeap, 
amount: 2048, script: , vendor: , task resources: cpus -> name: cpus, amount: 
1.0
+                                 +- VeloxColumnarToRow
+                                    +- ^(6) 
FlushableHashAggregateTransformer(keys=[c1#22], functions=[partial_count(1)], 
isStreamingAgg=false, output=[c1#22, count#37L])
+                                       +- ^(6) FileScanTransformer parquet 
default.tmp1[c1#22] Batched: true, DataFilters: [],
+           */
+          // format: on
+          // scalastyle:on
+          assert(collectColumnarShuffleExchange(plan) == 0)
+          assert(collectShuffleExchange(plan) == 1)
+
+          val wholeQueryColumnarToRow = collectColumnarToRow(plan)
+          assert(wholeQueryColumnarToRow == 2)
+
+          val applyResourceProfileExec = collectApplyResourceProfileExec(plan)
+          // here we can't check the applied resource profile since
+          // ResourceProfiles are only supported on YARN and Kubernetes
+          // with dynamic allocation enabled. In testing mode, we apply
+          // default resource profile to make sure ut works.
+          assert(applyResourceProfileExec == 1)
+      }
+    }
+  }
+
+  // Whole-stage fallback: with the fallback-related settings below (RAS disabled, whole-stage
+  // fallback threshold set to 1), the aggregation over `java_method` is expected to run as the
+  // vanilla Spark plan shown in the comment. The only assertion is that the executed plan holds
+  // exactly one ApplyResourceProfileExec.
+  test("Apply new resource profile when whole stage fallback") {
+    withSQLConf(
+      GlutenConfig.COLUMNAR_FALLBACK_PREFER_COLUMNAR.key -> "false",
+      GlutenConfig.COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR.key -> "false",
+      GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
+      GlutenConfig.RAS_ENABLED.key -> "false"
+    ) {
+      runQueryAndCompare(
+        "select " +
+          "java_method('java.lang.Integer', 'signum', tmp1.c1), count(*) " +
+          "from tmp1 group by java_method('java.lang.Integer', 'signum', 
tmp1.c1)") {
+        // scalastyle:off
+        // format: off
+        /*
+         DeserializeToObject createexternalrow(java_method(java.lang.Integer, 
signum, c1)#35.toString, count(1)#36L, 
StructField(java_method(java.lang.Integer, signum, c1),StringType,true), 
StructField(count(1),LongType,false)), obj#42: org.apache.spark.sql.Row
+         +- *(3) HashAggregate(keys=[_nondeterministic#37], 
functions=[count(1)], output=[java_method(java.lang.Integer, signum, c1)#35, 
count(1)#36L])
+            +- AQEShuffleRead coalesced
+               +- ShuffleQueryStage 0
+                  +- Exchange hashpartitioning(_nondeterministic#37, 5), 
ENSURE_REQUIREMENTS, [plan_id=607]
+                     +- ApplyResourceProfile Profile: id = 0, executor 
resources: cores -> name: cores, amount: 1, script: , vendor: ,memory -> name: 
memory, amount: 1024, script: , vendor: ,offHeap -> name: offHeap, amount: 
2048, script: , vendor: , task resources: cpus -> name: cpus, amount: 1.0
+                        +- *(2) HashAggregate(keys=[_nondeterministic#37], 
functions=[partial_count(1)], output=[_nondeterministic#37, count#41L])
+                           +- Project [java_method(java.lang.Integer, signum, 
c1#22) AS _nondeterministic#37]
+                              +- *(1) ColumnarToRow
+                                 +- FileScan parquet default.tmp1[c1#22] 
Batched: true, DataFilters: [], Format: Parquet
+         */
+        // format: on
+        // scalastyle:on
+        df => 
assert(collectApplyResourceProfileExec(df.queryExecution.executedPlan) == 1)
+      }
+    }
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala
new file mode 100644
index 0000000000..1e9197b610
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
+import org.apache.gluten.logging.LogLevelUtil
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, 
ResourceProfileManager, TaskResourceRequest}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import 
org.apache.spark.sql.execution.GlutenAutoAdjustStageResourceProfile.{applyNewResourceProfileIfPossible,
 collectStagePlan}
+import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec}
+import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.SparkTestUtil
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Dynamically adjusts the resource profile of a query stage depending on how much of the stage
+ * fell back to vanilla Spark. The executor heap memory request of the default resource profile
+ * is multiplied by `autoAdjustStageRPHeapRatio` when either:
+ *   - the whole stage fell back, i.e. the stage holds no Gluten operator other than
+ *     columnar-to-row transitions, or
+ *   - the share of non-Gluten operators in the stage reaches
+ *     `autoAdjustStageFallenNodeThreshold`.
+ *
+ * The rule is a no-op unless it is enabled, AQE is enabled and the plan root is an [[Exchange]].
+ *
+ * Note: setting a resource profile for the final stage is not supported yet. Todo: support
+ * setting the resource profile for the final stage.
+ */
+@Experimental
+case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: SparkSession)
+  extends Rule[SparkPlan]
+  with LogLevelUtil {
+
+  /**
+   * Returns `plan` with the child of its root [[Exchange]] wrapped in an
+   * `ApplyResourceProfileExec` when the stage qualifies for more heap memory; otherwise returns
+   * `plan` unchanged.
+   */
+  override def apply(plan: SparkPlan): SparkPlan = {
+    // todo: support set resource profile for final stage (a plan whose root is not an Exchange)
+    val applicable = glutenConf.enableAutoAdjustStageResourceProfile &&
+      SQLConf.get.adaptiveExecutionEnabled &&
+      plan.isInstanceOf[Exchange]
+    if (!applicable) {
+      return plan
+    }
+    val planNodes = collectStagePlan(plan)
+    if (planNodes.isEmpty) {
+      return plan
+    }
+    logInfo(s"detailPlanNodes ${planNodes.map(_.nodeName).mkString("Array(", ", ", ")")}")
+
+    val rpManager = spark.sparkContext.resourceProfileManager
+    val defaultRP = rpManager.defaultResourceProfile
+
+    // initial resource profile config as default resource profile
+    val taskResource = mutable.Map.empty[String, TaskResourceRequest] ++= defaultRP.taskResources
+    val executorResource =
+      mutable.Map.empty[String, ExecutorResourceRequest] ++= defaultRP.executorResources
+    val memoryRequest = executorResource.get(ResourceProfile.MEMORY)
+    val offheapRequest = executorResource.get(ResourceProfile.OFFHEAP_MEM)
+    logInfo(s"default memory request $memoryRequest")
+    logInfo(s"default offheap request $offheapRequest")
+
+    // case 1: one stage is considered as whole stage fallback if no node is a GlutenPlan
+    // or every GlutenPlan node is a C2R node.
+    val wholeStageFallback = planNodes
+      .filter(_.isInstanceOf[GlutenPlan])
+      .forall(_.isInstanceOf[ColumnarToRowExecBase])
+
+    // case 2: enough of the stage's nodes fell back to vanilla spark.
+    // planNodes is non-empty here, so the division is safe.
+    val fallenNodeRatio = planNodes.count(!_.isInstanceOf[GlutenPlan]).toDouble / planNodes.size
+    val increaseHeap =
+      wholeStageFallback || fallenNodeRatio >= glutenConf.autoAdjustStageFallenNodeThreshold
+    if (!increaseHeap) {
+      return plan
+    }
+
+    memoryRequest match {
+      case Some(request) =>
+        // both cases increase the heap by the same configured ratio
+        val newMemoryAmount = request.amount * glutenConf.autoAdjustStageRPHeapRatio
+        val newExecutorMemory =
+          new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong)
+        executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)
+        val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap)
+        applyNewResourceProfileIfPossible(plan, newRP, rpManager)
+      case None =>
+        // Nothing to scale: leave the plan untouched instead of failing the query.
+        logWarning("Default resource profile has no executor memory request, skip adjusting")
+        plan
+    }
+  }
+}
+
+object GlutenAutoAdjustStageResourceProfile extends Logging {
+
+  /**
+   * Collects all plan nodes that belong to the stage rooted at `plan`. A child query stage node
+   * itself is included, but nothing below it is visited. Command nodes
+   * (`DataWritingCommandExec`, `ExecutedCommandExec`) and their subtrees are skipped.
+   */
+  def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = {
+    val planNodes = new ArrayBuffer[SparkPlan]()
+
+    def visit(node: SparkPlan): Unit = node match {
+      case _: DataWritingCommandExec | _: ExecutedCommandExec =>
+        // todo: support set final stage's resource profile
+        ()
+      case stage: QueryStageExec =>
+        // stage boundary: keep the node but do not descend into the child stage
+        planNodes += stage
+      case other =>
+        planNodes += other
+        other.children.foreach(visit)
+    }
+
+    visit(plan)
+    planNodes
+  }
+
+  /**
+   * Resolves the profile that is actually attached to the plan: the default profile while
+   * testing, otherwise an equivalent already-registered profile if one exists, otherwise `newRP`
+   * after registering it with `rpManager`.
+   */
+  private def getFinalResourceProfile(
+      rpManager: ResourceProfileManager,
+      newRP: ResourceProfile): ResourceProfile = {
+    // Just for test
+    // ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation enabled
+    if (SparkTestUtil.isTesting) {
+      rpManager.defaultResourceProfile
+    } else {
+      rpManager.getEquivalentProfile(newRP).getOrElse {
+        // register new resource profile here
+        rpManager.addResourceProfile(newRP)
+        newRP
+      }
+    }
+  }
+
+  /**
+   * Wraps the child of an [[Exchange]] with `ApplyResourceProfileExec` so that the stage runs
+   * with `rp` (or its registered equivalent). Any other plan is returned unchanged.
+   *
+   * The profile is resolved only once the plan is known to be an [[Exchange]], so no profile is
+   * registered with `rpManager` for a plan that ends up being ignored.
+   */
+  def applyNewResourceProfileIfPossible(
+      plan: SparkPlan,
+      rp: ResourceProfile,
+      rpManager: ResourceProfileManager): SparkPlan = {
+    plan match {
+      case exchange: Exchange =>
+        val finalRP = getFinalResourceProfile(rpManager, rp)
+        // Wrap the plan with ApplyResourceProfileExec so that we can apply new ResourceProfile
+        val wrapperPlan = ApplyResourceProfileExec(exchange.children.head, finalRP)
+        logInfo(s"Apply resource profile $finalRP for plan ${wrapperPlan.nodeName}")
+        exchange.withNewChildren(IndexedSeq(wrapperPlan))
+      case _ =>
+        logInfo(s"Ignore apply resource profile for plan ${plan.nodeName}")
+        // todo: support set final stage's resource profile
+        plan
+    }
+  }
+}
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 7de5f92420..0c5d39ec76 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -496,6 +496,13 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def parquetEncryptionValidationEnabled: Boolean = 
getConf(ENCRYPTED_PARQUET_FALLBACK_ENABLED)
 
+  // Whether the experimental rule that adjusts a stage's resource profile is enabled
+  // (reads AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED).
+  def enableAutoAdjustStageResourceProfile: Boolean =
+    getConf(AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED)
+
+  // Factor used to increase executor heap memory when the adjust-stage-resource rule matches
+  // (reads AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO).
+  def autoAdjustStageRPHeapRatio: Double = 
getConf(AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO)
+
+  // Fallen-node ratio at or above which the rule increases executor heap memory
+  // (reads AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD).
+  def autoAdjustStageFallenNodeThreshold: Double =
+    getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD)
 }
 
 object GlutenConfig {
@@ -2274,4 +2281,26 @@ object GlutenConfig {
       .stringConf
       .createWithDefault("")
 
+  // Switch for GlutenAutoAdjustStageResourceProfile. Internal static conf, disabled by default.
+  // The rule additionally requires adaptive query execution to be enabled.
+  val AUTO_ADJUST_STAGE_RESOURCE_PROFILE_ENABLED =
+    buildStaticConf("spark.gluten.auto.adjustStageResource.enabled")
+      .internal()
+      .doc("Experimental: If enabled, gluten will try to set the stage 
resource according " +
+        "to stage execution plan. Only worked when aqe is enabled at the same 
time!!")
+      .booleanConf
+      .createWithDefault(false)
+
+  // Multiplier applied by GlutenAutoAdjustStageResourceProfile to the default executor memory
+  // request when a stage qualifies for adjustment. Internal conf, defaults to 2.0.
+  val AUTO_ADJUST_STAGE_RESOURCES_HEAP_RATIO =
+    buildConf("spark.gluten.auto.adjustStageResources.heap.ratio")
+      .internal()
+      .doc("Experimental: Increase executor heap memory when match adjust 
stage resource rule.")
+      .doubleConf
+      .createWithDefault(2.0d)
+
+  // Threshold compared against (non-Gluten node count / total node count) of a stage: the rule
+  // increases executor heap memory when the ratio is greater than or equal to this value.
+  // Internal conf, defaults to 0.5.
+  val AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD =
+    
buildConf("spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold")
+      .internal()
+      .doc("Experimental: Increase executor heap memory when stage contains 
fallen node " +
+        "count exceeds the total node count ratio.")
+      .doubleConf
+      .createWithDefault(0.5d)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to