This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new a583db3b9 Chore: refactor Comparison out of QueryPlanSerde (#2028)
a583db3b9 is described below

commit a583db3b94bc3ef36540d6804e1830a57fdb9f96
Author: Yu-Chuan Hung <86523891+cutechuanch...@users.noreply.github.com>
AuthorDate: Wed Aug 6 00:12:50 2025 +0800

    Chore: refactor Comparison out of QueryPlanSerde (#2028)
---
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 110 ++---------
 .../scala/org/apache/comet/serde/comparisons.scala | 208 +++++++++++++++++++++
 2 files changed, 219 insertions(+), 99 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b08cb89df..a184bc94c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -127,6 +127,15 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[MapValues] -> CometMapValues,
     classOf[MapFromArrays] -> CometMapFromArrays,
     classOf[GetMapValue] -> CometMapExtract,
+    classOf[GreaterThan] -> CometGreaterThan,
+    classOf[GreaterThanOrEqual] -> CometGreaterThanOrEqual,
+    classOf[LessThan] -> CometLessThan,
+    classOf[LessThanOrEqual] -> CometLessThanOrEqual,
+    classOf[IsNull] -> CometIsNull,
+    classOf[IsNotNull] -> CometIsNotNull,
+    classOf[IsNaN] -> CometIsNaN,
+    classOf[In] -> CometIn,
+    classOf[InSet] -> CometInSet,
     classOf[Rand] -> CometRand,
     classOf[Randn] -> CometRandn,
     classOf[SparkPartitionID] -> CometSparkPartitionId,
@@ -684,42 +693,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
           binding,
           (builder, binaryExpr) => builder.setNeqNullSafe(binaryExpr))
 
-      case GreaterThan(left, right) =>
-        createBinaryExpr(
-          expr,
-          left,
-          right,
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setGt(binaryExpr))
-
-      case GreaterThanOrEqual(left, right) =>
-        createBinaryExpr(
-          expr,
-          left,
-          right,
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setGtEq(binaryExpr))
-
-      case LessThan(left, right) =>
-        createBinaryExpr(
-          expr,
-          left,
-          right,
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setLt(binaryExpr))
-
-      case LessThanOrEqual(left, right) =>
-        createBinaryExpr(
-          expr,
-          left,
-          right,
-          inputs,
-          binding,
-          (builder, binaryExpr) => builder.setLtEq(binaryExpr))
-
       case Literal(value, dataType)
           if supportedDataType(dataType, allowComplex = value == null) =>
         val exprBuilder = ExprOuterClass.Literal.newBuilder()
@@ -1066,29 +1039,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
           })
         optExprWithInfo(optExpr, expr, child)
 
-      case IsNull(child) =>
-        createUnaryExpr(
-          expr,
-          child,
-          inputs,
-          binding,
-          (builder, unaryExpr) => builder.setIsNull(unaryExpr))
-
-      case IsNotNull(child) =>
-        createUnaryExpr(
-          expr,
-          child,
-          inputs,
-          binding,
-          (builder, unaryExpr) => builder.setIsNotNull(unaryExpr))
-
-      case IsNaN(child) =>
-        val childExpr = exprToProtoInternal(child, inputs, binding)
-        val optExpr =
-          scalarFunctionExprToProtoWithReturnType("isnan", BooleanType, childExpr)
-
-        optExprWithInfo(optExpr, expr, child)
-
       case SortOrder(child, direction, nullOrdering, _) =>
         val childExpr = exprToProtoInternal(child, inputs, binding)
 
@@ -1458,20 +1408,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
           binding,
           (builder, binaryExpr) => builder.setBitwiseAnd(binaryExpr))
 
-      case In(value, list) =>
-        in(expr, value, list, inputs, binding, negate = false)
-
-      case InSet(value, hset) =>
-        val valueDataType = value.dataType
-        val list = hset.map { setVal =>
-          Literal(setVal, valueDataType)
-        }.toSeq
-        // Change `InSet` to `In` expression
-        // We do Spark `InSet` optimization in native (DataFusion) side.
-        in(expr, value, list, inputs, binding, negate = false)
-
-      case Not(In(value, list)) =>
-        in(expr, value, list, inputs, binding, negate = true)
+      case Not(In(_, _)) =>
+        CometNotIn.convert(expr, inputs, binding)
 
       case Not(child) =>
         createUnaryExpr(
@@ -1815,32 +1753,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     }
   }
 
