This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 619624a06a [VL] Rework the implementation of spark.gluten.enabled (#7672)
619624a06a is described below

commit 619624a06a578290f5996511ec08595f2590aca4
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Oct 25 19:09:36 2024 +0800

    [VL] Rework the implementation of spark.gluten.enabled (#7672)
---
 .../spark/sql/delta/commands/VacuumCommand.scala   |  58 ++++----
 .../spark/sql/delta/commands/VacuumCommand.scala   |  13 +-
 .../spark/sql/delta/commands/VacuumCommand.scala   |  11 +-
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   3 -
 .../extension/CHAQEPropagateEmptyRelation.scala    |   4 +-
 .../extension/FallbackBroadcastHashJoinRules.scala |   3 +-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   3 -
 .../gluten/datasource/ArrowConvertorRule.scala     |   3 +-
 .../gluten/extension/ArrowScanReplaceRule.scala    |   3 +-
 .../BloomFilterMightContainJointRewriteRule.scala  |   3 +-
 .../gluten/extension/CollectRewriteRule.scala      |   3 +-
 .../apache/gluten/extension/HLLRewriteRule.scala   |   3 +-
 .../gluten/execution/MiscOperatorSuite.scala       |  70 +++++----
 .../gluten/extension/GlutenSessionExtensions.scala |  28 +++-
 .../extension/columnar/ColumnarRuleApplier.scala   |   9 --
 .../columnar/enumerated/EnumeratedApplier.scala    |  10 +-
 .../columnar/heuristic/HeuristicApplier.scala      |  16 +--
 .../gluten/extension/injector/GlutenInjector.scala |  49 +++----
 .../extension/injector/InjectorControl.scala       | 160 +++++++++++++++++++++
 .../gluten/extension/injector/RuleInjector.scala   |   5 +-
 .../gluten/extension/injector/SparkInjector.scala  |  43 +++---
 .../apache/gluten/extension/injector/package.scala |  35 +++++
 .../gluten/execution/VeloxIcebergSuite.scala       |  16 +--
 .../apache/gluten/utils/QueryPlanSelector.scala    |  80 -----------
 .../sql/execution/FallbackStrategiesSuite.scala    |  15 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   9 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  14 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   9 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  14 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   9 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  14 +-
 .../extension/GlutenSessionExtensionSuite.scala    |   9 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     |   4 +-
 33 files changed, 423 insertions(+), 305 deletions(-)

diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index c5527933b2..939e6bcbf2 100644
--- 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta.commands
 
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.extension.GlutenSessionExtensions
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.delta._
@@ -145,9 +145,11 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
 
       // --- modified start
       val originalEnabledGluten =
-        
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+        
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
       // gluten can not support vacuum command
-      
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      spark.sparkContext.setLocalProperty(
+        GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
+        "false")
       // --- modified end
 
       val validFiles = snapshot.stateDS
@@ -284,31 +286,37 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
         } else {
           allFilesAndDirs
             .where('modificationTime < deleteBeforeTimestamp || 'isDir)
-            .mapPartitions { fileStatusIterator =>
-              val reservoirBase = new Path(basePath)
-              val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
-              fileStatusIterator.flatMap { fileStatus =>
-                if (fileStatus.isDir) {
-                  Iterator.single(relativize(fileStatus.getPath, fs, 
reservoirBase, isDir = true))
-                } else {
-                  val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
-                  val dirsWithSlash = dirs.map { p =>
-                    relativize(new Path(p), fs, reservoirBase, isDir = true)
-                  }
-                  dirsWithSlash ++ Iterator(
-                    relativize(new Path(fileStatus.path), fs, reservoirBase, 
isDir = false))
+            .mapPartitions {
+              fileStatusIterator =>
+                val reservoirBase = new Path(basePath)
+                val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
+                fileStatusIterator.flatMap {
+                  fileStatus =>
+                    if (fileStatus.isDir) {
+                      Iterator.single(
+                        relativize(fileStatus.getPath, fs, reservoirBase, 
isDir = true))
+                    } else {
+                      val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
+                      val dirsWithSlash = dirs.map {
+                        p => relativize(new Path(p), fs, reservoirBase, isDir 
= true)
+                      }
+                      dirsWithSlash ++ Iterator(
+                        relativize(new Path(fileStatus.path), fs, 
reservoirBase, isDir = false))
+                    }
                 }
-              }
-            }.groupBy($"value" as 'path)
+            }
+            .groupBy($"value".as('path))
             .count()
             .join(validFiles, Seq("path"), "leftanti")
             .where('count === 1)
             .select('path)
             .as[String]
-            .map { relativePath =>
-              assert(!stringToPath(relativePath).isAbsolute,
-                "Shouldn't have any absolute paths for deletion here.")
-              pathToString(DeltaFileOperations.absolutePath(basePath, 
relativePath))
+            .map {
+              relativePath =>
+                assert(
+                  !stringToPath(relativePath).isAbsolute,
+                  "Shouldn't have any absolute paths for deletion here.")
+                pathToString(DeltaFileOperations.absolutePath(basePath, 
relativePath))
             }
         }
         // --- modified end
@@ -371,10 +379,12 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
         // --- modified start
         if (originalEnabledGluten != null) {
           spark.sparkContext.setLocalProperty(
-            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
+            GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
+            originalEnabledGluten)
         } else {
           spark.sparkContext.setLocalProperty(
-            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+            GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
+            "true")
         }
         // --- modified end
       }
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 9f82feeee2..e59645f58c 100644
--- 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -21,19 +21,16 @@ package org.apache.spark.sql.delta.commands
 import java.net.URI
 import java.util.Date
 import java.util.concurrent.TimeUnit
-
 import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import 
org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-
-import org.apache.gluten.utils.QueryPlanSelector
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
@@ -161,9 +158,9 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
 
       // --- modified start
       val originalEnabledGluten =
-        
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+        
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
       // gluten can not support vacuum command
-      
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      
spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
       // --- modified end
 
       val validFiles = snapshot.stateDS
@@ -362,10 +359,10 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
         // --- modified start
         if (originalEnabledGluten != null) {
           spark.sparkContext.setLocalProperty(
-            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
+            GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
         } else {
           spark.sparkContext.setLocalProperty(
-            QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+            GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
         }
         // --- modified end
       }
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 9f455fb27b..5d05bdb868 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -28,8 +28,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import 
org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.broadcast.Broadcast
@@ -254,9 +253,9 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
 
       // --- modified start
       val originalEnabledGluten =
-        
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+        
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
       // gluten can not support vacuum command
-      
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      
spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
       // --- modified end
 
       val validFiles =
@@ -461,10 +460,10 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
           // --- modified start
           if (originalEnabledGluten != null) {
             spark.sparkContext.setLocalProperty(
-              QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
+              GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, 
originalEnabledGluten)
           } else {
             spark.sparkContext.setLocalProperty(
-              QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
+              GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
           }
           // --- modified end
         }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 9a1ead1e6c..4323dc9558 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -26,7 +26,6 @@ import org.apache.gluten.extension.injector.{RuleInjector, 
SparkInjector}
 import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, 
RasInjector}
 import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, 
GlutenClickhouseSqlParser}
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, 
EqualToRewrite}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -39,8 +38,6 @@ import org.apache.spark.util.SparkPlanRules
 class CHRuleApi extends RuleApi {
   import CHRuleApi._
   override def injectRules(injector: RuleInjector): Unit = {
-    injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
-
     injectSpark(injector.spark)
     injectLegacy(injector.gluten.legacy)
     injectRas(injector.gluten.ras)
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
index 6f5afa9726..6d99a2ceec 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHAQEPropagateEmptyRelation.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
-import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -28,8 +27,7 @@ import 
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
ClickHouseBuildSideRelation}
 
 case class CHAQEPropagateEmptyRelation(session: SparkSession) extends 
Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = PhysicalPlanSelector.maybe(session, 
plan) {
+  def apply(plan: SparkPlan): SparkPlan = {
     if (!(session.conf.get(CHBackendSettings.GLUTEN_AQE_PROPAGATEEMPTY, 
"true").toBoolean)) {
       plan
     } else {
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
index 842dc76153..ec465a3c15 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar._
 import 
org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits
-import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
@@ -38,7 +37,7 @@ import scala.util.control.Breaks.{break, breakable}
 // to columnar while BHJ fallbacks, BroadcastExec need to be tagged not 
transformable when applying
 // queryStagePrepRules.
 case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) 
extends Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = 
PhysicalPlanSelector.maybe(session, plan) {
+  override def apply(plan: SparkPlan): SparkPlan = {
     val columnarConf: GlutenConfig = GlutenConfig.getConf
     plan.foreach {
       case bhj: BroadcastHashJoinExec =>
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 7cddba157d..a838c463c3 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -27,7 +27,6 @@ import 
org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov
 import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
 import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, 
RasInjector}
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
GlutenFallbackReporter}
 
@@ -35,8 +34,6 @@ class VeloxRuleApi extends RuleApi {
   import VeloxRuleApi._
 
   override def injectRules(injector: RuleInjector): Unit = {
-    injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
-
     injectSpark(injector.spark)
     injectLegacy(injector.gluten.legacy)
     injectRas(injector.gluten.ras)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
index c4684e5a4b..b1b0b813f6 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.datasource
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.datasource.v2.ArrowCSVTable
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.LogicalPlanSelector
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.SparkSession
@@ -40,7 +39,7 @@ import scala.collection.convert.ImplicitConversions.`map 
AsScala`
 
 @Experimental
 case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] 
{
-  override def apply(plan: LogicalPlan): LogicalPlan = 
LogicalPlanSelector.maybe(session, plan) {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
     if (!BackendsApiManager.getSettings.enableNativeArrowReadFiles()) {
       return plan
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
index dba8df5cf1..adfc6ca742 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.extension
 import org.apache.gluten.datasource.ArrowCSVFileFormat
 import org.apache.gluten.datasource.v2.ArrowCSVScan
 import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
-import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -27,7 +26,7 @@ import 
org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanEx
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = 
PhysicalPlanSelector.maybe(spark, plan) {
+  override def apply(plan: SparkPlan): SparkPlan = {
     plan.transformUp {
       case plan: FileSourceScanExec if 
plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
         ArrowFileSourceScanExec(plan)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
index 735d6ad41b..56a3d86a90 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
@@ -20,14 +20,13 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.expression.VeloxBloomFilterMightContain
 import org.apache.gluten.expression.aggregate.VeloxBloomFilterAggregate
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 
 case class BloomFilterMightContainJointRewriteRule(spark: SparkSession) 
extends Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = 
PhysicalPlanSelector.maybe(spark, plan) {
+  override def apply(plan: SparkPlan): SparkPlan = {
     if (!GlutenConfig.getConf.enableNativeBloomFilter) {
       return plan
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
index 85d31b8d02..48541b234e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CollectRewriteRule.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension
 
 import org.apache.gluten.expression.ExpressionMappings
 import org.apache.gluten.expression.aggregate.{VeloxCollectList, 
VeloxCollectSet}
-import org.apache.gluten.utils.LogicalPlanSelector
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Expression, WindowExpression}
@@ -36,7 +35,7 @@ import scala.reflect.{classTag, ClassTag}
  */
 case class CollectRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
   import CollectRewriteRule._
-  override def apply(plan: LogicalPlan): LogicalPlan = 
LogicalPlanSelector.maybe(spark, plan) {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
     if (!has[VeloxCollectSet] && !has[VeloxCollectList]) {
       return plan
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
index 8b44005646..8ceee3d573 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/HLLRewriteRule.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.extension
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.expression.aggregate.HLLAdapter
-import org.apache.gluten.utils.LogicalPlanSelector
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Literal
@@ -29,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXP
 import org.apache.spark.sql.types._
 
 case class HLLRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = 
LogicalPlanSelector.maybe(spark, plan) {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) {
       case a: Aggregate =>
         
a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index ba42b57f1b..7ddab8769f 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -64,35 +64,6 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
       .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
   }
 
-  test("field names contain non-ASCII characters") {
-    withTempPath {
-      path =>
-        // scalastyle:off nonascii
-        Seq((1, 2, 3, 4)).toDF("товары", "овары", "国ⅵ", 
"中文").write.parquet(path.getCanonicalPath)
-        // scalastyle:on
-        
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
-        runQueryAndCompare("select * from view") {
-          checkGlutenOperatorMatch[FileSourceScanExecTransformer]
-        }
-    }
-
-    withTempPath {
-      path =>
-        // scalastyle:off nonascii
-        spark.range(10).toDF("中文").write.parquet(path.getCanonicalPath)
-        
spark.read.parquet(path.getCanonicalPath).filter("`中文`>1").createOrReplaceTempView("view")
-        // scalastyle:on
-        runQueryAndCompare("select * from view") {
-          checkGlutenOperatorMatch[FileSourceScanExecTransformer]
-        }
-    }
-  }
-
-  test("simple_select") {
-    val df = runQueryAndCompare("select * from lineitem limit 1") { _ => }
-    checkLengthAndPlan(df, 1)
-  }
-
   test("select_part_column") {
     val df = runQueryAndCompare("select l_shipdate, l_orderkey from lineitem 
limit 1") {
       df =>
@@ -1924,4 +1895,45 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
       }
     }
   }
+
+  // Since https://github.com/apache/incubator-gluten/pull/7330.
+  test("field names contain non-ASCII characters") {
+    withTempPath {
+      path =>
+        // scalastyle:off nonascii
+        Seq((1, 2, 3, 4)).toDF("товары", "овары", "国ⅵ", 
"中文").write.parquet(path.getCanonicalPath)
+        // scalastyle:on
+        
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
+        runQueryAndCompare("select * from view") {
+          checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+        }
+    }
+
+    withTempPath {
+      path =>
+        // scalastyle:off nonascii
+        spark.range(10).toDF("中文").write.parquet(path.getCanonicalPath)
+        
spark.read.parquet(path.getCanonicalPath).filter("`中文`>1").createOrReplaceTempView("view")
+        // scalastyle:on
+        runQueryAndCompare("select * from view") {
+          checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+        }
+    }
+  }
+
+  test("test 'spark.gluten.enabled'") {
+    withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "true") {
+      runQueryAndCompare("select * from lineitem limit 1") {
+        checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+      }
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        runQueryAndCompare("select * from lineitem limit 1") {
+          checkSparkOperatorMatch[FileSourceScanExec]
+        }
+      }
+      runQueryAndCompare("select * from lineitem limit 1") {
+        checkGlutenOperatorMatch[FileSourceScanExecTransformer]
+      }
+    }
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 697b41da9e..5c8b2260d3 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -16,19 +16,43 @@
  */
 package org.apache.gluten.extension
 
+import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backend.Backend
 import org.apache.gluten.extension.injector.RuleInjector
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 
-private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions 
=> Unit) {
+private[gluten] class GlutenSessionExtensions
+  extends (SparkSessionExtensions => Unit)
+  with Logging {
+  import GlutenSessionExtensions._
   override def apply(exts: SparkSessionExtensions): Unit = {
     val injector = new RuleInjector(exts)
+    injector.control.disableOn {
+      session =>
+        val glutenEnabledGlobally = session.conf
+          .get(GlutenConfig.GLUTEN_ENABLED_KEY, 
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
+          .toBoolean
+        val disabled = !glutenEnabledGlobally
+        logDebug(s"Gluten is disabled by variable: glutenEnabledGlobally: 
$glutenEnabledGlobally")
+        disabled
+    }
+    injector.control.disableOn {
+      session =>
+        val glutenEnabledForThread =
+          
Option(session.sparkContext.getLocalProperty(GLUTEN_ENABLE_FOR_THREAD_KEY))
+            .forall(_.toBoolean)
+        val disabled = !glutenEnabledForThread
+        logDebug(s"Gluten is disabled by variable: glutenEnabledForThread: 
$glutenEnabledForThread")
+        disabled
+    }
     Backend.get().injectRules(injector)
     injector.inject()
   }
 }
 
-private[gluten] object GlutenSessionExtensions {
+object GlutenSessionExtensions {
   val GLUTEN_SESSION_EXTENSION_NAME: String = 
classOf[GlutenSessionExtensions].getCanonicalName
+  val GLUTEN_ENABLE_FOR_THREAD_KEY: String = "gluten.enabledForCurrentThread"
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index d275c58564..ecf13967e3 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.util.AdaptiveContext
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 
 trait ColumnarRuleApplier {
@@ -28,8 +27,6 @@ trait ColumnarRuleApplier {
 }
 
 object ColumnarRuleApplier {
-  type ColumnarRuleBuilder = ColumnarRuleCall => Rule[SparkPlan]
-
   class ColumnarRuleCall(
       val session: SparkSession,
       val ac: AdaptiveContext,
@@ -38,10 +35,4 @@ object ColumnarRuleApplier {
       new GlutenConfig(session.sessionState.conf)
     }
   }
-
-  // A temporary workaround for applying toggle `spark.gluten.enabled`, to be 
removed.
-  trait SkipCondition {
-    // True if the rule execution should be skipped.
-    def skip(session: SparkSession, plan: SparkPlan): Boolean
-  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index c4d53653c0..7ddeb33c7d 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.gluten.extension.columnar.enumerated
 
-import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.columnar.{ColumnarRuleApplier, 
ColumnarRuleExecutor}
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.util.AdaptiveContext
 import org.apache.gluten.logging.LogLevelUtil
 
@@ -38,17 +38,13 @@ import org.apache.spark.sql.execution.SparkPlan
 @Experimental
 class EnumeratedApplier(
     session: SparkSession,
-    skipConditions: Seq[SkipCondition],
-    ruleBuilders: Seq[ColumnarRuleBuilder])
+    ruleBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
   private val adaptiveContext = AdaptiveContext(session)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
-    if (skipConditions.exists(_.skip(session, plan))) {
-      return plan
-    }
     val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
     val finalPlan = maybeAqe {
       apply0(ruleBuilders.map(b => b(call)), plan)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index a039e9a562..e4825d2eb7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.gluten.extension.columnar.heuristic
 
-import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.columnar.{ColumnarRuleApplier, 
ColumnarRuleExecutor}
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.util.AdaptiveContext
 import org.apache.gluten.logging.LogLevelUtil
 
@@ -32,20 +32,16 @@ import org.apache.spark.sql.execution.SparkPlan
  */
 class HeuristicApplier(
     session: SparkSession,
-    skipConditions: Seq[SkipCondition],
-    transformBuilders: Seq[ColumnarRuleBuilder],
-    fallbackPolicyBuilders: Seq[ColumnarRuleBuilder],
-    postBuilders: Seq[ColumnarRuleBuilder],
-    finalBuilders: Seq[ColumnarRuleBuilder])
+    transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+    fallbackPolicyBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+    postBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]],
+    finalBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]])
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
   private val adaptiveContext = AdaptiveContext(session)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
-    if (skipConditions.exists(_.skip(session, plan))) {
-      return plan
-    }
     val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
     makeRule(call).apply(plan)
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index db3310151f..498f040c90 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -19,67 +19,62 @@ package org.apache.gluten.extension.injector
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, 
SkipCondition}
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
 
 import scala.collection.mutable
 
 /** Injector used to inject query planner rules into Gluten. */
-class GlutenInjector private[injector] {
+class GlutenInjector private[injector] (control: InjectorControl) {
   import GlutenInjector._
-  private val skipConditions: mutable.ListBuffer[SkipCondition] = 
mutable.ListBuffer()
   val legacy: LegacyInjector = new LegacyInjector()
   val ras: RasInjector = new RasInjector()
 
   private[injector] def inject(extensions: SparkSessionExtensions): Unit = {
-    extensions.injectColumnar(session => new GlutenColumnarRule(session, 
applier))
+    extensions.injectColumnar(
+      control.disabler().wrapColumnarRule(s => new GlutenColumnarRule(s, 
applier)))
   }
 
   private def applier(session: SparkSession): ColumnarRuleApplier = {
     val conf = new GlutenConfig(session.sessionState.conf)
     if (conf.enableRas) {
-      return ras.createApplier(session, skipConditions.toSeq)
+      return ras.createApplier(session)
     }
-    legacy.createApplier(session, skipConditions.toSeq)
-  }
-
-  def skipOn(skipCondition: SkipCondition): Unit = {
-    skipConditions += skipCondition
+    legacy.createApplier(session)
   }
 }
 
 object GlutenInjector {
   class LegacyInjector {
-    private val transformBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
-    private val fallbackPolicyBuilders = 
mutable.Buffer.empty[ColumnarRuleBuilder]
-    private val postBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
-    private val finalBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
+    private val transformBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
Rule[SparkPlan]]
+    private val fallbackPolicyBuilders = mutable.Buffer.empty[ColumnarRuleCall 
=> Rule[SparkPlan]]
+    private val postBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
Rule[SparkPlan]]
+    private val finalBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
Rule[SparkPlan]]
 
-    def injectTransform(builder: ColumnarRuleBuilder): Unit = {
+    def injectTransform(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
       transformBuilders += builder
     }
 
-    def injectFallbackPolicy(builder: ColumnarRuleBuilder): Unit = {
+    def injectFallbackPolicy(builder: ColumnarRuleCall => Rule[SparkPlan]): 
Unit = {
       fallbackPolicyBuilders += builder
     }
 
-    def injectPost(builder: ColumnarRuleBuilder): Unit = {
+    def injectPost(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
       postBuilders += builder
     }
 
-    def injectFinal(builder: ColumnarRuleBuilder): Unit = {
+    def injectFinal(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
       finalBuilders += builder
     }
 
-    private[injector] def createApplier(
-        session: SparkSession,
-        skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
+    private[injector] def createApplier(session: SparkSession): 
ColumnarRuleApplier = {
       new HeuristicApplier(
         session,
-        skipConditions,
         transformBuilders.toSeq,
         fallbackPolicyBuilders.toSeq,
         postBuilders.toSeq,
@@ -88,16 +83,14 @@ object GlutenInjector {
   }
 
   class RasInjector {
-    private val ruleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
+    private val ruleBuilders = mutable.Buffer.empty[ColumnarRuleCall => 
Rule[SparkPlan]]
 
-    def inject(builder: ColumnarRuleBuilder): Unit = {
+    def inject(builder: ColumnarRuleCall => Rule[SparkPlan]): Unit = {
       ruleBuilders += builder
     }
 
-    private[injector] def createApplier(
-        session: SparkSession,
-        skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
-      new EnumeratedApplier(session, skipConditions, ruleBuilders.toSeq)
+    private[injector] def createApplier(session: SparkSession): 
ColumnarRuleApplier = {
+      new EnumeratedApplier(session, ruleBuilders.toSeq)
     }
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
new file mode 100644
index 0000000000..fd16d3dbad
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/InjectorControl.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.injector
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
+
+import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method}
+
+import scala.collection.mutable
+
+class InjectorControl private[injector] () {
+  import InjectorControl._
+  private val disablerBuffer: mutable.ListBuffer[Disabler] =
+    mutable.ListBuffer()
+  private var combined: Disabler = (_: SparkSession) => false
+
+  def disableOn(one: Disabler): Unit = synchronized {
+    disablerBuffer += one
+    // Update the combined disabler.
+    val disablerList = disablerBuffer.toList
+    combined = s => disablerList.exists(_.disabled(s))
+  }
+
+  private[injector] def disabler(): Disabler = synchronized {
+    combined
+  }
+}
+
+object InjectorControl {
+  trait Disabler {
+    // If true, the injected rule will be disabled.
+    protected[injector] def disabled(session: SparkSession): Boolean
+  }
+
+  private object Disabler {
+    implicit private[injector] class DisablerOps(disabler: Disabler) {
+      def wrapRule[TreeType <: TreeNode[_]](
+          ruleBuilder: SparkSession => Rule[TreeType]): SparkSession => 
Rule[TreeType] = session =>
+        {
+          val rule = ruleBuilder(session)
+          new Rule[TreeType] with DisablerAware {
+            override val ruleName: String = rule.ruleName
+            override def apply(plan: TreeType): TreeType = {
+              if (disabler.disabled(session)) {
+                return plan
+              }
+              rule(plan)
+            }
+          }
+        }
+
+      def wrapStrategy(strategyBuilder: StrategyBuilder): StrategyBuilder = 
session => {
+        val strategy = strategyBuilder(session)
+        new Strategy with DisablerAware {
+          override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+            if (disabler.disabled(session)) {
+              return Nil
+            }
+            strategy(plan)
+          }
+        }
+      }
+
+      def wrapParser(parserBuilder: ParserBuilder): ParserBuilder = (session, 
parser) => {
+        val before = parser
+        val after = parserBuilder(session, before)
+        // Use dynamic proxy to get rid of 3.2 compatibility issues.
+        java.lang.reflect.Proxy
+          .newProxyInstance(
+            classOf[ParserInterface].getClassLoader,
+            Array(classOf[ParserInterface], classOf[DisablerAware]),
+            new InvocationHandler {
+              override def invoke(proxy: Any, method: Method, args: 
Array[AnyRef]): AnyRef = {
+                try {
+                  if (disabler.disabled(session)) {
+                    return method.invoke(before, args: _*)
+                  }
+                  method.invoke(after, args: _*)
+                } catch {
+                  case e: InvocationTargetException =>
+                    // Unwrap the InvocationTargetException.
+                    throw e.getCause
+                }
+              }
+            }
+          )
+          .asInstanceOf[ParserInterface]
+      }
+
+      def wrapFunction(functionDescription: FunctionDescription): 
FunctionDescription = {
+        val (identifier, info, builder) = functionDescription
+        val wrappedBuilder: FunctionBuilder = children => {
+          if (
+            disabler.disabled(SparkSession.getActiveSession.getOrElse(
+              throw new IllegalStateException("Active Spark session not 
found")))
+          ) {
+            throw new UnsupportedOperationException(
+              s"Function ${info.getName} is not callable as Gluten is 
disabled")
+          }
+          builder(children)
+        }
+        (identifier, info, wrappedBuilder)
+      }
+
+      def wrapColumnarRule(columnarRuleBuilder: ColumnarRuleBuilder): 
ColumnarRuleBuilder =
+        session => {
+          val columnarRule = columnarRuleBuilder(session)
+          new ColumnarRule with DisablerAware {
+            override val preColumnarTransitions: Rule[SparkPlan] = {
+              new Rule[SparkPlan] {
+                override def apply(plan: SparkPlan): SparkPlan = {
+                  if (disabler.disabled(session)) {
+                    return plan
+                  }
+                  columnarRule.preColumnarTransitions.apply(plan)
+                }
+              }
+            }
+
+            override val postColumnarTransitions: Rule[SparkPlan] = {
+              new Rule[SparkPlan] {
+                override def apply(plan: SparkPlan): SparkPlan = {
+                  if (disabler.disabled(session)) {
+                    return plan
+                  }
+                  columnarRule.postColumnarTransitions.apply(plan)
+                }
+              }
+            }
+          }
+        }
+    }
+  }
+
+  /**
+   * An entity (e.g. a rule, a parser, or a cost evaluator) that is 
dynamically injected into Spark,
+   * and whose effect is controlled by a disabler.
+   */
+  trait DisablerAware
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
index 60a649387d..c497a24a07 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
@@ -20,8 +20,9 @@ import org.apache.spark.sql.SparkSessionExtensions
 
 /** Injector used to inject query planner rules into Spark and Gluten. */
 class RuleInjector(extensions: SparkSessionExtensions) {
-  val spark: SparkInjector = new SparkInjector(extensions)
-  val gluten: GlutenInjector = new GlutenInjector()
+  val control = new InjectorControl()
+  val spark: SparkInjector = new SparkInjector(control, extensions)
+  val gluten: GlutenInjector = new GlutenInjector(control)
 
   private[extension] def inject(): Unit = {
     // The regular Spark rules already injected with the `injectRules` of 
`RuleApi` directly.
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
index 847a9349e4..87942c4155 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
@@ -16,44 +16,37 @@
  */
 package org.apache.gluten.extension.injector
 
-import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.SparkSessionExtensions
 
 /** Injector used to inject query planner rules into Spark. */
-class SparkInjector private[injector] (extensions: SparkSessionExtensions) {
-
-  def injectQueryStagePrepRule(builder: SparkSession => Rule[SparkPlan]): Unit 
= {
-    extensions.injectQueryStagePrepRule(builder)
+class SparkInjector private[injector] (
+    control: InjectorControl,
+    extensions: SparkSessionExtensions) {
+  def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit = {
+    extensions.injectQueryStagePrepRule(control.disabler().wrapRule(builder))
   }
 
-  def injectResolutionRule(builder: SparkSession => Rule[LogicalPlan]): Unit = 
{
-    extensions.injectResolutionRule(builder)
+  def injectResolutionRule(builder: RuleBuilder): Unit = {
+    extensions.injectResolutionRule(control.disabler().wrapRule(builder))
   }
 
-  def injectPostHocResolutionRule(builder: SparkSession => Rule[LogicalPlan]): 
Unit = {
-    extensions.injectPostHocResolutionRule(builder)
+  def injectPostHocResolutionRule(builder: RuleBuilder): Unit = {
+    
extensions.injectPostHocResolutionRule(control.disabler().wrapRule(builder))
   }
 
-  def injectOptimizerRule(builder: SparkSession => Rule[LogicalPlan]): Unit = {
-    extensions.injectOptimizerRule(builder)
+  def injectOptimizerRule(builder: RuleBuilder): Unit = {
+    extensions.injectOptimizerRule(control.disabler().wrapRule(builder))
   }
 
-  def injectPlannerStrategy(builder: SparkSession => Strategy): Unit = {
-    extensions.injectPlannerStrategy(builder)
+  def injectPlannerStrategy(builder: StrategyBuilder): Unit = {
+    extensions.injectPlannerStrategy(control.disabler().wrapStrategy(builder))
   }
 
-  def injectParser(builder: (SparkSession, ParserInterface) => 
ParserInterface): Unit = {
-    extensions.injectParser(builder)
+  def injectParser(builder: ParserBuilder): Unit = {
+    extensions.injectParser(control.disabler().wrapParser(builder))
   }
 
-  def injectFunction(
-      functionDescription: (FunctionIdentifier, ExpressionInfo, 
FunctionBuilder)): Unit = {
-    extensions.injectFunction(functionDescription)
+  def injectFunction(functionDescription: FunctionDescription): Unit = {
+    
extensions.injectFunction(control.disabler().wrapFunction(functionDescription))
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/package.scala 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/package.scala
new file mode 100644
index 0000000000..7262dbdc2e
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/package.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
+
+package object injector {
+  type RuleBuilder = SparkSession => Rule[LogicalPlan]
+  type StrategyBuilder = SparkSession => Strategy
+  type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
+  type FunctionDescription = (FunctionIdentifier, ExpressionInfo, 
FunctionBuilder)
+  type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
+  type ColumnarRuleBuilder = SparkSession => ColumnarRule
+}
diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 5ebf8883c6..0d063d22f8 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -63,7 +63,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       // Partition key of string type.
-      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
         // Gluten does not support write iceberg table.
         spark.sql(s"""
                      |create table $leftTable(id int, name string, p string)
@@ -84,7 +84,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
 
       // Partition key of integer type.
       withSQLConf(
-        GlutenConfig.GLUTEN_ENABLE_KEY -> "false"
+        GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
       ) {
         // Gluten does not support write iceberg table.
         spark.sql(s"""
@@ -143,7 +143,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       // Partition key of string type.
-      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
         // Gluten does not support write iceberg table.
         spark.sql(s"""
                      |create table $leftTable(id int, name string, p int)
@@ -164,7 +164,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
 
       // Partition key of integer type.
       withSQLConf(
-        GlutenConfig.GLUTEN_ENABLE_KEY -> "false"
+        GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
       ) {
         // Gluten does not support write iceberg table.
         spark.sql(s"""
@@ -223,7 +223,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       // Partition key of string type.
-      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
         // Gluten does not support write iceberg table.
         spark.sql(s"""
                      |create table $leftTable(id int, name string, p int)
@@ -244,7 +244,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
 
       // Partition key of integer type.
       withSQLConf(
-        GlutenConfig.GLUTEN_ENABLE_KEY -> "false"
+        GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
       ) {
         // Gluten does not support write iceberg table.
         spark.sql(s"""
@@ -338,7 +338,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
 
   test("iceberg read mor table - delete and update") {
     withTable("iceberg_mor_tb") {
-      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
         spark.sql("""
                     |create table iceberg_mor_tb (
                     |  id int,
@@ -390,7 +390,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
 
   test("iceberg read mor table - merge into") {
     withTable("iceberg_mor_tb", "merge_into_source_tb") {
-      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
         spark.sql("""
                     |create table iceberg_mor_tb (
                     |  id int,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
deleted file mode 100644
index 2d8d7b29e4..0000000000
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.utils
-
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-
-object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] {
-  val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) =>
-    !shouldUseGluten(session, plan)
-}
-
-object LogicalPlanSelector extends QueryPlanSelector[LogicalPlan] {}
-
-/** Select to decide whether a Spark plan can be accepted by Gluten for 
further execution. */
-abstract class QueryPlanSelector[T <: QueryPlan[_]] extends Logging {
-
-  private[this] def stackTrace(max: Int = 5): String = {
-    val trim: Int = 6
-    new Throwable().getStackTrace().slice(trim, trim + max).mkString("\n")
-  }
-
-  private def isGlutenEnabledForCurrentThread(session: SparkSession): Boolean 
= {
-    val enabled =
-      
session.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
-    if (enabled != null) {
-      enabled.toBoolean
-    } else {
-      true
-    }
-  }
-
-  def shouldUseGluten(session: SparkSession, plan: T): Boolean = {
-    val glutenEnabled = session.conf
-      .get(GlutenConfig.GLUTEN_ENABLE_KEY, 
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
-      .toBoolean && isGlutenEnabledForCurrentThread(session)
-    if (log.isDebugEnabled) {
-      logDebug(s"shouldUseGluten: $glutenEnabled")
-      logDebug(
-        s"=========================\n" +
-          s"running shouldUseGluten from:\n${stackTrace()}\n" +
-          s"plan:\n${plan.treeString}\n" +
-          "=========================")
-    }
-    glutenEnabled
-  }
-
-  def maybe(session: SparkSession, plan: T)(func: => T): T = {
-    if (shouldUseGluten(session, plan)) func else plan
-  }
-
-  def maybeNil(session: SparkSession, plan: T)(func: => Seq[SparkPlan]): 
Seq[SparkPlan] = {
-    if (shouldUseGluten(session, plan)) func else Nil
-  }
-}
-
-object QueryPlanSelector {
-  // control the usage of gluten at thread level
-  val GLUTEN_ENABLE_FOR_THREAD_KEY = "gluten.enabledForCurrentThread"
-}
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 82d37b8ca1..fc31289119 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,18 +18,18 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
 import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, 
RemoveFallbackTagRule}
-import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.rules.Rule
 
 class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   import FallbackStrategiesSuite._
@@ -142,14 +142,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait 
{
 
     val thread = new Thread(
       () => {
-        
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+        spark.sparkContext
+          
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
         val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val fallbackScanExec = fallbackPlan.collect {
           case e: FileSourceScanExec if 
!e.isInstanceOf[BasicScanExecTransformer] => true
         }
         assert(fallbackScanExec.size == 1)
 
-        
spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 null)
+        spark.sparkContext
+          
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
         val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val noFallbackScanExec = noFallbackPlan.collect { case _: 
BasicScanExecTransformer => true }
         assert(noFallbackScanExec.size == 1)
@@ -162,10 +164,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 private object FallbackStrategiesSuite {
   def newRuleApplier(
       spark: SparkSession,
-      transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+      transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]): 
HeuristicApplier = {
     new HeuristicApplier(
       spark,
-      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), 
c.ac.originalPlan())),
       List(
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
   }
 
   testGluten("test gluten extensions") {
-    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+    assert(
+      spark.sessionState.columnarRules
+        .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
 
     assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
     assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
     if (BackendTestUtils.isCHBackendLoaded()) {
-      assert(
-        spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+      assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
     } else {
       assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
     }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 866c16d52f..3e77672131 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,13 +18,12 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
 import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -153,14 +152,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 
     val thread = new Thread(
       () => {
-        spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+        spark.sparkContext
+          .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
         val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val fallbackScanExec = fallbackPlan.collect {
           case e: FileSourceScanExec if !e.isInstanceOf[BasicScanExecTransformer] => true
         }
         assert(fallbackScanExec.size == 1)
 
-        spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
+        spark.sparkContext
+          .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
         val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val noFallbackScanExec = noFallbackPlan.collect { case _: BasicScanExecTransformer => true }
         assert(noFallbackScanExec.size == 1)
@@ -173,10 +174,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 private object FallbackStrategiesSuite {
   def newRuleApplier(
       spark: SparkSession,
-      transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+      transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
-      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
   }
 
   testGluten("test gluten extensions") {
-    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+    assert(
+      spark.sessionState.columnarRules
+        .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
 
     assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
     assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
     if (BackendTestUtils.isCHBackendLoaded()) {
-      assert(
-        spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+      assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
     } else {
       assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
     }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 866c16d52f..3e77672131 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,13 +18,12 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
 import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -153,14 +152,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 
     val thread = new Thread(
       () => {
-        spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+        spark.sparkContext
+          .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
         val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val fallbackScanExec = fallbackPlan.collect {
           case e: FileSourceScanExec if !e.isInstanceOf[BasicScanExecTransformer] => true
         }
         assert(fallbackScanExec.size == 1)
 
-        spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
+        spark.sparkContext
+          .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
         val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val noFallbackScanExec = noFallbackPlan.collect { case _: BasicScanExecTransformer => true }
         assert(noFallbackScanExec.size == 1)
@@ -173,10 +174,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 private object FallbackStrategiesSuite {
   def newRuleApplier(
       spark: SparkSession,
-      transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+      transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
-      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
   }
 
   testGluten("test gluten extensions") {
-    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+    assert(
+      spark.sessionState.columnarRules
+        .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
 
     assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
     assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
     if (BackendTestUtils.isCHBackendLoaded()) {
-      assert(
-        spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+      assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
     } else {
       assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
     }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 6318c0e06b..a214d9755e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,13 +18,12 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
 import org.apache.gluten.extension.columnar.{ExpandFallbackPolicy, FallbackTags, RemoveFallbackTagRule}
-import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -154,14 +153,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 
     val thread = new Thread(
       () => {
-        spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
+        spark.sparkContext
+          .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
         val fallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val fallbackScanExec = fallbackPlan.collect {
           case e: FileSourceScanExec if !e.isInstanceOf[BasicScanExecTransformer] => true
         }
         assert(fallbackScanExec.size == 1)
 
-        spark.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
+        spark.sparkContext
+          .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, null)
         val noFallbackPlan = spark.sql(sql).queryExecution.executedPlan
         val noFallbackScanExec = noFallbackPlan.collect { case _: BasicScanExecTransformer => true }
         assert(noFallbackScanExec.size == 1)
@@ -174,10 +175,9 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
 private object FallbackStrategiesSuite {
   def newRuleApplier(
       spark: SparkSession,
-      transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
+      transformBuilders: Seq[ColumnarRuleCall => Rule[SparkPlan]]): HeuristicApplier = {
     new HeuristicApplier(
       spark,
-      Seq(PhysicalPlanSelector.skipCond),
       transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
       List(
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index b3b8483c3b..4924ee4c4f 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.extension
 
-import org.apache.gluten.extension.GlutenColumnarRule
+import org.apache.gluten.extension.injector.InjectorControl
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
@@ -31,7 +31,9 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
   }
 
   testGluten("test gluten extensions") {
-    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
+    assert(
+      spark.sessionState.columnarRules
+        .exists(_.isInstanceOf[InjectorControl.DisablerAware]))
 
     assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
     assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
@@ -39,8 +41,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
     assert(spark.sessionState.analyzer.extendedCheckRules.contains(MyCheckRule(spark)))
     assert(spark.sessionState.optimizer.batches.flatMap(_.rules).contains(MyRule(spark)))
     if (BackendTestUtils.isCHBackendLoaded()) {
-      assert(
-        spark.sessionState.sqlParser.getClass.getSimpleName.equals("GlutenClickhouseSqlParser"))
+      assert(spark.sessionState.sqlParser.isInstanceOf[InjectorControl.DisablerAware])
     } else {
       assert(spark.sessionState.sqlParser.isInstanceOf[MyParser])
     }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index de111d33ed..dfe7e4a082 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -471,7 +471,7 @@ object GlutenConfig {
   import SQLConf._
 
   var GLUTEN_ENABLE_BY_DEFAULT = true
-  val GLUTEN_ENABLE_KEY = "spark.gluten.enabled"
+  val GLUTEN_ENABLED_KEY = "spark.gluten.enabled"
   val GLUTEN_LIB_NAME = "spark.gluten.sql.columnar.libname"
   val GLUTEN_LIB_PATH = "spark.gluten.sql.columnar.libpath"
   val GLUTEN_EXECUTOR_LIB_PATH = "spark.gluten.sql.columnar.executor.libpath"
@@ -810,7 +810,7 @@ object GlutenConfig {
   }
 
   val GLUTEN_ENABLED =
-    buildConf(GLUTEN_ENABLE_KEY)
+    buildConf(GLUTEN_ENABLED_KEY)
       .internal()
       .doc("Whether to enable gluten. Default value is true. Just an experimental property." +
         " Recommend to enable/disable Gluten through the setting for spark.plugins.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to