EmilyMatt commented on code in PR #1390:
URL: https://github.com/apache/datafusion-comet/pull/1390#discussion_r1963121789


##########
common/src/main/scala/org/apache/comet/CometConf.scala:
##########
@@ -216,6 +216,17 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("initCap", defaultValue = false)
 
+  val COMET_EXEC_AGGREGATE_ENFORCE_RESULTS: ConfigEntry[Boolean] =
+    conf("spark.comet.exec.aggregate.enforceResults")
+      .doc("Whether to enforce converting results in the Final stage of a HashAggregate, " +
+        "When enabled, Final-mode hashAggregates will not be converted to Comet, this can cause " +
+        "issues when native shuffle is enabled. " +
+        "If this is disabled, unsupported result expressions will be " +
+        "separated into a ProjectExec to allow HashAggregate to complete natively. " +
+        "This is disabled by default.")

Review Comment:
   @kazuyukitanimura 
   Apologies, I believe I intended to say "when native shuffle is disabled"
   (or, more correctly, when columnar shuffle is used) ^
   Essentially, whenever the schema comes from Spark rather than from the
   data/batch itself, we will start having problems.
   
   Looking at the tpcds tests, we can see an example:
   `HashAggregate [cp_catalog_page_id,sum,sum,isEmpty,sum,isEmpty] [sum(UnscaledValue(cs_ext_sales_price)),sum(coalesce(cast(cr_return_amount as decimal(12,2)), 0.00)),sum((cs_net_profit - coalesce(cast(cr_net_loss as decimal(12,2)), 0.00))),sales,returns,profit,channel,id,sum,sum,isEmpty,sum,isEmpty]
                               CometColumnarToRow
                                 InputAdapter
                                   CometExchange [cp_catalog_page_id] #10
                                     CometHashAggregate `
    
    See how the Partial HashAggregate is a Comet one, while the Final one is a
    regular HashAggregate behind a conversion.
    This is the current implementation, and it works for both shuffles, as Comet
    still produces the data Spark expects.
    However, the moment we output something Spark does not expect (such as the
    partial results of a CollectSet), the columnar shuffle will crash due to the
    mismatch in data types.
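
To make the failure mode concrete, here is a minimal plain-Scala sketch of the kind of check that trips. It is a toy model only: `ColType`, `Field` and `firstMismatch` are hypothetical names, not Spark or Comet classes, and the example assumes the Spark plan declares the CollectSet partial buffer as an opaque binary column while the native side emits a list column.

```scala
// Toy model only: these types are hypothetical and are not Spark or Comet classes.
object SchemaMismatchSketch {
  sealed trait ColType
  case object BinaryCol extends ColType
  case object DecimalCol extends ColType
  final case class ListCol(element: ColType) extends ColType

  final case class Field(name: String, colType: ColType)

  // Compare the schema the plan declares with the schema the batch actually carries.
  // Returns a description of the first difference, or None when they agree.
  def firstMismatch(planned: Seq[Field], actual: Seq[Field]): Option[String] = {
    if (planned.length != actual.length) {
      Some(s"column count differs: planned ${planned.length}, actual ${actual.length}")
    } else {
      planned.zip(actual).collectFirst {
        case (p, a) if p.colType != a.colType =>
          s"column '${p.name}': planned ${p.colType}, actual ${a.colType}"
      }
    }
  }
}
```

In this model a columnar shuffle that trusts the planned schema fails as soon as `firstMismatch` is non-empty, whereas a shuffle that reads the schema from the batch itself never performs the comparison.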
    
    The issue with the shuffle can probably be circumvented by shuffling the
    aggregate buffer as a binary column regardless of its data type, then
    reconstructing it in the Final aggregate; that way both shuffles will function.
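
A rough sketch of that binary-buffer idea, in plain Scala with no Spark or Comet dependencies. The object and method names are hypothetical, and the length-prefixed encoding is only an assumption chosen for illustration, not the format Comet would use.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical illustration: a collect_set-like partial state travels through
// the shuffle as opaque bytes and is rebuilt in the final stage.
object BinaryBufferSketch {
  // Partial stage: encode the set as an element count followed by UTF strings.
  def encode(state: Set[String]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(state.size)
    state.foreach(s => out.writeUTF(s))
    out.flush()
    bytes.toByteArray
  }

  // Final stage: rebuild the set from the opaque bytes.
  def decode(buffer: Array[Byte]): Set[String] = {
    val in = new DataInputStream(new ByteArrayInputStream(buffer))
    val count = in.readInt()
    (0 until count).map(_ => in.readUTF()).toSet
  }

  // Final stage: merge all partial buffers that arrived for one group.
  def mergeFinal(buffers: Seq[Array[Byte]]): Set[String] =
    buffers.foldLeft(Set.empty[String])((acc, b) => acc ++ decode(b))
}
```

Because the shuffle only ever sees a binary column here, its schema no longer depends on how the native side represents the intermediate state.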
    However, as discussed in the issue, this only delays the inevitable in
    cases such as an unsupported ResultExpression: a CometColumnarToRow will not
    recreate the data type that a regular HashAggregate expects, so there is no
    other path forward but to let the Comet aggregate expressions run their
    course.
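
For reference, the config description in the diff above mentions separating unsupported result expressions into a ProjectExec. A toy planning sketch of that split might look like the following. All names are hypothetical, and the assumption that the projection evaluates every result expression (rather than only the unsupported ones) is mine, not something the PR states.

```scala
// Toy planner model: none of these classes exist in Spark or Comet.
object ResultProjectionSketch {
  final case class ResultExpr(name: String, nativelySupported: Boolean)

  sealed trait Plan
  final case class NativeFinalAggregate(results: Seq[ResultExpr]) extends Plan
  final case class SparkProject(exprs: Seq[ResultExpr], child: Plan) extends Plan

  def planFinalStage(results: Seq[ResultExpr]): Plan = {
    if (results.forall(_.nativelySupported)) {
      // Everything can run natively: the aggregate evaluates the result expressions itself.
      NativeFinalAggregate(results)
    } else {
      // Assumption: the aggregate emits raw aggregate values with no result
      // expressions, and a projection on top evaluates them afterwards.
      SparkProject(results, NativeFinalAggregate(Seq.empty))
    }
  }
}
```

Either way the Final aggregate itself stays native, so it consumes the partial state in whatever form the native Partial stage produced.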



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