-  def in(
-      expr: Expression,
-      value: Expression,
-      list: Seq[Expression],
-      inputs: Seq[Attribute],
-      binding: Boolean,
-      negate: Boolean): Option[Expr] = {
-    val valueExpr = exprToProtoInternal(value, inputs, binding)
-    val listExprs = list.map(exprToProtoInternal(_, inputs, binding))
-    if (valueExpr.isDefined && listExprs.forall(_.isDefined)) {
-      val builder = ExprOuterClass.In.newBuilder()
-      builder.setInValue(valueExpr.get)
-      builder.addAllLists(listExprs.map(_.get).asJava)
-      builder.setNegated(negate)
-      Some(
-        ExprOuterClass.Expr
-          .newBuilder()
-          .setIn(builder)
-          .build())
-    } else {
-      val allExprs = list ++ Seq(value)
-      withInfo(expr, allExprs: _*)
-      None
-    }
-  }
-
   def scalarFunctionExprToProtoWithReturnType(
       funcName: String,
       returnType: DataType,
diff --git a/spark/src/main/scala/org/apache/comet/serde/comparisons.scala b/spark/src/main/scala/org/apache/comet/serde/comparisons.scala
new file mode 100644
index 000000000..aced6c94c
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/serde/comparisons.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not}
+import org.apache.spark.sql.types.BooleanType
+
+import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.serde.ExprOuterClass.Expr
+import org.apache.comet.serde.QueryPlanSerde._
+
+object CometGreaterThan extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val greaterThan = expr.asInstanceOf[GreaterThan]
+
+    createBinaryExpr(
+      expr,
+      greaterThan.left,
+      greaterThan.right,
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setGt(binaryExpr))
+  }
+}
+
+object CometGreaterThanOrEqual extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val greaterThanOrEqual = expr.asInstanceOf[GreaterThanOrEqual]
+
+    createBinaryExpr(
+      expr,
+      greaterThanOrEqual.left,
+      greaterThanOrEqual.right,
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setGtEq(binaryExpr))
+  }
+}
+
+object CometLessThan extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val lessThan = expr.asInstanceOf[LessThan]
+
+    createBinaryExpr(
+      expr,
+      lessThan.left,
+      lessThan.right,
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setLt(binaryExpr))
+  }
+}
+
+object CometLessThanOrEqual extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val lessThanOrEqual = expr.asInstanceOf[LessThanOrEqual]
+
+    createBinaryExpr(
+      expr,
+      lessThanOrEqual.left,
+      lessThanOrEqual.right,
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setLtEq(binaryExpr))
+  }
+}
+
+object CometIsNull extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val isNull = expr.asInstanceOf[IsNull]
+
+    createUnaryExpr(
+      expr,
+      isNull.child,
+      inputs,
+      binding,
+      (builder, unaryExpr) => builder.setIsNull(unaryExpr))
+  }
+}
+
+object CometIsNotNull extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val isNotNull = expr.asInstanceOf[IsNotNull]
+
+    createUnaryExpr(
+      expr,
+      isNotNull.child,
+      inputs,
+      binding,
+      (builder, unaryExpr) => builder.setIsNotNull(unaryExpr))
+  }
+}
+
+object CometIsNaN extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val isNaN = expr.asInstanceOf[IsNaN]
+    val childExpr = exprToProtoInternal(isNaN.child, inputs, binding)
+    val optExpr = scalarFunctionExprToProtoWithReturnType("isnan", BooleanType, childExpr)
+
+    optExprWithInfo(optExpr, expr, isNaN.child)
+  }
+}
+
+object CometIn extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val inExpr = expr.asInstanceOf[In]
+    ComparisonUtils.in(expr, inExpr.value, inExpr.list, inputs, binding, negate = false)
+  }
+}
+
+object CometNotIn extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val notExpr = expr.asInstanceOf[Not]
+    val inExpr = notExpr.child.asInstanceOf[In]
+    ComparisonUtils.in(expr, inExpr.value, inExpr.list, inputs, binding, negate = true)
+  }
+}
+
+object CometInSet extends CometExpressionSerde {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val inSetExpr = expr.asInstanceOf[InSet]
+    val valueDataType = inSetExpr.child.dataType
+    val list = inSetExpr.hset.map { setVal =>
+      Literal(setVal, valueDataType)
+    }.toSeq
+    // Change `InSet` to `In` expression
+    // We do Spark `InSet` optimization in native (DataFusion) side.
+    ComparisonUtils.in(expr, inSetExpr.child, list, inputs, binding, negate = false)
+  }
+}
+
+object ComparisonUtils {
+
+  def in(
+      expr: Expression,
+      value: Expression,
+      list: Seq[Expression],
+      inputs: Seq[Attribute],
+      binding: Boolean,
+      negate: Boolean): Option[Expr] = {
+    val valueExpr = exprToProtoInternal(value, inputs, binding)
+    val listExprs = list.map(exprToProtoInternal(_, inputs, binding))
+    if (valueExpr.isDefined && listExprs.forall(_.isDefined)) {
+      val builder = ExprOuterClass.In.newBuilder()
+      builder.setInValue(valueExpr.get)
+      builder.addAllLists(listExprs.map(_.get).asJava)
+      builder.setNegated(negate)
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setIn(builder)
+          .build())
+    } else {
+      val allExprs = list ++ Seq(value)
+      withInfo(expr, allExprs: _*)
+      None
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to