This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fa091e7c minor: Move `operator2Proto` to `CometExecRule` (#2767)
5fa091e7c is described below

commit 5fa091e7ce78827f4763416cc18785d058d394fa
Author: Andy Grove <[email protected]>
AuthorDate: Thu Nov 13 09:08:17 2025 -0700

    minor: Move `operator2Proto` to `CometExecRule` (#2767)
    
    * move operator2Proto to CometExecRule
    
    * rename methods
---
 .../org/apache/comet/rules/CometExecRule.scala     | 241 +++++++++++++++++++--
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 208 +-----------------
 2 files changed, 222 insertions(+), 227 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index c10a8b5af..a0dc8ccb1 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -20,6 +20,7 @@
 package org.apache.comet.rules
 
 import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
@@ -43,8 +44,34 @@ import org.apache.spark.sql.types._
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
 import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED
 import org.apache.comet.CometSparkSessionExtensions._
+import org.apache.comet.rules.CometExecRule.opSerdeMap
+import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, QueryPlanSerde, Unsupported}
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde
+import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}
+import org.apache.comet.serde.operator._
+
+object CometExecRule {
+
+  /**
+   * Mapping of Spark operator class to Comet operator handler.
+   */
+  val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
+    Map(
+      classOf[ProjectExec] -> CometProject,
+      classOf[FilterExec] -> CometFilter,
+      classOf[LocalLimitExec] -> CometLocalLimit,
+      classOf[GlobalLimitExec] -> CometGlobalLimit,
+      classOf[ExpandExec] -> CometExpand,
+      classOf[HashAggregateExec] -> CometHashAggregate,
+      classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
+      classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
+      classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
+      classOf[SortMergeJoinExec] -> CometSortMergeJoin,
+      classOf[SortExec] -> CometSort,
+      classOf[LocalTableScanExec] -> CometLocalTableScan,
+      classOf[WindowExec] -> CometWindow)
+
+}
 
 /**
  * Spark physical optimizer rule for replacing Spark operators with Comet operators.
@@ -137,11 +164,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
    */
   // spotless:on
   private def transform(plan: SparkPlan): SparkPlan = {
-    def operator2Proto(op: SparkPlan): Option[Operator] = {
+    def operator2ProtoIfAllChildrenAreNative(op: SparkPlan): Option[Operator] = {
       if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
-        QueryPlanSerde.operator2Proto(
-          op,
-          op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
+        operator2Proto(op, op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
       } else {
         None
       }
@@ -151,23 +176,23 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
     * Convert operator to proto and then apply a transformation to wrap the proto in a new plan.
      */
     def newPlanWithProto(op: SparkPlan, fun: Operator => SparkPlan): SparkPlan = {
-      operator2Proto(op).map(fun).getOrElse(op)
+      operator2ProtoIfAllChildrenAreNative(op).map(fun).getOrElse(op)
     }
 
     def convertNode(op: SparkPlan): SparkPlan = op match {
       // Fully native scan for V1
       case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
-        val nativeOp = QueryPlanSerde.operator2Proto(scan).get
+        val nativeOp = operator2Proto(scan).get
         CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
 
       // Comet JVM + native scan for V1 and V2
       case op if isCometScan(op) =>
-        val nativeOp = QueryPlanSerde.operator2Proto(op)
+        val nativeOp = operator2Proto(op)
         CometScanWrapper(nativeOp.get, op)
 
       case op if shouldApplySparkToColumnar(conf, op) =>
         val cometOp = CometSparkToColumnarExec(op)
-        val nativeOp = QueryPlanSerde.operator2Proto(cometOp)
+        val nativeOp = operator2Proto(cometOp)
         CometScanWrapper(nativeOp.get, cometOp)
 
       case op: ProjectExec =>
@@ -215,8 +240,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
             // no reason to report reason if child is not native
             op
           } else {
-            QueryPlanSerde
-              .operator2Proto(op)
+            operator2Proto(op)
               .map { nativeOp =>
                 val cometOp =
                   CometCollectLimitExec(op, op.limit, op.offset, op.child)
@@ -339,8 +363,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       case c @ CoalesceExec(numPartitions, child)
           if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf)
             && isCometNative(child) =>
-        QueryPlanSerde
-          .operator2Proto(c)
+        operator2Proto(c)
           .map { nativeOp =>
             val cometOp = CometCoalesceExec(c, c.output, numPartitions, child)
             CometSinkPlaceHolder(nativeOp, c, cometOp)
@@ -358,8 +381,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
             .get(conf)
             && isCometShuffleEnabled(conf) &&
             CometTakeOrderedAndProjectExec.isSupported(s) =>
-        QueryPlanSerde
-          .operator2Proto(s)
+        operator2Proto(s)
           .map { nativeOp =>
             val cometOp =
               CometTakeOrderedAndProjectExec(
@@ -430,7 +452,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
           case b: BroadcastExchangeExec
               if isCometNative(b.child) &&
                 CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
-            QueryPlanSerde.operator2Proto(b) match {
+            operator2Proto(b) match {
               case Some(nativeOp) =>
                 val cometOp = CometBroadcastExchangeExec(b, b.output, b.mode, b.child)
                 CometSinkPlaceHolder(nativeOp, b, cometOp)
@@ -478,7 +500,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       case s: ShuffleExchangeExec =>
         val nativeShuffle: Option[SparkPlan] =
           if (nativeShuffleSupported(s)) {
-            val newOp = operator2Proto(s)
+            val newOp = operator2ProtoIfAllChildrenAreNative(s)
             newOp match {
               case Some(nativeOp) =>
                 // Switch to use Decimal128 regardless of precision, since Arrow native execution
@@ -501,7 +523,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
           // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
           // convert it to CometColumnarShuffle,
           if (columnarShuffleSupported(s)) {
-            val newOp = QueryPlanSerde.operator2Proto(s)
+            val newOp = operator2Proto(s)
             newOp match {
               case Some(nativeOp) =>
                 s.child match {
@@ -528,8 +550,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
 
       case op: LocalTableScanExec =>
         if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
-          QueryPlanSerde
-            .operator2Proto(op)
+          operator2Proto(op)
             .map { nativeOp =>
               val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
               CometScanWrapper(nativeOp, cometOp)
@@ -993,4 +1014,184 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
     }
   }
 
+  /**
+   * Convert a Spark plan operator to a protobuf Comet operator.
+   *
+   * @param op
+   *   Spark plan operator
+   * @param childOp
+   *   previously converted protobuf Comet operators, which will be consumed by the Spark plan
+   *   operator as its children
+   * @return
+   *   The converted Comet native operator for the input `op`, or `None` if the `op` cannot be
+   *   converted to a native operator.
+   */
+  private def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {
+    val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
+    childOp.foreach(builder.addChildren)
+
+    // look for registered handler first
+    val serde = opSerdeMap.get(op.getClass)
+    serde match {
+      case Some(handler) if isOperatorEnabled(handler, op) =>
+        val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
+        val maybeConverted = opSerde.convert(op, builder, childOp: _*)
+        if (maybeConverted.isDefined) {
+          return maybeConverted
+        }
+      case _ =>
+    }
+
+    // now handle special cases that cannot be handled as a simple mapping from class name
+    // and see if operator can be used as a sink
+    op match {
+
+      // Fully native scan for V1
+      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
+        CometNativeScan.convert(scan, builder, childOp: _*)
+
+      case op if isCometSink(op) =>
+        val supportedTypes =
+          op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
+
+        if (!supportedTypes) {
+          withInfo(op, "Unsupported data type")
+          return None
+        }
+
+        // These operators are source of Comet native execution chain
+        val scanBuilder = OperatorOuterClass.Scan.newBuilder()
+        val source = op.simpleStringWithNodeId()
+        if (source.isEmpty) {
+          scanBuilder.setSource(op.getClass.getSimpleName)
+        } else {
+          scanBuilder.setSource(source)
+        }
+
+        val ffiSafe = op match {
+          case _ if isExchangeSink(op) =>
+            // Source of broadcast exchange batches is ArrowStreamReader
+            // Source of shuffle exchange batches is NativeBatchDecoderIterator
+            true
+          case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
+            // native_comet scan reuses mutable buffers
+            false
+          case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+            // native_iceberg_compat scan reuses mutable buffers for constant columns
+            // https://github.com/apache/datafusion-comet/issues/2152
+            false
+          case _ =>
+            false
+        }
+        scanBuilder.setArrowFfiSafe(ffiSafe)
+
+        val scanTypes = op.output.flatten { attr =>
+          serializeDataType(attr.dataType)
+        }
+
+        if (scanTypes.length == op.output.length) {
+          scanBuilder.addAllFields(scanTypes.asJava)
+
+          // Sink operators don't have children
+          builder.clearChildren()
+
+          Some(builder.setScan(scanBuilder).build())
+        } else {
+          // There are unsupported scan type
+          withInfo(
+            op,
+            s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above")
+          None
+        }
+
+      case _ =>
+        // Emit warning if:
+        //  1. it is not Spark shuffle operator, which is handled separately
+        //  2. it is not a Comet operator
+        if (serde.isEmpty && !op.nodeName.contains("Comet") &&
+          !op.isInstanceOf[ShuffleExchangeExec]) {
+          withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
+        }
+        None
+    }
+  }
+
+  private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
+    val enabled = handler.enabledConfig.forall(_.get(op.conf))
+    val opName = op.getClass.getSimpleName
+    if (enabled) {
+      val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
+      opSerde.getSupportLevel(op) match {
+        case Unsupported(notes) =>
+          withInfo(op, notes.getOrElse(""))
+          false
+        case Incompatible(notes) =>
+          val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
+          val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
+          if (allowIncompat) {
+            if (notes.isDefined) {
+              logWarning(
+                s"Comet supports $opName when $incompatConf=true " +
+                  s"but has notes: ${notes.get}")
+            }
+            true
+          } else {
+            val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
+            withInfo(
+              op,
+              s"$opName is not fully compatible with Spark$optionalNotes. " +
+                s"To enable it anyway, set $incompatConf=true. " +
+                s"${CometConf.COMPAT_GUIDE}.")
+            false
+          }
+        case Compatible(notes) =>
+          if (notes.isDefined) {
+            logWarning(s"Comet supports $opName but has notes: ${notes.get}")
+          }
+          true
+      }
+    } else {
+      withInfo(
+        op,
+        s"Native support for operator $opName is disabled. " +
+          s"Set ${handler.enabledConfig.get.key}=true to enable it.")
+      false
+    }
+  }
+
+  /**
+   * Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
+   * native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
+   * `CometSinkPlaceHolder` later in `CometSparkSessionExtensions` after `operator2proto` is
+   * called.
+   */
+  private def isCometSink(op: SparkPlan): Boolean = {
+    if (isExchangeSink(op)) {
+      return true
+    }
+    op match {
+      case s if isCometScan(s) => true
+      case _: CometSparkToColumnarExec => true
+      case _: CometSinkPlaceHolder => true
+      case _: CoalesceExec => true
+      case _: CollectLimitExec => true
+      case _: UnionExec => true
+      case _: TakeOrderedAndProjectExec => true
+      case _ => false
+    }
+  }
+
+  private def isExchangeSink(op: SparkPlan): Boolean = {
+    op match {
+      case _: ShuffleExchangeExec => true
+      case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
+      case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
+      case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
+      case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) =>
+        true
+      case _: BroadcastExchangeExec => true
+      case _ => false
+    }
+  }
+
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index ecb936150..44890c1c9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -27,22 +27,15 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.comet._
-import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
-import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.expressions._
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, Expr, ScalarFunc}
-import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.Types.{DataType => ProtoDataType}
 import org.apache.comet.serde.Types.DataType._
 import org.apache.comet.serde.literals.CometLiteral
@@ -54,25 +47,6 @@ import org.apache.comet.shims.CometExprShim
  */
 object QueryPlanSerde extends Logging with CometExprShim {
 
-  /**
-   * Mapping of Spark operator class to Comet operator handler.
-   */
-  private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
-    Map(
-      classOf[ProjectExec] -> CometProject,
-      classOf[FilterExec] -> CometFilter,
-      classOf[LocalLimitExec] -> CometLocalLimit,
-      classOf[GlobalLimitExec] -> CometGlobalLimit,
-      classOf[ExpandExec] -> CometExpand,
-      classOf[HashAggregateExec] -> CometHashAggregate,
-      classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
-      classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
-      classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
-      classOf[SortMergeJoinExec] -> CometSortMergeJoin,
-      classOf[SortExec] -> CometSort,
-      classOf[LocalTableScanExec] -> CometLocalTableScan,
-      classOf[WindowExec] -> CometWindow)
-
   private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[ArrayAppend] -> CometArrayAppend,
     classOf[ArrayCompact] -> CometArrayCompact,
@@ -893,186 +867,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
     Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build())
   }
 
-  /**
-   * Convert a Spark plan operator to a protobuf Comet operator.
-   *
-   * @param op
-   *   Spark plan operator
-   * @param childOp
-   *   previously converted protobuf Comet operators, which will be consumed by the Spark plan
-   *   operator as its children
-   * @return
-   *   The converted Comet native operator for the input `op`, or `None` if the `op` cannot be
-   *   converted to a native operator.
-   */
-  def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {
-    val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
-    childOp.foreach(builder.addChildren)
-
-    // look for registered handler first
-    val serde = opSerdeMap.get(op.getClass)
-    serde match {
-      case Some(handler) if isOperatorEnabled(handler, op) =>
-        val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
-        val maybeConverted = opSerde.convert(op, builder, childOp: _*)
-        if (maybeConverted.isDefined) {
-          return maybeConverted
-        }
-      case _ =>
-    }
-
-    // now handle special cases that cannot be handled as a simple mapping from class name
-    // and see if operator can be used as a sink
-    op match {
-
-      // Fully native scan for V1
-      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
-        CometNativeScan.convert(scan, builder, childOp: _*)
-
-      case op if isCometSink(op) =>
-        val supportedTypes =
-          op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
-
-        if (!supportedTypes) {
-          withInfo(op, "Unsupported data type")
-          return None
-        }
-
-        // These operators are source of Comet native execution chain
-        val scanBuilder = OperatorOuterClass.Scan.newBuilder()
-        val source = op.simpleStringWithNodeId()
-        if (source.isEmpty) {
-          scanBuilder.setSource(op.getClass.getSimpleName)
-        } else {
-          scanBuilder.setSource(source)
-        }
-
-        val ffiSafe = op match {
-          case _ if isExchangeSink(op) =>
-            // Source of broadcast exchange batches is ArrowStreamReader
-            // Source of shuffle exchange batches is NativeBatchDecoderIterator
-            true
-          case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
-            // native_comet scan reuses mutable buffers
-            false
-          case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
-            // native_iceberg_compat scan reuses mutable buffers for constant columns
-            // https://github.com/apache/datafusion-comet/issues/2152
-            false
-          case _ =>
-            false
-        }
-        scanBuilder.setArrowFfiSafe(ffiSafe)
-
-        val scanTypes = op.output.flatten { attr =>
-          serializeDataType(attr.dataType)
-        }
-
-        if (scanTypes.length == op.output.length) {
-          scanBuilder.addAllFields(scanTypes.asJava)
-
-          // Sink operators don't have children
-          builder.clearChildren()
-
-          Some(builder.setScan(scanBuilder).build())
-        } else {
-          // There are unsupported scan type
-          withInfo(
-            op,
-            s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above")
-          None
-        }
-
-      case _ =>
-        // Emit warning if:
-        //  1. it is not Spark shuffle operator, which is handled separately
-        //  2. it is not a Comet operator
-        if (serde.isEmpty && !op.nodeName.contains("Comet") &&
-          !op.isInstanceOf[ShuffleExchangeExec]) {
-          withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
-        }
-        None
-    }
-  }
-
-  private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
-    val enabled = handler.enabledConfig.forall(_.get(op.conf))
-    val opName = op.getClass.getSimpleName
-    if (enabled) {
-      val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
-      opSerde.getSupportLevel(op) match {
-        case Unsupported(notes) =>
-          withInfo(op, notes.getOrElse(""))
-          false
-        case Incompatible(notes) =>
-          val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
-          val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
-          if (allowIncompat) {
-            if (notes.isDefined) {
-              logWarning(
-                s"Comet supports $opName when $incompatConf=true " +
-                  s"but has notes: ${notes.get}")
-            }
-            true
-          } else {
-            val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
-            withInfo(
-              op,
-              s"$opName is not fully compatible with Spark$optionalNotes. " +
-                s"To enable it anyway, set $incompatConf=true. " +
-                s"${CometConf.COMPAT_GUIDE}.")
-            false
-          }
-        case Compatible(notes) =>
-          if (notes.isDefined) {
-            logWarning(s"Comet supports $opName but has notes: ${notes.get}")
-          }
-          true
-      }
-    } else {
-      withInfo(
-        op,
-        s"Native support for operator $opName is disabled. " +
-          s"Set ${handler.enabledConfig.get.key}=true to enable it.")
-      false
-    }
-  }
-
-  /**
-   * Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
-   * native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
-   * `CometSinkPlaceHolder` later in `CometSparkSessionExtensions` after `operator2proto` is
-   * called.
-   */
-  private def isCometSink(op: SparkPlan): Boolean = {
-    if (isExchangeSink(op)) {
-      return true
-    }
-    op match {
-      case s if isCometScan(s) => true
-      case _: CometSparkToColumnarExec => true
-      case _: CometSinkPlaceHolder => true
-      case _: CoalesceExec => true
-      case _: CollectLimitExec => true
-      case _: UnionExec => true
-      case _: TakeOrderedAndProjectExec => true
-      case _ => false
-    }
-  }
-
-  private def isExchangeSink(op: SparkPlan): Boolean = {
-    op match {
-      case _: ShuffleExchangeExec => true
-      case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
-      case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
-      case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
-      case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) =>
-        true
-      case _: BroadcastExchangeExec => true
-      case _ => false
-    }
-  }
-
   // Utility method. Adds explain info if the result of calling exprToProto is None
   def optExprWithInfo(
       optExpr: Option[Expr],


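For readers who want the gist of the relocated logic without reading the full diff: after this change, `operator2Proto` in `CometExecRule` first looks up a handler in the class-keyed registry `opSerdeMap`, lets it attempt the conversion, and otherwise falls back to treating eligible operators as sinks that start a native execution chain. The following minimal, self-contained Scala sketch illustrates only that dispatch pattern; every name in it (`Plan`, `Handler`, `Dispatch`, `NativeOp`, etc.) is a simplified stand-in, not the real Comet or Spark API.

// Sketch (hypothetical names) of the registry-based dispatch used by operator2Proto:
// try a handler registered for the operator's class first, then fall back to sink
// handling, otherwise report the operator as unsupported.

sealed trait Plan { def name: String = getClass.getSimpleName }
final case class Project() extends Plan
final case class Filter() extends Plan
final case class Exchange() extends Plan
final case class Unknown() extends Plan

// Stand-in for the protobuf Operator message built by a successful conversion.
final case class NativeOp(description: String)

// Stand-in for CometOperatorSerde: a handler may decline by returning None.
trait Handler[T <: Plan] { def convert(op: T): Option[NativeOp] }

object ProjectHandler extends Handler[Project] {
  def convert(op: Project): Option[NativeOp] = Some(NativeOp("native Project"))
}
object FilterHandler extends Handler[Filter] {
  def convert(op: Filter): Option[NativeOp] = Some(NativeOp("native Filter"))
}

object Dispatch {
  // Registry keyed by operator class, analogous to opSerdeMap in the diff.
  private val handlers: Map[Class[_ <: Plan], Handler[_ <: Plan]] =
    Map(classOf[Project] -> ProjectHandler, classOf[Filter] -> FilterHandler)

  // Analogous to isCometSink: operators that may start a native execution chain.
  private def isSink(op: Plan): Boolean = op.isInstanceOf[Exchange]

  def toNative(op: Plan): Option[NativeOp] = {
    // 1. Look for a registered handler first.
    handlers.get(op.getClass) match {
      case Some(h) =>
        val converted = h.asInstanceOf[Handler[Plan]].convert(op)
        if (converted.isDefined) return converted
      case None =>
    }
    // 2. Special cases: eligible operators become a scan-like sink feeding native execution.
    if (isSink(op)) Some(NativeOp(s"scan sink for ${op.name}"))
    else { println(s"unsupported operator: ${op.name}"); None }
  }
}

object Demo {
  def main(args: Array[String]): Unit =
    Seq(Project(), Exchange(), Unknown()).foreach(p => println(Dispatch.toNative(p)))
}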
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
