zhztheplayer commented on code in PR #5007:
URL: https://github.com/apache/incubator-gluten/pull/5007#discussion_r1591954924
##########
gluten-core/src/main/scala/io/glutenproject/extension/columnar/RewriteTypedImperativeAggregate.scala:
##########
@@ -67,6 +100,28 @@ object RewriteTypedImperativeAggregate extends Rule[SparkPlan] with PullOutProje
       }
       copyBaseAggregateExec(agg)(newResultExpressions = newResultExpressions)
+    case agg: BaseAggregateExec
+        if agg.aggregateExpressions.exists(shouldRewriteForPercentileLikeExpr) =>
+      val exprMap = agg.aggregateExpressions
+        .filter(shouldRewriteForPercentileLikeExpr)
+        .map(ae => ae.aggregateFunction.inputAggBufferAttributes.head -> ae)
+        .toMap
+      val newResultExpressions = agg.resultExpressions.map {
+        case attr: AttributeReference =>
+          exprMap
+            .get(attr)
+            .map {
+              ae =>
+                attr.copy(dataType = getPercentileLikeInterminateDataType(ae.aggregateFunction))(
+                  exprId = attr.exprId,
+                  qualifier = attr.qualifier
+                )
+            }
+            .getOrElse(attr)
+        case other => other
+      }
+      copyBaseAggregateExec(agg)(newResultExpressions = newResultExpressions)
Review Comment:
IIUC this could make the offloaded partial aggregate operator incompatible
with vanilla Spark's final aggregate operator? Say, if the final aggregation
falls back to vanilla Spark, undefined behavior will result.
I am considering whether we could use the approach we took for
`approx_count_distinct` (https://github.com/apache/incubator-gluten/pull/1676):
replace `approx_percentile` with something like `velox_approx_percentile`
that has a different intermediate type definition, at the very early planning
phase. Then we could distinguish between vanilla Spark's and Velox's
implementations of this function.
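To make the suggested alternative concrete, here is a minimal standalone sketch (not Gluten or Spark code; all names such as `SimpleAggFunc`, `VeloxApproxPercentile`, and the type objects are hypothetical) of the idea: substitute the aggregate function itself at the early planning phase, so that the function carries its own intermediate-buffer type and the partial and final stages agree on the buffer layout by construction instead of patching result-expression data types afterward.

```scala
// Toy model of a planning-phase function substitution. The real work in
// Gluten happens on Spark's logical/physical plans; this sketch only
// illustrates the shape of the rewrite.
sealed trait SimpleDataType
case object BinaryType extends SimpleDataType       // vanilla serialized buffer
case object VarbinaryRowType extends SimpleDataType // stand-in for Velox's layout

sealed trait SimpleAggFunc {
  def name: String
  def intermediateType: SimpleDataType
}
case class ApproxPercentile() extends SimpleAggFunc {
  val name = "approx_percentile"
  val intermediateType: SimpleDataType = BinaryType
}
case class VeloxApproxPercentile() extends SimpleAggFunc {
  val name = "velox_approx_percentile"
  val intermediateType: SimpleDataType = VarbinaryRowType
}

object Demo {
  // Early planning-phase rewrite: swap the vanilla function for a
  // Velox-specific variant. Because both the partial and final stages
  // resolve the same (substituted) function, they cannot disagree on the
  // intermediate type -- and a fallen-back final stage would see the
  // Velox-specific function rather than silently misreading the buffer.
  def rewriteForVelox(fns: Seq[SimpleAggFunc]): Seq[SimpleAggFunc] = fns.map {
    case _: ApproxPercentile => VeloxApproxPercentile()
    case other               => other
  }

  def main(args: Array[String]): Unit = {
    val planned = rewriteForVelox(Seq(ApproxPercentile()))
    println(planned.map(f => s"${f.name}:${f.intermediateType}").mkString(","))
  }
}
```

This mirrors the `approx_count_distinct` approach the comment references: the distinction between implementations lives in the function identity, not in a post-hoc patch of the aggregate operator's result expressions.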
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
For additional commands, e-mail: [email protected]