This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b33b051d fix: Mark SortOrder with floating-point as incompatible 
(#2650)
0b33b051d is described below

commit 0b33b051ddab43d188e3637b635fed18330bccc5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Oct 28 11:21:58 2025 -0600

    fix: Mark SortOrder with floating-point as incompatible (#2650)
---
 docs/source/user-guide/latest/compatibility.md     |  4 ++
 docs/source/user-guide/latest/tuning.md            |  6 ++
 .../scala/org/apache/comet/serde/CometSort.scala   | 60 +++++++++++++++++++-
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 31 +----------
 .../org/apache/comet/CometExpressionSuite.scala    | 65 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/CometTestBase.scala |  4 ++
 6 files changed, 140 insertions(+), 30 deletions(-)

diff --git a/docs/source/user-guide/latest/compatibility.md 
b/docs/source/user-guide/latest/compatibility.md
index 562baabfd..6c3bab59d 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -97,6 +97,10 @@ because they are handled well in Spark (e.g., 
`SQLOrderingUtil.compareFloats`).
 functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., 
[arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
 So Comet will add additional normalization expression of NaN and zero for 
comparison.
 
+Sorting on floating-point data types (or complex types containing 
floating-point values) is not compatible with 
+Spark if the data contains both zero and negative zero. This is likely an edge 
case that is not of concern for many users
+and sorting on floating-point data can be enabled by setting 
`spark.comet.expression.SortOrder.allowIncompatible=true`.
+
 There is a known bug with using count(distinct) within aggregate queries, 
where each NaN value will be counted
 separately [#1824](https://github.com/apache/datafusion-comet/issues/1824).
 
diff --git a/docs/source/user-guide/latest/tuning.md 
b/docs/source/user-guide/latest/tuning.md
index cc0109526..21b1df652 100644
--- a/docs/source/user-guide/latest/tuning.md
+++ b/docs/source/user-guide/latest/tuning.md
@@ -100,6 +100,12 @@ Comet Performance
 
 It may be possible to reduce Comet's memory overhead by reducing batch sizes 
or increasing number of partitions.
 
+## Optimizing Sorting on Floating-Point Values
+
+Sorting on floating-point data types (or complex types containing 
floating-point values) is not compatible with
+Spark if the data contains both zero and negative zero. This is likely an edge 
case that is not of concern for many users
+and sorting on floating-point data can be enabled by setting 
`spark.comet.expression.SortOrder.allowIncompatible=true`.
+
 ## Optimizing Joins
 
 Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability 
reasons. If the build-side of a
diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala 
b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
index 2dec25c0d..4a1063458 100644
--- a/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/CometSort.scala
@@ -21,12 +21,70 @@ package org.apache.comet.serde
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Descending, NullsFirst, NullsLast, SortOrder}
 import org.apache.spark.sql.execution.SortExec
+import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, 
StructType}
 
 import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.{exprToProto, supportedSortType}
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, 
exprToProtoInternal, supportedSortType}
+
+object CometSortOrder extends CometExpressionSerde[SortOrder] {
+
+  override def getSupportLevel(expr: SortOrder): SupportLevel = {
+
+    def containsFloatingPoint(dt: DataType): Boolean = {
+      dt match {
+        case DataTypes.FloatType | DataTypes.DoubleType => true
+        case ArrayType(elementType, _) => containsFloatingPoint(elementType)
+        case StructType(fields) => fields.exists(f => 
containsFloatingPoint(f.dataType))
+        case MapType(keyType, valueType, _) =>
+          containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
+        case _ => false
+      }
+    }
+
+    if (containsFloatingPoint(expr.child.dataType)) {
+      Incompatible(Some(
+        s"Sorting on floating-point is not 100% compatible with Spark. 
${CometConf.COMPAT_GUIDE}"))
+    } else {
+      Compatible()
+    }
+  }
+
+  override def convert(
+      expr: SortOrder,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+
+    if (childExpr.isDefined) {
+      val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
+
+      sortOrderBuilder.setChild(childExpr.get)
+
+      expr.direction match {
+        case Ascending => sortOrderBuilder.setDirectionValue(0)
+        case Descending => sortOrderBuilder.setDirectionValue(1)
+      }
+
+      expr.nullOrdering match {
+        case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
+        case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
+      }
+
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setSortOrder(sortOrderBuilder)
+          .build())
+    } else {
+      withInfo(expr, expr.child)
+      None
+    }
+  }
+}
 
 object CometSort extends CometOperatorSerde[SortExec] {
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9f418e306..8361f1e95 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -221,7 +221,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Cast] -> CometCast)
 
   private val miscExpressions: Map[Class[_ <: Expression], 
CometExpressionSerde[_]] = Map(
-    // TODO SortOrder (?)
     // TODO PromotePrecision
     // TODO KnownFloatingPointNormalized
     // TODO ScalarSubquery
@@ -235,7 +234,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Coalesce] -> CometCoalesce,
     classOf[Literal] -> CometLiteral,
     classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId,
-    classOf[SparkPartitionID] -> CometSparkPartitionId)
+    classOf[SparkPartitionID] -> CometSparkPartitionId,
+    classOf[SortOrder] -> CometSortOrder)
 
   /**
    * Mapping of Spark expression class to Comet expression handler.
@@ -698,33 +698,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
 
     versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr 
match {
 
-      case SortOrder(child, direction, nullOrdering, _) =>
-        val childExpr = exprToProtoInternal(child, inputs, binding)
-
-        if (childExpr.isDefined) {
-          val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
-          sortOrderBuilder.setChild(childExpr.get)
-
-          direction match {
-            case Ascending => sortOrderBuilder.setDirectionValue(0)
-            case Descending => sortOrderBuilder.setDirectionValue(1)
-          }
-
-          nullOrdering match {
-            case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
-            case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
-          }
-
-          Some(
-            ExprOuterClass.Expr
-              .newBuilder()
-              .setSortOrder(sortOrderBuilder)
-              .build())
-        } else {
-          withInfo(expr, child)
-          None
-        }
-
       case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
         // `UnaryExpression` includes `PromotePrecision` for Spark 3.3
         // `PromotePrecision` is just a wrapper, don't need to serialize it.
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index ddbe7d14e..9aecf55b3 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -42,6 +42,7 @@ import 
org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types._
 
 import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
 
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -60,6 +61,70 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   val DIVIDE_BY_ZERO_EXCEPTION_MSG =
     """Division by zero. Use `try_divide` to tolerate divisor being 0 and 
return NULL instead"""
 
+  test("sort floating point with negative zero") {
+    val schema = StructType(
+      Seq(
+        StructField("c0", DataTypes.FloatType, true),
+        StructField("c1", DataTypes.DoubleType, true)))
+    val df = FuzzDataGenerator.generateDataFrame(
+      new Random(42),
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> 
"false") {
+      checkSparkAnswerAndFallbackReason(
+        "select * from tbl order by 1, 2",
+        "unsupported range partitioning sort order")
+    }
+  }
+
+  test("sort array of floating point with negative zero") {
+    val schema = StructType(
+      Seq(
+        StructField("c0", DataTypes.createArrayType(DataTypes.FloatType), 
true),
+        StructField("c1", DataTypes.createArrayType(DataTypes.DoubleType), 
true)))
+    val df = FuzzDataGenerator.generateDataFrame(
+      new Random(42),
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> 
"false") {
+      checkSparkAnswerAndFallbackReason(
+        "select * from tbl order by 1, 2",
+        "unsupported range partitioning sort order")
+    }
+  }
+
+  test("sort struct containing floating point with negative zero") {
+    val schema = StructType(
+      Seq(
+        StructField(
+          "float_struct",
+          StructType(Seq(StructField("c0", DataTypes.FloatType, true)))),
+        StructField(
+          "float_double",
+          StructType(Seq(StructField("c0", DataTypes.DoubleType, true))))))
+    val df = FuzzDataGenerator.generateDataFrame(
+      new Random(42),
+      spark,
+      schema,
+      1000,
+      DataGenOptions(generateNegativeZero = true))
+    df.createOrReplaceTempView("tbl")
+
+    withSQLConf(CometConf.getExprAllowIncompatConfigKey("SortOrder") -> 
"false") {
+      checkSparkAnswerAndFallbackReason(
+        "select * from tbl order by 1, 2",
+        "unsupported range partitioning sort order")
+    }
+  }
+
   test("compare true/false to negative zero") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 2308858f6..fe2edc705 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -86,6 +86,10 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, 
"true")
+    // SortOrder is incompatible for mixed zero and negative zero floating 
point values, but
+    // this is an edge case, and we expect most users to allow sorts on 
floating point, so we
+    // enable this for the tests
+    conf.set(CometConf.getExprAllowIncompatConfigKey("SortOrder"), "true")
     conf
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to