This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new a583db3b9 Chore: refactor Comparison out of QueryPlanSerde (#2028) a583db3b9 is described below commit a583db3b94bc3ef36540d6804e1830a57fdb9f96 Author: Yu-Chuan Hung <86523891+cutechuanch...@users.noreply.github.com> AuthorDate: Wed Aug 6 00:12:50 2025 +0800 Chore: refactor Comparison out of QueryPlanSerde (#2028) --- .../org/apache/comet/serde/QueryPlanSerde.scala | 110 ++--------- .../scala/org/apache/comet/serde/comparisons.scala | 208 +++++++++++++++++++++ 2 files changed, 219 insertions(+), 99 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b08cb89df..a184bc94c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -127,6 +127,15 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapValues] -> CometMapValues, classOf[MapFromArrays] -> CometMapFromArrays, classOf[GetMapValue] -> CometMapExtract, + classOf[GreaterThan] -> CometGreaterThan, + classOf[GreaterThanOrEqual] -> CometGreaterThanOrEqual, + classOf[LessThan] -> CometLessThan, + classOf[LessThanOrEqual] -> CometLessThanOrEqual, + classOf[IsNull] -> CometIsNull, + classOf[IsNotNull] -> CometIsNotNull, + classOf[IsNaN] -> CometIsNaN, + classOf[In] -> CometIn, + classOf[InSet] -> CometInSet, classOf[Rand] -> CometRand, classOf[Randn] -> CometRandn, classOf[SparkPartitionID] -> CometSparkPartitionId, @@ -684,42 +693,6 @@ object QueryPlanSerde extends Logging with CometExprShim { binding, (builder, binaryExpr) => builder.setNeqNullSafe(binaryExpr)) - case GreaterThan(left, right) => - createBinaryExpr( - expr, - left, - right, - inputs, - binding, - (builder, binaryExpr) => builder.setGt(binaryExpr)) - - case GreaterThanOrEqual(left, right) => - createBinaryExpr( - expr, - left, - right, - inputs, - binding, - (builder, binaryExpr) => builder.setGtEq(binaryExpr)) - - case LessThan(left, right) => - createBinaryExpr( - expr, - left, - right, - inputs, - binding, - (builder, binaryExpr) => builder.setLt(binaryExpr)) - - case LessThanOrEqual(left, right) => - createBinaryExpr( - expr, - left, - right, - inputs, - binding, - (builder, binaryExpr) => builder.setLtEq(binaryExpr)) - case Literal(value, dataType) if supportedDataType(dataType, allowComplex = value == null) => val exprBuilder = ExprOuterClass.Literal.newBuilder() @@ -1066,29 +1039,6 @@ object QueryPlanSerde extends Logging with CometExprShim { }) optExprWithInfo(optExpr, expr, child) - case IsNull(child) => - createUnaryExpr( - expr, - child, - inputs, - binding, - (builder, unaryExpr) => builder.setIsNull(unaryExpr)) - - case IsNotNull(child) => - createUnaryExpr( - expr, - child, - inputs, - binding, - (builder, unaryExpr) => builder.setIsNotNull(unaryExpr)) - - case IsNaN(child) => - val childExpr = exprToProtoInternal(child, inputs, binding) - val optExpr = - scalarFunctionExprToProtoWithReturnType("isnan", BooleanType, childExpr) - - optExprWithInfo(optExpr, expr, child) - case SortOrder(child, direction, nullOrdering, _) => val childExpr = exprToProtoInternal(child, inputs, binding) @@ -1458,20 +1408,8 @@ object QueryPlanSerde extends Logging with CometExprShim { binding, (builder, binaryExpr) => builder.setBitwiseAnd(binaryExpr)) - case In(value, list) => - in(expr, value, list, inputs, binding, negate = false) - - case InSet(value, hset) => - val valueDataType = value.dataType - val list = hset.map { setVal => - Literal(setVal, valueDataType) - }.toSeq - // Change `InSet` to `In` expression - // We do Spark `InSet` optimization in native (DataFusion) side. - in(expr, value, list, inputs, binding, negate = false) - - case Not(In(value, list)) => - in(expr, value, list, inputs, binding, negate = true) + case Not(In(_, _)) => + CometNotIn.convert(expr, inputs, binding) case Not(child) => createUnaryExpr( @@ -1815,32 +1753,6 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - def in( - expr: Expression, - value: Expression, - list: Seq[Expression], - inputs: Seq[Attribute], - binding: Boolean, - negate: Boolean): Option[Expr] = { - val valueExpr = exprToProtoInternal(value, inputs, binding) - val listExprs = list.map(exprToProtoInternal(_, inputs, binding)) - if (valueExpr.isDefined && listExprs.forall(_.isDefined)) { - val builder = ExprOuterClass.In.newBuilder() - builder.setInValue(valueExpr.get) - builder.addAllLists(listExprs.map(_.get).asJava) - builder.setNegated(negate) - Some( - ExprOuterClass.Expr - .newBuilder() - .setIn(builder) - .build()) - } else { - val allExprs = list ++ Seq(value) - withInfo(expr, allExprs: _*) - None - } - } - def scalarFunctionExprToProtoWithReturnType( funcName: String, returnType: DataType, diff --git a/spark/src/main/scala/org/apache/comet/serde/comparisons.scala b/spark/src/main/scala/org/apache/comet/serde/comparisons.scala new file mode 100644 index 000000000..aced6c94c --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/comparisons.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not} +import org.apache.spark.sql.types.BooleanType + +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde._ + +object CometGreaterThan extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val greaterThan = expr.asInstanceOf[GreaterThan] + + createBinaryExpr( + expr, + greaterThan.left, + greaterThan.right, + inputs, + binding, + (builder, binaryExpr) => builder.setGt(binaryExpr)) + } +} + +object CometGreaterThanOrEqual extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val greaterThanOrEqual = expr.asInstanceOf[GreaterThanOrEqual] + + createBinaryExpr( + expr, + greaterThanOrEqual.left, + greaterThanOrEqual.right, + inputs, + binding, + (builder, binaryExpr) => builder.setGtEq(binaryExpr)) + } +} + +object CometLessThan extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val lessThan = expr.asInstanceOf[LessThan] + + createBinaryExpr( + expr, + lessThan.left, + lessThan.right, + inputs, + binding, + (builder, binaryExpr) => builder.setLt(binaryExpr)) + } +} + +object CometLessThanOrEqual extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val lessThanOrEqual = expr.asInstanceOf[LessThanOrEqual] + + createBinaryExpr( + expr, + lessThanOrEqual.left, + lessThanOrEqual.right, + inputs, + binding, + (builder, binaryExpr) => builder.setLtEq(binaryExpr)) + } +} + +object CometIsNull extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val isNull = expr.asInstanceOf[IsNull] + + createUnaryExpr( + expr, + isNull.child, + inputs, + binding, + (builder, unaryExpr) => builder.setIsNull(unaryExpr)) + } +} + +object CometIsNotNull extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val isNotNull = expr.asInstanceOf[IsNotNull] + + createUnaryExpr( + expr, + isNotNull.child, + inputs, + binding, + (builder, unaryExpr) => builder.setIsNotNull(unaryExpr)) + } +} + +object CometIsNaN extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val isNaN = expr.asInstanceOf[IsNaN] + val childExpr = exprToProtoInternal(isNaN.child, inputs, binding) + val optExpr = scalarFunctionExprToProtoWithReturnType("isnan", BooleanType, childExpr) + + optExprWithInfo(optExpr, expr, isNaN.child) + } +} + +object CometIn extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val inExpr = expr.asInstanceOf[In] + ComparisonUtils.in(expr, inExpr.value, inExpr.list, inputs, binding, negate = false) + } +} + +object CometNotIn extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val notExpr = expr.asInstanceOf[Not] + val inExpr = notExpr.child.asInstanceOf[In] + ComparisonUtils.in(expr, inExpr.value, inExpr.list, inputs, binding, negate = true) + } +} + +object CometInSet extends CometExpressionSerde { + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val inSetExpr = expr.asInstanceOf[InSet] + val valueDataType = inSetExpr.child.dataType + val list = inSetExpr.hset.map { setVal => + Literal(setVal, valueDataType) + }.toSeq + // Change `InSet` to `In` expression + // We do Spark `InSet` optimization in native (DataFusion) side. + ComparisonUtils.in(expr, inSetExpr.child, list, inputs, binding, negate = false) + } +} + +object ComparisonUtils { + + def in( + expr: Expression, + value: Expression, + list: Seq[Expression], + inputs: Seq[Attribute], + binding: Boolean, + negate: Boolean): Option[Expr] = { + val valueExpr = exprToProtoInternal(value, inputs, binding) + val listExprs = list.map(exprToProtoInternal(_, inputs, binding)) + if (valueExpr.isDefined && listExprs.forall(_.isDefined)) { + val builder = ExprOuterClass.In.newBuilder() + builder.setInValue(valueExpr.get) + builder.addAllLists(listExprs.map(_.get).asJava) + builder.setNegated(negate) + Some( + ExprOuterClass.Expr + .newBuilder() + .setIn(builder) + .build()) + } else { + val allExprs = list ++ Seq(value) + withInfo(expr, allExprs: _*) + None + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org