This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-decimal-cast
in repository https://gitbox.apache.org/repos/asf/auron.git

commit c44088557602444e766a05c918780f7ff3b43dfe
Author: zhangli20 <[email protected]>
AuthorDate: Thu Dec 11 20:41:28 2025 +0800

    introduce BlazeRuleEngine from spark241kwaiae to spark3
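
    BlazeRuleEngine is injected as a logical-plan optimizer rule. It walks
    each plan bottom-up and turns Blaze off for the session, recording the
    reason under spark.blaze.blazeMissPatterns, when the plan scans a
    HiveTableRelation or a HadoopFsRelation whose format is not Parquet or
    ORC, when spark.hive.exist.read.encrypted.table is set, or when the
    scanned table name contains dp_dd or offline_attribution_mover_v2.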
---
 pom.xml                                            |  2 +-
 .../org/apache/spark/sql/blaze/ShimsImpl.scala     | 81 ++++++++++++++++++++--
 2 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0301c25f..1a186ce6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -416,7 +416,7 @@
       <id>spark-3.5</id>
       <properties>
         <shimName>spark-3.5</shimName>
-        <pkgSuffix>-kwai-adapt-cele060-fix-non-partition-len</pkgSuffix>
+        <pkgSuffix>-kwai-hotfix-rule-engine</pkgSuffix>
         <shimPkg>spark-extension-shims-spark3</shimPkg>
         <javaVersion>1.8</javaVersion>
         <scalaVersion>2.12</scalaVersion>
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
index 44ce358e..8b74e735 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
@@ -17,7 +17,6 @@ package org.apache.spark.sql.blaze
 
 import java.io.File
 import java.util.UUID
-
 import org.apache.commons.lang3.reflect.FieldUtils
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.ShuffleDependency
@@ -25,6 +24,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.SparkException
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.OptionalConfigEntry
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.IndexShuffleBlockResolver
@@ -37,7 +37,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.blaze.BlazeConverters.ForceNativeExecutionWrapperBase
 import org.apache.spark.sql.blaze.NativeConverters.NativeExprWrapperBase
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.Generator
@@ -101,7 +101,7 @@ import org.apache.spark.sql.execution.blaze.plan._
 import org.apache.spark.sql.execution.blaze.shuffle.RssPartitionWriterBase
 import org.apache.spark.sql.execution.blaze.shuffle.celeborn.BlazeCelebornShuffleManager
 import org.apache.spark.sql.execution.blaze.shuffle.BlazeBlockStoreShuffleReaderBase
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionedFile}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.blaze.plan.NativeBroadcastJoinExec
 import org.apache.spark.sql.execution.joins.blaze.plan.NativeShuffledHashJoinExecProvider
@@ -112,12 +112,17 @@ import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.storage.FileSegment
 import org.apache.spark.sql.execution.blaze.shuffle.uniffle.BlazeUniffleShuffleManager
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.ShuffleDataBlockId
 import org.apache.spark.util.ExternalBlockStoreUtils
-import org.blaze.{protobuf => pb, sparkver}
+import org.blaze.{sparkver, protobuf => pb}
 
 class ShimsImpl extends Shims with Logging {
 
@@ -156,6 +161,10 @@ class ShimsImpl extends Shims with Logging {
         logWarning(s"${BlazeConf.FORCE_SHUFFLED_HASH_JOIN.key} is not 
supported in $shimVersion")
       }
     }
+
+    extension.injectOptimizerRule(sparkSession => {
+      BlazeRuleEngine(sparkSession)
+    })
   }
 
   override def createConvertToNativeExec(child: SparkPlan): ConvertToNativeBase =
@@ -1046,3 +1055,67 @@ case class NativeExprWrapper(
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
   override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy()
 }
+
+case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
+  import BlazeSparkSessionExtension._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.foreachUp {
+      case p @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) =>
+        // non-Parquet/non-ORC format rule
+        if (!(
+          BlazeConverters.enableScanParquet && fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]
+            || BlazeConverters.enableScanOrc && fsRelation.fileFormat.isInstanceOf[OrcFileFormat]
+          )) {
+          turnOffBlazeWithReason(p.conf, BlazeMissPatterns.NonParquetFormat)
+        }
+
+        // read encrypted table rule
+        val readEncryptedTableEnable = sparkSession.sparkContext.conf
+          .getBoolean("spark.hive.exist.read.encrypted.table", defaultValue = false)
+        if (readEncryptedTableEnable) {
+          turnOffBlazeWithReason(p.conf, BlazeMissPatterns.ReadEncryptedTable)
+        }
+
+        // skip scanning dp_dd.*** tables because parquet statistics don't get min/max info
+        if (p.catalogTable.map(_.identifier.unquotedString).getOrElse("").contains("dp_dd")) {
+          turnOffBlazeWithReason(p.conf, BlazeMissPatterns.ReadHbaseTable)
+        }
+
+        // skip scanning offline_attribution_mover_v2
+        // issue: https://team.corp.kuaishou.com/task/T6538018
+        if (p.catalogTable
+          .map(_.identifier.unquotedString)
+          .getOrElse("")
+          .contains("offline_attribution_mover_v2")) {
+          turnOffBlazeWithReason(p.conf, BlazeMissPatterns.ReadBlacklistedTable)
+        }
+
+      case h: HiveTableRelation =>
+        turnOffBlazeWithReason(h.conf, BlazeMissPatterns.NonParquetFormat)
+
+      case _ =>
+    }
+    plan
+  }
+
+  private def turnOffBlazeWithReason(planConf: SQLConf, blazeMissPattern: String): Unit = {
+    planConf.setConf(blazeEnabledKey, false)
+    sparkSession.sparkContext.conf
+      .set(BlazeRuleEngine.blazeMissPatterns, blazeMissPattern)
+  }
+
+  object BlazeMissPatterns {
+    val NonParquetFormat = "NonParquetFormat"
+    val ReadEncryptedTable = "ReadEncryptedTable"
+    val ReadHbaseTable = "ReadHbaseTable"
+    val ReadBlacklistedTable = "ReadBlacklistedTable"
+  }
+}
+
+object BlazeRuleEngine {
+  lazy val blazeMissPatterns: OptionalConfigEntry[String] = SQLConf
+    .buildConf("spark.blaze.blazeMissPatterns")
+    .stringConf
+    .createOptional
+}
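
A minimal, self-contained sketch of the injection mechanism used above (not
part of the commit; LoggingRule and ExtensionDemo are hypothetical names): a
no-op optimizer rule registered through
SparkSessionExtensions.injectOptimizerRule, the same hook the diff uses to
register BlazeRuleEngine.

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: logs each plan handed to the optimizer, returns it unchanged.
case class LoggingRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"optimizer saw plan:\n${plan.treeString}")
    plan
  }
}

object ExtensionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      // Same injection point the diff uses for BlazeRuleEngine.
      .withExtensions((e: SparkSessionExtensions) => e.injectOptimizerRule(s => LoggingRule(s)))
      .getOrCreate()
    spark.range(10).selectExpr("id * 2").show()
    spark.stop()
  }
}

Rules injected this way run with the logical optimizer on every query, which
is why BlazeRuleEngine only inspects the plan and flips conf flags rather
than rewriting the plan itself.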
