This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cb314870dd [GLUTEN-7143][VL] Fix several UTs for RAS (#7701)
cb314870dd is described below
commit cb314870dd27a746f95f3f88ebc870c2cf9d11c0
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Oct 29 16:13:00 2024 +0800
[GLUTEN-7143][VL] Fix several UTs for RAS (#7701)
---
.../apache/gluten/execution/FallbackSuite.scala | 10 ++++++--
.../org/apache/gluten/planner/VeloxRasSuite.scala | 12 +++------
.../gluten/extension/GlutenSessionExtensions.scala | 2 +-
.../extension/injector/InjectorControl.scala | 21 +++++++++-------
.../columnar/enumerated/EnumeratedTransform.scala | 3 ++-
.../gluten/planner/cost/LegacyCostModel.scala | 6 +++--
.../gluten/planner/cost/RoughCostModel.scala | 6 +++--
.../gluten/planner/plan/GlutenPlanModel.scala | 27 ++++++++++++++++++--
.../org/apache/gluten/planner/property/Conv.scala | 4 ---
.../scala/org/apache/gluten/utils/PlanUtil.scala | 9 ++++---
.../sql/execution/ColumnarWriteFilesExec.scala | 29 ++++++++++++++++++++++
.../scala/org/apache/gluten/GlutenConfig.scala | 4 +--
12 files changed, 97 insertions(+), 36 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 2680ea4101..597009ed71 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -97,7 +97,9 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
   }
 
   test("fallback with collect") {
-    withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+    withSQLConf(
+      GlutenConfig.RAS_ENABLED.key -> "false",
+      GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
       runQueryAndCompare("SELECT count(*) FROM tmp1") {
         df =>
           val columnarToRow = collectColumnarToRow(df.queryExecution.executedPlan)
@@ -141,6 +143,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
 
   test("fallback final aggregate of collect_list") {
     withSQLConf(
+      GlutenConfig.RAS_ENABLED.key -> "false",
       GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
       GlutenConfig.COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR.key -> "false",
       GlutenConfig.EXPRESSION_BLACK_LIST.key -> "element_at"
@@ -159,6 +162,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
   // until we can exactly align with vanilla Spark.
   ignore("fallback final aggregate of collect_set") {
     withSQLConf(
+      GlutenConfig.RAS_ENABLED.key -> "false",
       GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
       GlutenConfig.COLUMNAR_FALLBACK_IGNORE_ROW_TO_COLUMNAR.key -> "false",
       GlutenConfig.EXPRESSION_BLACK_LIST.key -> "element_at"
@@ -191,7 +195,9 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
   }
 
   test("Do not fallback eagerly with ColumnarToRowExec") {
-    withSQLConf(GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
+    withSQLConf(
+      GlutenConfig.RAS_ENABLED.key -> "false",
+      GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1") {
       runQueryAndCompare("select count(*) from tmp1") {
         df =>
           assert(
diff --git a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
index 646e94c696..38a6832c4e 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.extension.columnar.transition.ConventionReq
 import org.apache.gluten.planner.cost.GlutenCostModel
 import org.apache.gluten.planner.property.Conv
 import org.apache.gluten.ras.{Cost, CostModel, Ras}
-import org.apache.gluten.ras.Best.BestNotFoundException
 import org.apache.gluten.ras.RasSuiteBase._
 import org.apache.gluten.ras.path.RasPath
 import org.apache.gluten.ras.property.PropertySet
@@ -111,13 +110,10 @@ class VeloxRasSuite extends SharedSparkSession {
     val out = planner.plan()
     assert(out == RowUnary(RowLeaf(EMPTY_SCHEMA)))
 
-    assertThrows[BestNotFoundException] {
-      // Could not optimize to columnar output since R2C transitions for empty schema node
-      // is not allowed.
-      val planner2 =
-        newRas().newPlanner(in, PropertySet(List(Conv.req(ConventionReq.vanillaBatch))))
-      planner2.plan()
-    }
+    val planner2 =
+      newRas().newPlanner(in, PropertySet(List(Conv.req(ConventionReq.vanillaBatch))))
+    val out2 = planner2.plan()
+    assert(out2 == RowToColumnarExec(RowUnary(RowLeaf(EMPTY_SCHEMA))))
   }
 
   test("User cost model") {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 5c8b2260d3..d5afc6b7e7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -32,7 +32,7 @@ private[gluten] class GlutenSessionExtensions
     injector.control.disableOn {
       session =>
         val glutenEnabledGlobally = session.conf
-          .get(GlutenConfig.GLUTEN_ENABLED_KEY, GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
+          .get(GlutenConfig.GLUTEN_ENABLED_KEY, GlutenConfig.GLUTEN_ENABLED_BY_DEFAULT.toString)
           .toBoolean
         val disabled = !glutenEnabledGlobally
         logDebug(s"Gluten is disabled by variable: glutenEnabledGlobally: $glutenEnabledGlobally")
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
index fd16d3dbad..335eac232e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.extension.injector
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -98,7 +99,7 @@ object InjectorControl {
             method.invoke(after, args: _*)
           } catch {
             case e: InvocationTargetException =>
-              // Unwrap the UTE.
+              // Unwrap the ITE.
               throw e.getCause
           }
         }
@@ -109,15 +110,17 @@ object InjectorControl {
 
     def wrapFunction(functionDescription: FunctionDescription): FunctionDescription = {
       val (identifier, info, builder) = functionDescription
-      val wrappedBuilder: FunctionBuilder = children => {
-        if (
-          disabler.disabled(SparkSession.getActiveSession.getOrElse(
-            throw new IllegalStateException("Active Spark session not found")))
-        ) {
-          throw new UnsupportedOperationException(
-            s"Function ${info.getName} is not callable as Gluten is disabled")
+      val wrappedBuilder: FunctionBuilder = new FunctionBuilder with DisablerAware {
+        override def apply(children: Seq[Expression]): Expression = {
+          if (
+            disabler.disabled(SparkSession.getActiveSession.getOrElse(
+              throw new IllegalStateException("Active Spark session not found")))
+          ) {
+            throw new UnsupportedOperationException(
+              s"Function ${info.getName} is not callable as Gluten is disabled")
+          }
+          builder(children)
         }
-        builder(children)
       }
       (identifier, info, wrappedBuilder)
     }
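Why an anonymous class instead of the previous lambda: a Scala lambda compiles to an opaque function object, while `new FunctionBuilder with DisablerAware` leaves a runtime-checkable marker on the builder. A minimal sketch of the idea, assuming DisablerAware is a no-member marker trait defined in this module:

    // Sketch only; the real trait lives alongside InjectorControl.
    trait DisablerAware

    // A builder tagged this way can be recognized later, e.g. to avoid
    // wrapping the same builder twice. A plain lambda would never match.
    def isWrapped(builder: FunctionBuilder): Boolean =
      builder.isInstanceOf[DisablerAware]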
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index ea9078df1e..e6d1c4859e 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -82,7 +82,8 @@ case class EnumeratedTransform(session: SparkSession, outputsColumnar: Boolean)
         .from(SparkShimLoader.getSparkShims.isWindowGroupLimitExec, OffloadOthers()),
       RasOffload.from[LimitExec](OffloadOthers()),
       RasOffload.from[GenerateExec](OffloadOthers()),
-      RasOffload.from[EvalPythonExec](OffloadOthers())
+      RasOffload.from[EvalPythonExec](OffloadOthers()),
+      RasOffload.from[SampleExec](OffloadOthers())
     ).map(RasOffload.Rule(_, validator))
 
   private val optimization = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala
index c6f4f1fc9f..2e32e3f06a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/LegacyCostModel.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.gluten.planner.cost
 
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike}
+import org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike, ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ProjectExec, RowToColumnarExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ColumnarWriteFilesExec, ProjectExec, RowToColumnarExec, SparkPlan}
 
 /**
  * A cost model that is supposed to drive RAS planner create the same query plan with legacy
@@ -32,10 +32,12 @@ class LegacyCostModel extends LongCostModel {
   // much as possible.
   override def selfLongCostOf(node: SparkPlan): Long = {
     node match {
+      case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0
       case ColumnarToRowExec(_) => 10L
       case RowToColumnarExec(_) => 10L
       case ColumnarToRowLike(_) => 10L
      case RowToColumnarLike(_) => 10L
+      case ColumnarToColumnarLike(_) => 5L
       case p if PlanUtil.isGlutenColumnarOp(p) => 10L
       // 1. 100L << 1000L, to keep the pulled out non-offload-able projects if the main op
       //    turns into offload-able after pulling.
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala
index 9b499b967c..2576a10084 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel.scala
@@ -17,11 +17,11 @@
 package org.apache.gluten.planner.cost
 
 import org.apache.gluten.execution.RowToColumnarExecBase
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike}
+import org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike, ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ProjectExec, RowToColumnarExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ColumnarWriteFilesExec, ProjectExec, RowToColumnarExec, SparkPlan}
 import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
 
 /** A rough cost model with some empirical heuristics. */
@@ -29,6 +29,7 @@ class RoughCostModel extends LongCostModel {
 
   override def selfLongCostOf(node: SparkPlan): Long = {
     node match {
+      case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0
       case ProjectExec(projectList, _) if projectList.forall(isCheapExpression) =>
         // Make trivial ProjectExec has the same cost as ProjectExecTransform to reduce unnecessary
         // c2r and r2c.
@@ -41,6 +42,7 @@ class RoughCostModel extends LongCostModel {
       case RowToColumnarExec(_) => 10L
       case ColumnarToRowLike(_) => 10L
       case RowToColumnarLike(_) => 10L
+      case ColumnarToColumnarLike(_) => 5L
       case p if PlanUtil.isGlutenColumnarOp(p) => 10L
       case p if PlanUtil.isVanillaColumnarOp(p) => 1000L
       // Other row ops. Usually a vanilla row op.
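For intuition on the new ColumnarToColumnarLike constant: a direct batch-to-batch transition should beat a round trip through rows, and both should beat keeping an operator in a vanilla columnar form. A rough comparison under the constants above (illustrative arithmetic only):

    val columnarToColumnar = 5L       // ColumnarToColumnarLike
    val rowRoundTrip = 10L + 10L      // ColumnarToRowLike + RowToColumnarLike
    val vanillaColumnarOp = 1000L     // PlanUtil.isVanillaColumnarOp
    assert(columnarToColumnar < rowRoundTrip && rowRoundTrip < vanillaColumnarOp)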
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
index 0b3adbbe49..abf7908a38 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
@@ -28,6 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{ColumnarToRowExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
 import org.apache.spark.task.{SparkTaskUtil, TaskResources}
 
 import java.util.{Objects, Properties}
@@ -96,9 +97,10 @@ object GlutenPlanModel {
       other.withNewChildren(children)
     }
 
-    override def hashCode(node: SparkPlan): Int = Objects.hashCode(node)
+    override def hashCode(node: SparkPlan): Int = Objects.hashCode(withEqualityWrapper(node))
 
-    override def equals(one: SparkPlan, other: SparkPlan): Boolean = Objects.equals(one, other)
+    override def equals(one: SparkPlan, other: SparkPlan): Boolean =
+      Objects.equals(withEqualityWrapper(one), withEqualityWrapper(other))
 
     override def newGroupLeaf(
         groupId: Int,
@@ -115,5 +117,26 @@ object GlutenPlanModel {
         case gl: GroupLeafExec => gl.groupId
         case _ => throw new IllegalStateException()
       }
+
+    private def withEqualityWrapper(node: SparkPlan): AnyRef = node match {
+      case scan: DataSourceV2ScanExecBase =>
+        // Override V2 scan operators' equality implementation to include output attributes.
+        //
+        // Spark's V2 scans don't incorporate output attributes in equality, so, e.g.,
+        // BatchScan[date#1] can be considered equal to BatchScan[date#2], which is unexpected
+        // in the RAS planner because it strictly relies on plan equalities for sanity.
+        //
+        // Related UT: `VeloxOrcDataTypeValidationSuite#Date type`
+        // Related Spark PRs:
+        // https://github.com/apache/spark/pull/23086
+        // https://github.com/apache/spark/pull/23619
+        // https://github.com/apache/spark/pull/23430
+        ScanV2ExecEqualityWrapper(scan, scan.output)
+      case other => other
+    }
+
+    private case class ScanV2ExecEqualityWrapper(
+        scan: DataSourceV2ScanExecBase,
+        output: Seq[Attribute])
   }
 }
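The pitfall the wrapper guards against can be reproduced with two attributes that differ only by expression ID; vanilla V2 scans ignore output in equals(), so scans producing either attribute may compare equal. A minimal sketch (the scan values are placeholders):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.DateType

    // Two attributes that render as date#1 and date#2: same name and type,
    // but distinct expression IDs, hence not equal to each other.
    val a = AttributeReference("date", DateType)()
    val b = AttributeReference("date", DateType)()
    assert(a != b)
    // Pairing a scan with its output restores the distinction the RAS memo
    // relies on:
    //   ScanV2ExecEqualityWrapper(scan, Seq(a)) != ScanV2ExecEqualityWrapper(scan, Seq(b))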
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/planner/property/Conv.scala b/gluten-substrait/src/main/scala/org/apache/gluten/planner/property/Conv.scala
index 18db0f9594..78f290d195 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/planner/property/Conv.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/planner/property/Conv.scala
@@ -89,10 +89,6 @@ object ConvDef extends PropertyDef[SparkPlan, Conv] {
 
 case class ConvEnforcerRule(reqConv: Conv) extends RasRule[SparkPlan] {
   override def shift(node: SparkPlan): Iterable[SparkPlan] = {
-    if (node.output.isEmpty) {
-      // Disable transitions for node that has output with empty schema.
-      return List.empty
-    }
     val conv = Conv.get(node)
     if (conv.satisfies(reqConv)) {
       return List.empty
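Combined with the VeloxRasSuite change above, requesting vanilla batch output over an empty-schema plan now yields an enforced transition instead of an unplannable memo group. A sketch of the observable behavior, reusing that suite's helpers:

    // Before this commit: BestNotFoundException, because the enforcer
    // refused to emit transitions for empty-schema nodes. After: planned.
    val planner2 =
      newRas().newPlanner(in, PropertySet(List(Conv.req(ConventionReq.vanillaBatch))))
    assert(planner2.plan() == RowToColumnarExec(RowUnary(RowLeaf(EMPTY_SCHEMA))))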
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index 9ebd722781..f2b1bd0900 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.utils
 
-import org.apache.gluten.backend.Backend
 import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.sql.execution._
@@ -27,7 +26,7 @@ import scala.annotation.tailrec
 object PlanUtil {
 
   private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
-    Convention.get(i).batchType == Backend.get().defaultBatchType
+    isGlutenBatchType(Convention.get(i).batchType)
   }
 
   @tailrec
@@ -47,6 +46,10 @@ object PlanUtil {
   }
 
   def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
-    Convention.get(plan).batchType == Backend.get().defaultBatchType
+    isGlutenBatchType(Convention.get(plan).batchType)
+  }
+
+  private def isGlutenBatchType(batchType: Convention.BatchType) = {
+    batchType != Convention.BatchType.None && batchType != Convention.BatchType.VanillaBatch
   }
 }
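The predicate flips from an allow-list (equality with the backend's default batch type) to a deny-list, so any backend-provided batch type now counts as Gluten columnar. Expected behavior, sketched:

    // Row-based plans report BatchType.None; vanilla columnar plans report
    // BatchType.VanillaBatch. Anything else is treated as a Gluten batch type.
    assert(!isGlutenBatchType(Convention.BatchType.None))
    assert(!isGlutenBatchType(Convention.BatchType.VanillaBatch))
    // A backend batch type (identifier assumed here for illustration):
    // assert(isGlutenBatchType(VeloxBatchType))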
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 92dd74796c..0f2139c3b6 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -139,6 +140,7 @@ object ColumnarWriteFilesExec {
       bucketSpec,
       options,
       staticPartitions)
+    right.foreach(node => node.setTagValue(NoopTag, true))
 
     BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec(
       child,
@@ -150,6 +152,33 @@ object ColumnarWriteFilesExec {
       staticPartitions)
   }
 
+  private val NoopTag =
+    TreeNodeTag[Boolean]("org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopTag")
+
+  // Decides whether a plan node is on the dummy `WriteFilesExec + NoopLeaf` path.
+  object OnNoopLeafPath {
+    def unapply(plan: SparkPlan): Option[NoopLeaf] = {
+      val leafs = traverseDown(plan)
+      if (leafs.size > 1) {
+        throw new IllegalArgumentException(s"More than one noop leaf was found in plan: $plan")
+      }
+      leafs.headOption
+    }
+
+    private def traverseDown(plan: SparkPlan): Seq[NoopLeaf] = {
+      val hasNoopTag = plan.getTagValue(NoopTag).getOrElse(false)
+      if (!hasNoopTag) {
+        return Nil
+      }
+      plan match {
+        case leaf: NoopLeaf =>
+          Seq(leaf)
+        case other =>
+          other.children.map(traverseDown).reduce(_ ++ _)
+      }
+    }
+  }
+
   case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")
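The extractor lets the cost models above recognize the dummy write path by tag rather than by plan shape. A usage sketch, mirroring the LegacyCostModel/RoughCostModel hunks:

    // Plans on the dummy `WriteFilesExec + NoopLeaf` path cost nothing, so
    // the RAS planner never tries to "improve" them with transitions.
    def selfLongCostOf(node: SparkPlan): Long = node match {
      case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0L
      case _ => 10L // illustrative default
    }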
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 789d57414d..49ed28d1dd 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -476,7 +476,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 object GlutenConfig {
   import SQLConf._
 
-  var GLUTEN_ENABLE_BY_DEFAULT = true
+  var GLUTEN_ENABLED_BY_DEFAULT = true
   val GLUTEN_ENABLED_KEY = "spark.gluten.enabled"
   val GLUTEN_LIB_NAME = "spark.gluten.sql.columnar.libname"
   val GLUTEN_LIB_PATH = "spark.gluten.sql.columnar.libpath"
@@ -821,7 +821,7 @@ object GlutenConfig {
       .doc("Whether to enable gluten. Default value is true. Just an experimental property." +
         " Recommend to enable/disable Gluten through the setting for spark.plugins.")
       .booleanConf
-      .createWithDefault(GLUTEN_ENABLE_BY_DEFAULT)
+      .createWithDefault(GLUTEN_ENABLED_BY_DEFAULT)
 
   // FIXME the option currently controls both JVM and native validation against a Substrait plan.
   val NATIVE_VALIDATION_ENABLED =