This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build in repository https://gitbox.apache.org/repos/asf/auron.git
commit ea2dd42b48f3ae30ea7d2277ecb15227a428426c Author: zhangli20 <[email protected]> AuthorDate: Sat Mar 7 16:15:57 2026 +0800 supports spark.blaze.enableScanFallback --- .idea/vcs.xml | 3 --- .../src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala | 7 +++++-- .../src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala | 7 +++++-- .../org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala | 1 - .../src/main/java/org/apache/spark/sql/blaze/BlazeConf.java | 2 ++ .../src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala | 3 +-- 6 files changed, 13 insertions(+), 10 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index bd37bc65..02158c9e 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -12,8 +12,5 @@ </component> <component name="VcsDirectoryMappings"> <mapping directory="$PROJECT_DIR$" vcs="Git" /> - <mapping directory="$PROJECT_DIR$/thirdparty/arrow-datafusion" vcs="Git" /> - <mapping directory="$PROJECT_DIR$/thirdparty/arrow-rs" vcs="Git" /> - <mapping directory="$PROJECT_DIR$/thirdparty/datafusion-orc" vcs="Git" /> </component> </project> \ No newline at end of file diff --git a/spark-extension-shims-spark241kwaiae/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark241kwaiae/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala index 60b203e6..26820773 100644 --- a/spark-extension-shims-spark241kwaiae/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala +++ b/spark-extension-shims-spark241kwaiae/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala @@ -746,7 +746,7 @@ case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] plan.foreachUp { case p @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => // non parquet table rule - if (!( + if (!BlazeConf.ENABLE_SCAN_FALLBACK.booleanConf() && !( BlazeConverters.enableScanParquet && fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] || BlazeConverters.enableScanOrc && fsRelation.fileFormat.isInstanceOf[OrcFileFormat] )) { @@ -775,7 +775,9 @@ case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] } case h: HiveTableRelation => - turnOffBlazeWithReason(h.conf, BlazeMissPatterns.NonParquetFormat) + if (!BlazeConf.ENABLE_SCAN_FALLBACK.booleanConf()) { + turnOffBlazeWithReason(h.conf, BlazeMissPatterns.NonParquetFormat) + } case _ => } @@ -783,6 +785,7 @@ case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] } private def turnOffBlazeWithReason(planConf: SQLConf, blazeMissPattern: String): Unit = { + logWarning(s"Turn off Blaze due to: $blazeMissPattern") planConf.setConf(blazeEnabledKey, false) sparkSession.sparkContext.conf .set(BlazeRuleEngine.blazeMissPatterns, blazeMissPattern) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala index 8b74e735..2b6ca98d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala @@ -1063,7 +1063,7 @@ case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] plan.foreachUp { case p @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => // non parquet table rule - if (!( + if (!BlazeConf.ENABLE_SCAN_FALLBACK.booleanConf() && !( BlazeConverters.enableScanParquet && fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] || BlazeConverters.enableScanOrc && fsRelation.fileFormat.isInstanceOf[OrcFileFormat] )) { @@ -1092,7 +1092,9 @@ case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] } case h: HiveTableRelation => - turnOffBlazeWithReason(h.conf, BlazeMissPatterns.NonParquetFormat) + if (!BlazeConf.ENABLE_SCAN_FALLBACK.booleanConf()) { + turnOffBlazeWithReason(h.conf, BlazeMissPatterns.NonParquetFormat) + } case _ => } @@ -1100,6 +1102,7 @@ case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] } private def turnOffBlazeWithReason(planConf: SQLConf, blazeMissPattern: String): Unit = { + logWarning(s"Turn off Blaze due to: $blazeMissPattern") planConf.setConf(blazeEnabledKey, false) sparkSession.sparkContext.conf .set(BlazeRuleEngine.blazeMissPatterns, blazeMissPattern) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala index 6cfbbc36..1cee51b4 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.execution.aggregate.BaseAggregateExec -import org.apache.spark.sql.execution.auron.plan.NativeAggBase.AggExecMode import org.apache.spark.sql.execution.blaze.plan.NativeAggBase.AggExecMode import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.BinaryType diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java index 5136b8b5..76f90653 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java @@ -115,6 +115,8 @@ public enum BlazeConf { // number of parallel scan files NUM_PARALLEL_SCAN_FILES("spark.blaze.numParallelScanFiles", 4), + ENABLE_SCAN_FALLBACK("spark.blaze.enableScanFallback", false), + NATIVE_LOG_LEVEL("spark.blaze.native.log.level", "info"); public final String key; diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala index 4b8659f9..5ea13b36 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala @@ -16,7 +16,6 @@ package org.apache.spark.sql.blaze import scala.collection.immutable.TreeMap - import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.Partition import org.apache.spark.SparkConf @@ -24,13 +23,13 @@ import org.apache.spark.SparkContext import org.apache.spark.SparkEnv import org.apache.spark.TaskContext import org.blaze.protobuf.PhysicalPlanNode - import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.util.Utils object NativeHelper extends Logging { val currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
