EmilyMatt commented on code in PR #1390:
URL: https://github.com/apache/datafusion-comet/pull/1390#discussion_r1963121789


##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -216,6 +216,17 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("initCap", defaultValue = false)
 
+  val COMET_EXEC_AGGREGATE_ENFORCE_RESULTS: ConfigEntry[Boolean] =
+    conf("spark.comet.exec.aggregate.enforceResults")
+      .doc("Whether to enforce converting results in the Final stage of a HashAggregate, " +
+        "When enabled, Final-mode hashAggregates will not be converted to Comet, this can cause " +
+        "issues when native shuffle is enabled. " +
+        "If this is disabled, unsupported result expressions will be " +
+        "separated into a ProjectExec to allow HashAggregate to complete natively. " +
+        "This is disabled by default.")

Review Comment:
   @kazuyukitanimura Apologies, I believe I intended to say "when native shuffle is disabled" (or, more correctly, when columnar shuffle is used).
   Essentially, whenever the schema comes from Spark rather than from the data/batch itself, we start having problems.

   Looking at the TPC-DS tests, we can see an example:
   `HashAggregate [cp_catalog_page_id,sum,sum,isEmpty,sum,isEmpty] [sum(UnscaledValue(cs_ext_sales_price)),sum(coalesce(cast(cr_return_amount as decimal(12,2)), 0.00)),sum((cs_net_profit - coalesce(cast(cr_net_loss as decimal(12,2)), 0.00))),sales,returns,profit,channel,id,sum,sum,isEmpty,sum,isEmpty]
     CometColumnarToRow
       InputAdapter
         CometExchange [cp_catalog_page_id] #10
           CometHashAggregate`

   Note that the Partial HashAggregate is a Comet one, while the Final one is a regular HashAggregate fed through a CometColumnarToRow conversion.
   This is the current implementation, and it works for both shuffles, as Comet still produces the data Spark expects.
   However, the moment we output something Spark does not expect (such as the partial results of a CollectSet), the columnar shuffle will crash due to the mismatch in data types.

   The issue with the shuffle can probably be circumvented by shuffling the aggregate buffer as a binary column regardless of its data type, then reconstructing it in the Final aggregate; that way both shuffles will function.
   However, as discussed in the issue, this only delays the inevitable in cases such as an unsupported ResultExpression: a CometColumnarToRow will not recreate the data type a regular HashAggregate expects, and there is no path forward other than to let the Comet aggregate expressions run their course.
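
   For anyone wanting to try the two behaviours described above, here is a minimal sketch of toggling the flag. It assumes the key lands exactly as written in the diff under review (it may be renamed or dropped before merge) and that `spark-core` is on the classpath; the semantics in the comments are taken from the proposed doc string, not from a released Comet version.

```scala
import org.apache.spark.SparkConf

// Key copied from the diff under review; treat it as provisional.
val conf = new SparkConf()
  // "true": per the proposed doc string, Final-mode HashAggregates are not converted to Comet.
  // "false" (the stated default): unsupported result expressions are split into a
  // ProjectExec so that the HashAggregate can complete natively.
  .set("spark.comet.exec.aggregate.enforceResults", "true")
```

   The same key could equally be passed with `--conf` on `spark-submit`; the `SparkConf` form is used here only to keep the fragment in Scala.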