This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 0b33b051d fix: Mark SortOrder with floating-point as incompatible (#2650)
0b33b051d is described below
commit 0b33b051ddab43d188e3637b635fed18330bccc5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Oct 28 11:21:58 2025 -0600
fix: Mark SortOrder with floating-point as incompatible (#2650)
---
docs/source/user-guide/latest/compatibility.md | 4 ++
docs/source/user-guide/latest/tuning.md | 6 ++
.../scala/org/apache/comet/serde/CometSort.scala | 60 +++++++++++++++++++-
.../org/apache/comet/serde/QueryPlanSerde.scala | 31 +----------
.../org/apache/comet/CometExpressionSuite.scala | 65 ++++++++++++++++++++++
.../scala/org/apache/spark/sql/CometTestBase.scala | 4 ++
6 files changed, 140 insertions(+), 30 deletions(-)
diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md
index 562baabfd..6c3bab59d 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -97,6 +97,10 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`).
functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g.,
[arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
So Comet will add additional normalization expression of NaN and zero for comparison.
+Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
+Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users,
+and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
+
There is a known bug with using count(distinct) within aggregate queries, where each NaN value will be counted
separately [#1824](https://github.com/apache/datafusion-comet/issues/1824).
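To opt in despite this, the flag can be set when the session is created. A minimal sketch (the app name is a placeholder; the config key is the one documented above):

    import org.apache.spark.sql.SparkSession

    // Accept Comet's zero/negative-zero ordering difference for sorts on
    // floating-point columns (see the compatibility note above).
    val spark = SparkSession
      .builder()
      .appName("comet-sort-opt-in") // hypothetical app name
      .config("spark.comet.expression.SortOrder.allowIncompatible", "true")
      .getOrCreate()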
diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md
index cc0109526..21b1df652 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -100,6 +100,12 @@ Comet Performance
It may be possible to reduce Comet's memory overhead by reducing batch sizes or increasing number of partitions.
+## Optimizing Sorting on Floating-Point Values
+
+Sorting on floating-point data types (or complex types containing floating-point values) is not compatible with
+Spark if the data contains both zero and negative zero. This is likely an edge case that is not of concern for many users,
+and sorting on floating-point data can be enabled by setting `spark.comet.expression.SortOrder.allowIncompatible=true`.
+
## Optimizing Joins
Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a
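The same flag can also be flipped at runtime on an existing session. A sketch, assuming a `SparkSession` named `spark` (the table and column names are placeholders for illustration only):

    // Enable before running sort-heavy queries; mixed 0.0/-0.0 data may then
    // order differently than Spark, per the note above.
    spark.conf.set("spark.comet.expression.SortOrder.allowIncompatible", "true")
    spark.sql("SELECT * FROM sales ORDER BY amount").show()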
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
index 2dec25c0d..4a1063458 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
@@ -21,12 +21,70 @@ package org.apache.comet.serde
import scala.jdk.CollectionConverters._
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Descending, NullsFirst, NullsLast, SortOrder}
import org.apache.spark.sql.execution.SortExec
+import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.{exprToProto, supportedSortType}
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, exprToProtoInternal, supportedSortType}
+
+object CometSortOrder extends CometExpressionSerde[SortOrder] {
+
+ override def getSupportLevel(expr: SortOrder): SupportLevel = {
+
+ def containsFloatingPoint(dt: DataType): Boolean = {
+ dt match {
+ case DataTypes.FloatType | DataTypes.DoubleType => true
+ case ArrayType(elementType, _) => containsFloatingPoint(elementType)
+ case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType))
+ case MapType(keyType, valueType, _) =>
+ containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
+ case _ => false
+ }
+ }
+
+ if (containsFloatingPoint(expr.child.dataType)) {
+ Incompatible(Some(
+ s"Sorting on floating-point is not 100% compatible with Spark.
${CometConf.COMPAT_GUIDE}"))
+ } else {
+ Compatible()
+ }
+ }
+
+ override def convert(
+ expr: SortOrder,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+
+ if (childExpr.isDefined) {
+ val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
+
+ sortOrderBuilder.setChild(childExpr.get)
+
+ expr.direction match {
+ case Ascending => sortOrderBuilder.setDirectionValue(0)
+ case Descending => sortOrderBuilder.setDirectionValue(1)
+ }
+
+ expr.nullOrdering match {
+ case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
+ case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
+ }
+
+ Some(
+ ExprOuterClass.Expr
+ .newBuilder()
+ .setSortOrder(sortOrderBuilder)
+ .build())
+ } else {
+ withInfo(expr, expr.child)
+ None
+ }
+ }
+}
object CometSort extends CometOperatorSerde[SortExec] {
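For context, the recursive `containsFloatingPoint` check above walks nested types, so the incompatibility marking covers floats and doubles wherever they appear. An illustrative sketch of the kinds of schemas it would flag, built with Spark's standard `DataTypes`/`StructType` API (the expected booleans are inferred from the match arms above):

    import org.apache.spark.sql.types._

    // Each pair: a data type and whether the check above should flag it.
    val cases: Seq[(DataType, Boolean)] = Seq(
      DataTypes.FloatType -> true,                                // plain float
      DataTypes.createArrayType(DataTypes.DoubleType) -> true,    // double inside array
      new StructType().add("score", DataTypes.FloatType) -> true, // float inside struct
      DataTypes.createMapType(DataTypes.StringType, DataTypes.DoubleType) -> true, // double map value
      DataTypes.IntegerType -> false                              // no floating point anywhere
    )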
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9f418e306..8361f1e95 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -221,7 +221,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Cast] -> CometCast)
private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
- // TODO SortOrder (?)
// TODO PromotePrecision
// TODO KnownFloatingPointNormalized
// TODO ScalarSubquery
@@ -235,7 +234,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Coalesce] -> CometCoalesce,
classOf[Literal] -> CometLiteral,
classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
- classOf[SparkPartitionID] -> CometSparkPartitionId)
+ classOf[SparkPartitionID] -> CometSparkPartitionId,
+ classOf[SortOrder] -> CometSortOrder)
/**
* Mapping of Spark expression class to Comet expression handler.
@@ -698,33 +698,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr match {
- case SortOrder(child, direction, nullOrdering, _) =>
- val childExpr = exprToProtoInternal(child, inputs, binding)
-
- if (childExpr.isDefined) {
- val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
- sortOrderBuilder.setChild(childExpr.get)
-
- direction match {
- case Ascending => sortOrderBuilder.setDirectionValue(0)
- case Descending => sortOrderBuilder.setDirectionValue(1)
- }
-
- nullOrdering match {
- case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
- case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
- }
-
- Some(
- ExprOuterClass.Expr
- .newBuilder()
- .setSortOrder(sortOrderBuilder)
- .build())
- } else {
- withInfo(expr, child)
- None
- }
-
case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
// `UnaryExpression` includes `PromotePrecision` for Spark 3.3
// `PromotePrecision` is just a wrapper, don't need to serialize it.
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index ddbe7d14e..9aecf55b3 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.sql.types._
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
@@ -60,6 +61,70 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val DIVIDE_BY_ZERO_EXCEPTION_MSG =
"""Division by zero. Use `try_divide` to tolerate divisor being 0 and
return NULL instead"""
+ test("sort floating point with negative zero") {
+ val schema = StructType(
+ Seq(
+ StructField("c0", DataTypes.FloatType, true),
+ StructField("c1", DataTypes.DoubleType, true)))
+ val df = FuzzDataGenerator.generateDataFrame(
+ new Random(42),
+ spark,
+ schema,
+ 1000,
+ DataGenOptions(generateNegativeZero = true))
+ df.createOrReplaceTempView("tbl")
+
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+ checkSparkAnswerAndFallbackReason(
+ "select * from tbl order by 1, 2",
+ "unsupported range partitioning sort order")
+ }
+ }
+
+ test("sort array of floating point with negative zero") {
+ val schema = StructType(
+ Seq(
+ StructField("c0", DataTypes.createArrayType(DataTypes.FloatType),
true),
+ StructField("c1", DataTypes.createArrayType(DataTypes.DoubleType),
true)))
+ val df = FuzzDataGenerator.generateDataFrame(
+ new Random(42),
+ spark,
+ schema,
+ 1000,
+ DataGenOptions(generateNegativeZero = true))
+ df.createOrReplaceTempView("tbl")
+
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+ checkSparkAnswerAndFallbackReason(
+ "select * from tbl order by 1, 2",
+ "unsupported range partitioning sort order")
+ }
+ }
+
+ test("sort struct containing floating point with negative zero") {
+ val schema = StructType(
+ Seq(
+ StructField(
+ "float_struct",
+ StructType(Seq(StructField("c0", DataTypes.FloatType, true)))),
+ StructField(
+ "float_double",
+ StructType(Seq(StructField("c0", DataTypes.DoubleType, true))))))
+ val df = FuzzDataGenerator.generateDataFrame(
+ new Random(42),
+ spark,
+ schema,
+ 1000,
+ DataGenOptions(generateNegativeZero = true))
+ df.createOrReplaceTempView("tbl")
+
+ withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
+ checkSparkAnswerAndFallbackReason(
+ "select * from tbl order by 1, 2",
+ "unsupported range partitioning sort order")
+ }
+ }
+
test("compare true/false to negative zero") {
Seq(false, true).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 2308858f6..fe2edc705 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -86,6 +86,10 @@ abstract class CometTestBase
conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
+ // SortOrder is incompatible for mixed zero and negative zero floating point values, but
+ // this is an edge case, and we expect most users to allow sorts on floating point, so we
+ // enable this for the tests
+ conf.set(CometConf.getExprAllowIncompatConfigKey("SortOrder"), "true")
conf
}
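With the flag enabled suite-wide, a test that needs to exercise the fallback path restores the default locally, as the new CometExpressionSuite tests above do:

    // Temporarily disable the allowance to assert that Comet declines the sort
    // and reports the expected fallback reason.
    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> "false") {
      checkSparkAnswerAndFallbackReason(
        "select * from tbl order by 1, 2",
        "unsupported range partitioning sort order")
    }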
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]