This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build in repository https://gitbox.apache.org/repos/asf/auron.git
commit 08b6fa77309d2b42cc02c1968ba9dc8bb282abb6 Author: zhangli20 <[email protected]> AuthorDate: Sat Mar 7 15:18:53 2026 +0800 disable max/min + nested params conversion (https://team.corp.kuaishou.com/task/T10892667) --- .../org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala | 2 ++ .../main/scala/org/apache/spark/sql/blaze/NativeConverters.scala | 4 ++++ .../org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala | 4 +++- .../apache/spark/sql/execution/blaze/plan/NativeWindowBase.scala | 6 +++++- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala index 3dc3c4ee..6cfbbc36 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala @@ -47,6 +47,8 @@ case class NativeAggExec( child) with BaseAggregateExec { + override val output: Seq[Attribute] = outputAttributes + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") override val requiredChildDistributionExpressions: Option[Seq[Expression]] = theRequiredChildDistributionExpressions diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala index f798dcee..2bcb756a 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeConverters.scala @@ -1057,9 +1057,13 @@ object NativeConverters extends Logging { e.aggregateFunction match { case e: Max => + assert(!e.child.dataType.isInstanceOf[ArrayType] && !e.child.dataType.isInstanceOf[MapType] && !e.child.dataType.isInstanceOf[StructType], + s"Max on complex types (Array/Map/Struct) is not supported in native execution. Expression: ${e}")
aggBuilder.setAggFunction(pb.AggFunction.MAX) aggBuilder.addChildren(convertExpr(e.child)) case e: Min => + assert(!e.child.dataType.isInstanceOf[ArrayType] && !e.child.dataType.isInstanceOf[MapType] && !e.child.dataType.isInstanceOf[StructType], + s"Min on complex types (Array/Map/Struct) is not supported in native execution. Expression: ${e}") aggBuilder.setAggFunction(pb.AggFunction.MIN) aggBuilder.addChildren(convertExpr(e.child)) case e: Sum if e.dataType.isInstanceOf[AtomicType] => diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala index 2078117b..5824fac3 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala @@ -139,7 +139,7 @@ abstract class NativeAggBase( nativeAggrs nativeAggrModes - override def output: Seq[Attribute] = + def outputAttributes: Seq[Attribute] = if (nativeAggrModes.contains(pb.AggMode.FINAL)) { groupingExpressions.map(_.toAttribute) ++ aggregateAttributes } else { @@ -149,6 +149,8 @@ groupingExpressions.map(_.toAttribute) ++ aggBufferAttrs } + override def output: Seq[Attribute] = outputAttributes + override def outputPartitioning: Partitioning = child.outputPartitioning diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowBase.scala index 4ccf4112..4fd23354 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowBase.scala @@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.blaze.plan import scala.collection.JavaConverters._ import scala.collection.immutable.SortedMap - import org.apache.spark.OneToOneDependency import org.apache.spark.sql.blaze.MetricNode import org.apache.spark.sql.blaze.NativeConverters @@ -47,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.expressions.aggregate.Max import org.apache.spark.sql.catalyst.expressions.aggregate.Min import org.apache.spark.sql.catalyst.expressions.aggregate.Sum +import org.apache.spark.sql.types.{ArrayType, MapType, StructType} import org.blaze.protobuf.WindowGroupLimit abstract class NativeWindowBase( @@ -136,6 +136,8 @@ abstract class NativeWindowBase( assert( spec.frameSpecification == RowNumber().frame, // only supports RowFrame(Unbounded, CurrentRow) s"window frame not supported: ${spec.frameSpecification}") + assert(!e.child.dataType.isInstanceOf[ArrayType] && !e.child.dataType.isInstanceOf[MapType] && !e.child.dataType.isInstanceOf[StructType], + s"Max on complex types (Array/Map/Struct) is not supported in native execution. Expression: ${e}") windowExprBuilder.setFuncType(pb.WindowFunctionType.Agg) windowExprBuilder.setAggFunc(pb.AggFunction.MAX) windowExprBuilder.addChildren(NativeConverters.convertExpr(e.child)) @@ -144,6 +146,8 @@ assert( spec.frameSpecification == RowNumber().frame, // only supports RowFrame(Unbounded, CurrentRow) s"window frame not supported: ${spec.frameSpecification}") + assert(!e.child.dataType.isInstanceOf[ArrayType] && !e.child.dataType.isInstanceOf[MapType] && !e.child.dataType.isInstanceOf[StructType], + s"Min on complex types (Array/Map/Struct) is not supported in native execution. Expression: ${e}") windowExprBuilder.setFuncType(pb.WindowFunctionType.Agg) windowExprBuilder.setAggFunc(pb.AggFunction.MIN) windowExprBuilder.addChildren(NativeConverters.convertExpr(e.child))
