sunchao commented on code in PR #100:
URL:
https://github.com/apache/arrow-datafusion-comet/pull/100#discussion_r1505366024
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -451,6 +468,23 @@ class CometSparkSessionExtensions
}
}
}
+
+ // CometExec already wraps a `ColumnarToRowExec` for row-based operators. Therefore,
+ // `ColumnarToRowExec` is redundant and can be eliminated.
+ //
+ // It was added during ApplyColumnarRulesAndInsertTransitions' insertTransitions phase when Spark
+ // requests row-based output such as `collect` call. It's correct to add a redundant
+ // `ColumnarToRowExec` for `CometExec`. However, for certain operators such as
+ // `CometCollectLimitExec` which overrides `executeCollect`, the redundant `ColumnarToRowExec`
+ // makes the override ineffective. The purpose of this rule is to eliminate the redundant
+ // `ColumnarToRowExec` for such operators.
+ case class EliminateRedundantColumnarToRow(session: SparkSession) extends Rule[SparkPlan] {
Review Comment:
Thanks, makes sense. Note that this also removes `ColumnarToRowExec` from the
plan when `doExecute` is used instead of `executeCollect`, but I think that is
OK since `doExecute` itself calls `ColumnarToRowExec`.
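
For readers following along, here is a minimal sketch of the kind of rewrite being discussed. It uses toy plan nodes (`Plan`, `ColumnarToRow`, `CometCollectLimit`, `CometScan` are all hypothetical stand-ins, not the real Spark or Comet classes), and it assumes the rule only inspects the root of the plan. It is not necessarily how the PR implements `EliminateRedundantColumnarToRow`.

```scala
object RedundantTransitionSketch {
  // Toy stand-ins for physical plan nodes (hypothetical, for illustration only).
  sealed trait Plan
  case class CometScan(table: String) extends Plan
  case class CometCollectLimit(limit: Int, child: Plan) extends Plan
  case class ColumnarToRow(child: Plan) extends Plan

  // Drop a root-level ColumnarToRow only when it sits directly on top of the
  // collect-limit node, so that node becomes the plan root again.
  // Every other plan is returned unchanged.
  def eliminateRedundantColumnarToRow(plan: Plan): Plan = plan match {
    case ColumnarToRow(child: CometCollectLimit) => child
    case other => other
  }
}
```

With the wrapper gone, the collect-limit node is the root, so whichever entry point is invoked on the root reaches that node directly.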
##########
spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala:
##########
@@ -32,4 +34,19 @@ trait ShimCometSparkSessionExtensions {
.map { a => a.setAccessible(true); a }
.flatMap(_.get(scan).asInstanceOf[Option[Aggregation]])
.headOption
+
+ /**
+ * TODO: delete after dropping Spark 3.2 and 3.3 support
+ */
+ def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
Review Comment:
Looks good 👍