This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5fa091e7c minor: Move `operator2Proto` to `CometExecRule` (#2767)
5fa091e7c is described below
commit 5fa091e7ce78827f4763416cc18785d058d394fa
Author: Andy Grove <[email protected]>
AuthorDate: Thu Nov 13 09:08:17 2025 -0700
minor: Move `operator2Proto` to `CometExecRule` (#2767)
* move operator2Proto to CometExecRule
* rename methods
---
.../org/apache/comet/rules/CometExecRule.scala | 241 +++++++++++++++++++--
.../org/apache/comet/serde/QueryPlanSerde.scala | 208 +-----------------
2 files changed, 222 insertions(+), 227 deletions(-)
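
For orientation before the diff: operator conversion now dispatches through a
per-class handler map owned by CometExecRule, gated by isOperatorEnabled. Below
is a self-contained sketch of the contract those handlers appear to satisfy,
inferred from the call sites in this diff; all types here are simplified
stand-ins and may differ from the real CometOperatorSerde:

    object HandlerContractSketch {
      sealed trait SupportLevel
      case class Compatible(notes: Option[String] = None) extends SupportLevel
      case class Incompatible(notes: Option[String] = None) extends SupportLevel
      case class Unsupported(notes: Option[String] = None) extends SupportLevel

      // Stand-in for CometOperatorSerde[T]: `convert` returns None when the
      // operator cannot be serialized, mirroring the Option-based flow below.
      trait OperatorSerde[T] {
        def enabledConfig: Option[Boolean] // stand-in for a boolean config entry
        def getSupportLevel(op: T): SupportLevel = Compatible()
        def convert(op: T): Option[String] // stand-in for the protobuf build
      }

      // Mirrors isOperatorEnabled: disabled or Unsupported handlers are skipped,
      // and Incompatible ones require an explicit opt-in.
      def isEnabled[T](h: OperatorSerde[T], op: T, allowIncompat: Boolean): Boolean =
        h.enabledConfig.forall(identity) && (h.getSupportLevel(op) match {
          case Compatible(_)   => true
          case Incompatible(_) => allowIncompat
          case Unsupported(_)  => false
        })
    }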
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index c10a8b5af..a0dc8ccb1 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -20,6 +20,7 @@
package org.apache.comet.rules
import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
@@ -43,8 +44,34 @@ import org.apache.spark.sql.types._
import org.apache.comet.{CometConf, ExtendedExplainInfo}
import org.apache.comet.CometConf.COMET_EXEC_SHUFFLE_ENABLED
import org.apache.comet.CometSparkSessionExtensions._
+import org.apache.comet.rules.CometExecRule.opSerdeMap
+import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, QueryPlanSerde, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde
+import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}
+import org.apache.comet.serde.operator._
+
+object CometExecRule {
+
+ /**
+ * Mapping of Spark operator class to Comet operator handler.
+ */
+ val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
+ Map(
+ classOf[ProjectExec] -> CometProject,
+ classOf[FilterExec] -> CometFilter,
+ classOf[LocalLimitExec] -> CometLocalLimit,
+ classOf[GlobalLimitExec] -> CometGlobalLimit,
+ classOf[ExpandExec] -> CometExpand,
+ classOf[HashAggregateExec] -> CometHashAggregate,
+ classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
+ classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
+ classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
+ classOf[SortMergeJoinExec] -> CometSortMergeJoin,
+ classOf[SortExec] -> CometSort,
+ classOf[LocalTableScanExec] -> CometLocalTableScan,
+ classOf[WindowExec] -> CometWindow)
+
+}
/**
* Spark physical optimizer rule for replacing Spark operators with Comet operators.
@@ -137,11 +164,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
*/
// spotless:on
private def transform(plan: SparkPlan): SparkPlan = {
- def operator2Proto(op: SparkPlan): Option[Operator] = {
+ def operator2ProtoIfAllChildrenAreNative(op: SparkPlan): Option[Operator] = {
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
- QueryPlanSerde.operator2Proto(
- op,
- op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
+ operator2Proto(op, op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
} else {
None
}
@@ -151,23 +176,23 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
* Convert operator to proto and then apply a transformation to wrap the proto in a new plan.
*/
def newPlanWithProto(op: SparkPlan, fun: Operator => SparkPlan): SparkPlan = {
- operator2Proto(op).map(fun).getOrElse(op)
+ operator2ProtoIfAllChildrenAreNative(op).map(fun).getOrElse(op)
}
def convertNode(op: SparkPlan): SparkPlan = op match {
// Fully native scan for V1
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
- val nativeOp = QueryPlanSerde.operator2Proto(scan).get
+ val nativeOp = operator2Proto(scan).get
CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
// Comet JVM + native scan for V1 and V2
case op if isCometScan(op) =>
- val nativeOp = QueryPlanSerde.operator2Proto(op)
+ val nativeOp = operator2Proto(op)
CometScanWrapper(nativeOp.get, op)
case op if shouldApplySparkToColumnar(conf, op) =>
val cometOp = CometSparkToColumnarExec(op)
- val nativeOp = QueryPlanSerde.operator2Proto(cometOp)
+ val nativeOp = operator2Proto(cometOp)
CometScanWrapper(nativeOp.get, cometOp)
case op: ProjectExec =>
@@ -215,8 +240,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
// no reason to report reason if child is not native
op
} else {
- QueryPlanSerde
- .operator2Proto(op)
+ operator2Proto(op)
.map { nativeOp =>
val cometOp =
CometCollectLimitExec(op, op.limit, op.offset, op.child)
@@ -339,8 +363,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case c @ CoalesceExec(numPartitions, child)
if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf)
&& isCometNative(child) =>
- QueryPlanSerde
- .operator2Proto(c)
+ operator2Proto(c)
.map { nativeOp =>
val cometOp = CometCoalesceExec(c, c.output, numPartitions, child)
CometSinkPlaceHolder(nativeOp, c, cometOp)
@@ -358,8 +381,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
.get(conf)
&& isCometShuffleEnabled(conf) && CometTakeOrderedAndProjectExec.isSupported(s) =>
- QueryPlanSerde
- .operator2Proto(s)
+ operator2Proto(s)
.map { nativeOp =>
val cometOp =
CometTakeOrderedAndProjectExec(
@@ -430,7 +452,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
- QueryPlanSerde.operator2Proto(b) match {
+ operator2Proto(b) match {
case Some(nativeOp) =>
val cometOp = CometBroadcastExchangeExec(b, b.output, b.mode, b.child)
CometSinkPlaceHolder(nativeOp, b, cometOp)
@@ -478,7 +500,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case s: ShuffleExchangeExec =>
val nativeShuffle: Option[SparkPlan] =
if (nativeShuffleSupported(s)) {
- val newOp = operator2Proto(s)
+ val newOp = operator2ProtoIfAllChildrenAreNative(s)
newOp match {
case Some(nativeOp) =>
// Switch to use Decimal128 regardless of precision, since Arrow native execution
@@ -501,7 +523,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
// If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
// convert it to CometColumnarShuffle,
if (columnarShuffleSupported(s)) {
- val newOp = QueryPlanSerde.operator2Proto(s)
+ val newOp = operator2Proto(s)
newOp match {
case Some(nativeOp) =>
s.child match {
@@ -528,8 +550,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case op: LocalTableScanExec =>
if (CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.get(conf)) {
- QueryPlanSerde
- .operator2Proto(op)
+ operator2Proto(op)
.map { nativeOp =>
val cometOp = CometLocalTableScanExec(op, op.rows, op.output)
CometScanWrapper(nativeOp, cometOp)
@@ -993,4 +1014,184 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
}
}
+ /**
+ * Convert a Spark plan operator to a protobuf Comet operator.
+ *
+ * @param op
+ * Spark plan operator
+ * @param childOp
+ * previously converted protobuf Comet operators, which will be consumed by the Spark plan
+ * operator as its children
+ * @return
+ * The converted Comet native operator for the input `op`, or `None` if the `op` cannot be
+ * converted to a native operator.
+ */
+ private def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {
+ val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
+ childOp.foreach(builder.addChildren)
+
+ // look for registered handler first
+ val serde = opSerdeMap.get(op.getClass)
+ serde match {
+ case Some(handler) if isOperatorEnabled(handler, op) =>
+ val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
+ val maybeConverted = opSerde.convert(op, builder, childOp: _*)
+ if (maybeConverted.isDefined) {
+ return maybeConverted
+ }
+ case _ =>
+ }
+
+ // now handle special cases that cannot be expressed as a simple mapping from class name,
+ // and see whether the operator can be used as a sink
+ op match {
+
+ // Fully native scan for V1
+ case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
+ CometNativeScan.convert(scan, builder, childOp: _*)
+
+ case op if isCometSink(op) =>
+ val supportedTypes =
+ op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
+
+ if (!supportedTypes) {
+ withInfo(op, "Unsupported data type")
+ return None
+ }
+
+ // These operators are the source of the Comet native execution chain
+ val scanBuilder = OperatorOuterClass.Scan.newBuilder()
+ val source = op.simpleStringWithNodeId()
+ if (source.isEmpty) {
+ scanBuilder.setSource(op.getClass.getSimpleName)
+ } else {
+ scanBuilder.setSource(source)
+ }
+
+ val ffiSafe = op match {
+ case _ if isExchangeSink(op) =>
+ // Source of broadcast exchange batches is ArrowStreamReader
+ // Source of shuffle exchange batches is NativeBatchDecoderIterator
+ true
+ case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
+ // native_comet scan reuses mutable buffers
+ false
+ case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+ // native_iceberg_compat scan reuses mutable buffers for constant columns
+ // https://github.com/apache/datafusion-comet/issues/2152
+ false
+ case _ =>
+ false
+ }
+ scanBuilder.setArrowFfiSafe(ffiSafe)
+
+ val scanTypes = op.output.flatten { attr =>
+ serializeDataType(attr.dataType)
+ }
+
+ if (scanTypes.length == op.output.length) {
+ scanBuilder.addAllFields(scanTypes.asJava)
+
+ // Sink operators don't have children
+ builder.clearChildren()
+
+ Some(builder.setScan(scanBuilder).build())
+ } else {
+ // There are unsupported scan types
+ withInfo(
+ op,
+ s"unsupported Comet operator: ${op.nodeName}, due to unsupported
data types above")
+ None
+ }
+
+ case _ =>
+ // Emit warning if:
+ // 1. it is not a Spark shuffle operator, which is handled separately
+ // 2. it is not a Comet operator
+ if (serde.isEmpty && !op.nodeName.contains("Comet") &&
+ !op.isInstanceOf[ShuffleExchangeExec]) {
+ withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
+ }
+ None
+ }
+ }
+
+ private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
+ val enabled = handler.enabledConfig.forall(_.get(op.conf))
+ val opName = op.getClass.getSimpleName
+ if (enabled) {
+ val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
+ opSerde.getSupportLevel(op) match {
+ case Unsupported(notes) =>
+ withInfo(op, notes.getOrElse(""))
+ false
+ case Incompatible(notes) =>
+ val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
+ val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
+ if (allowIncompat) {
+ if (notes.isDefined) {
+ logWarning(
+ s"Comet supports $opName when $incompatConf=true " +
+ s"but has notes: ${notes.get}")
+ }
+ true
+ } else {
+ val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
+ withInfo(
+ op,
+ s"$opName is not fully compatible with Spark$optionalNotes. " +
+ s"To enable it anyway, set $incompatConf=true. " +
+ s"${CometConf.COMPAT_GUIDE}.")
+ false
+ }
+ case Compatible(notes) =>
+ if (notes.isDefined) {
+ logWarning(s"Comet supports $opName but has notes: ${notes.get}")
+ }
+ true
+ }
+ } else {
+ withInfo(
+ op,
+ s"Native support for operator $opName is disabled. " +
+ s"Set ${handler.enabledConfig.get.key}=true to enable it.")
+ false
+ }
+ }
+
+ /**
+ * Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
+ * native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
+ * `CometSinkPlaceHolder` later in `CometSparkSessionExtensions` after `operator2Proto` is
+ * called.
+ */
+ private def isCometSink(op: SparkPlan): Boolean = {
+ if (isExchangeSink(op)) {
+ return true
+ }
+ op match {
+ case s if isCometScan(s) => true
+ case _: CometSparkToColumnarExec => true
+ case _: CometSinkPlaceHolder => true
+ case _: CoalesceExec => true
+ case _: CollectLimitExec => true
+ case _: UnionExec => true
+ case _: TakeOrderedAndProjectExec => true
+ case _ => false
+ }
+ }
+
+ private def isExchangeSink(op: SparkPlan): Boolean = {
+ op match {
+ case _: ShuffleExchangeExec => true
+ case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
+ case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
+ case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
+ case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) =>
+ true
+ case _: BroadcastExchangeExec => true
+ case _ => false
+ }
+ }
+
}
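
The relocated opSerdeMap keeps the dispatch pattern intact: look up a handler
by the operator's runtime class, then cast and convert. A self-contained
stand-in of that registry pattern follows (Plan, ProjectLike, FilterLike, and
the serde objects are illustrative names, not Comet classes):

    object RegistrySketch {
      trait Plan
      class ProjectLike extends Plan
      class FilterLike extends Plan

      trait Serde[T <: Plan] { def convert(op: T): Option[String] }
      object ProjectSerde extends Serde[ProjectLike] {
        def convert(op: ProjectLike): Option[String] = Some("projection")
      }
      object FilterSerde extends Serde[FilterLike] {
        def convert(op: FilterLike): Option[String] = Some("filter")
      }

      // Mirrors opSerdeMap: runtime class -> handler, with the same cast the
      // real code performs after a successful lookup.
      val serdeMap: Map[Class[_ <: Plan], Serde[_ <: Plan]] =
        Map(classOf[ProjectLike] -> ProjectSerde, classOf[FilterLike] -> FilterSerde)

      def toProto(op: Plan): Option[String] =
        serdeMap.get(op.getClass).flatMap(_.asInstanceOf[Serde[Plan]].convert(op))

      def main(args: Array[String]): Unit =
        println(toProto(new FilterLike)) // Some(filter); unregistered classes -> None
    }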
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index ecb936150..44890c1c9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -27,22 +27,15 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.comet._
-import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
-import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions._
import org.apache.comet.serde.ExprOuterClass.{AggExpr, Expr, ScalarFunc}
-import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.Types.{DataType => ProtoDataType}
import org.apache.comet.serde.Types.DataType._
import org.apache.comet.serde.literals.CometLiteral
@@ -54,25 +47,6 @@ import org.apache.comet.shims.CometExprShim
*/
object QueryPlanSerde extends Logging with CometExprShim {
- /**
- * Mapping of Spark operator class to Comet operator handler.
- */
- private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
- Map(
- classOf[ProjectExec] -> CometProject,
- classOf[FilterExec] -> CometFilter,
- classOf[LocalLimitExec] -> CometLocalLimit,
- classOf[GlobalLimitExec] -> CometGlobalLimit,
- classOf[ExpandExec] -> CometExpand,
- classOf[HashAggregateExec] -> CometHashAggregate,
- classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate,
- classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin,
- classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin,
- classOf[SortMergeJoinExec] -> CometSortMergeJoin,
- classOf[SortExec] -> CometSort,
- classOf[LocalTableScanExec] -> CometLocalTableScan,
- classOf[WindowExec] -> CometWindow)
-
private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[ArrayAppend] -> CometArrayAppend,
classOf[ArrayCompact] -> CometArrayCompact,
@@ -893,186 +867,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build())
}
- /**
- * Convert a Spark plan operator to a protobuf Comet operator.
- *
- * @param op
- * Spark plan operator
- * @param childOp
- * previously converted protobuf Comet operators, which will be consumed by the Spark plan
- * operator as its children
- * @return
- * The converted Comet native operator for the input `op`, or `None` if the `op` cannot be
- * converted to a native operator.
- */
- def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {
- val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
- childOp.foreach(builder.addChildren)
-
- // look for registered handler first
- val serde = opSerdeMap.get(op.getClass)
- serde match {
- case Some(handler) if isOperatorEnabled(handler, op) =>
- val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
- val maybeConverted = opSerde.convert(op, builder, childOp: _*)
- if (maybeConverted.isDefined) {
- return maybeConverted
- }
- case _ =>
- }
-
- // now handle special cases that cannot be expressed as a simple mapping from class name,
- // and see whether the operator can be used as a sink
- op match {
-
- // Fully native scan for V1
- case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
- CometNativeScan.convert(scan, builder, childOp: _*)
-
- case op if isCometSink(op) =>
- val supportedTypes =
- op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))
-
- if (!supportedTypes) {
- withInfo(op, "Unsupported data type")
- return None
- }
-
- // These operators are the source of the Comet native execution chain
- val scanBuilder = OperatorOuterClass.Scan.newBuilder()
- val source = op.simpleStringWithNodeId()
- if (source.isEmpty) {
- scanBuilder.setSource(op.getClass.getSimpleName)
- } else {
- scanBuilder.setSource(source)
- }
-
- val ffiSafe = op match {
- case _ if isExchangeSink(op) =>
- // Source of broadcast exchange batches is ArrowStreamReader
- // Source of shuffle exchange batches is NativeBatchDecoderIterator
- true
- case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
- // native_comet scan reuses mutable buffers
- false
- case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
- // native_iceberg_compat scan reuses mutable buffers for constant columns
- // https://github.com/apache/datafusion-comet/issues/2152
- false
- case _ =>
- false
- }
- scanBuilder.setArrowFfiSafe(ffiSafe)
-
- val scanTypes = op.output.flatten { attr =>
- serializeDataType(attr.dataType)
- }
-
- if (scanTypes.length == op.output.length) {
- scanBuilder.addAllFields(scanTypes.asJava)
-
- // Sink operators don't have children
- builder.clearChildren()
-
- Some(builder.setScan(scanBuilder).build())
- } else {
- // There are unsupported scan types
- withInfo(
- op,
- s"unsupported Comet operator: ${op.nodeName}, due to unsupported
data types above")
- None
- }
-
- case _ =>
- // Emit warning if:
- // 1. it is not a Spark shuffle operator, which is handled separately
- // 2. it is not a Comet operator
- if (serde.isEmpty && !op.nodeName.contains("Comet") &&
- !op.isInstanceOf[ShuffleExchangeExec]) {
- withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
- }
- None
- }
- }
-
- private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = {
- val enabled = handler.enabledConfig.forall(_.get(op.conf))
- val opName = op.getClass.getSimpleName
- if (enabled) {
- val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
- opSerde.getSupportLevel(op) match {
- case Unsupported(notes) =>
- withInfo(op, notes.getOrElse(""))
- false
- case Incompatible(notes) =>
- val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
- val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName)
- if (allowIncompat) {
- if (notes.isDefined) {
- logWarning(
- s"Comet supports $opName when $incompatConf=true " +
- s"but has notes: ${notes.get}")
- }
- true
- } else {
- val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
- withInfo(
- op,
- s"$opName is not fully compatible with Spark$optionalNotes. " +
- s"To enable it anyway, set $incompatConf=true. " +
- s"${CometConf.COMPAT_GUIDE}.")
- false
- }
- case Compatible(notes) =>
- if (notes.isDefined) {
- logWarning(s"Comet supports $opName but has notes: ${notes.get}")
- }
- true
- }
- } else {
- withInfo(
- op,
- s"Native support for operator $opName is disabled. " +
- s"Set ${handler.enabledConfig.get.key}=true to enable it.")
- false
- }
- }
-
- /**
- * Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
- * native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
- * `CometSinkPlaceHolder` later in `CometSparkSessionExtensions` after `operator2Proto` is
- * called.
- */
- private def isCometSink(op: SparkPlan): Boolean = {
- if (isExchangeSink(op)) {
- return true
- }
- op match {
- case s if isCometScan(s) => true
- case _: CometSparkToColumnarExec => true
- case _: CometSinkPlaceHolder => true
- case _: CoalesceExec => true
- case _: CollectLimitExec => true
- case _: UnionExec => true
- case _: TakeOrderedAndProjectExec => true
- case _ => false
- }
- }
-
- private def isExchangeSink(op: SparkPlan): Boolean = {
- op match {
- case _: ShuffleExchangeExec => true
- case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
- case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
- case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
- case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) =>
- true
- case _: BroadcastExchangeExec => true
- case _ => false
- }
- }
-
// Utility method. Adds explain info if the result of calling exprToProto is None
def optExprWithInfo(
optExpr: Option[Expr],
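
One detail worth noting in the sink path above: type support is all-or-nothing.
Each output field serializes to an Option, and a length mismatch after
flattening means at least one type was rejected. A self-contained stand-in of
that check (serialize here is a toy rule, not Comet's serializeDataType):

    object TypeCheckSketch {
      // Toy rule: pretend map types are unsupported.
      def serialize(dt: String): Option[String] =
        if (dt == "map") None else Some(dt.toUpperCase)

      def main(args: Array[String]): Unit = {
        val output = Seq("int", "string", "map")
        val scanTypes = output.flatMap(serialize)
        if (scanTypes.length == output.length) println(s"scan fields: $scanTypes")
        else println("unsupported data types above") // the withInfo fallback path
      }
    }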