rf972 commented on a change in pull request #29695:
URL: https://github.com/apache/spark/pull/29695#discussion_r529825209



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -73,33 +77,25 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
               postScanFilters)
             Aggregate(groupingExpressions, resultExpressions, plan)
           } else {
-            val resultAttributes = resultExpressions.map(_.toAttribute)
-              .map ( e => e match { case a: AttributeReference => a })
-            var index = 0
             val aggOutputBuilder = ArrayBuilder.make[AttributeReference]
-            for (a <- resultAttributes) {
-              val newName = if (a.name.contains("FILTER")) {
-                a.name.substring(0, a.name.indexOf("FILTER") - 1)
-              } else if (a.name.contains("DISTINCT")) {
-                a.name.replace("DISTINCT ", "")
-              } else {
-                a.name
-              }
-
-              aggOutputBuilder +=
-                a.copy(name = newName,
-                  dataType = aggregates(index).dataType)(exprId = 
NamedExpression.newExprId,
-                  qualifier = a.qualifier)
-              index += 1
+            for (a <- aggregates) {
+                aggOutputBuilder += AttributeReference(toPrettySQL(a), 
a.dataType)()
             }
             val aggOutput = aggOutputBuilder.result
 
-            var newOutput = aggOutput
-            for (col <- output) {
-              if (!aggOutput.exists(_.name.contains(col.name))) {
-                newOutput = col +: newOutput
+            val newOutputBuilder = ArrayBuilder.make[AttributeReference]
+            for (col1 <- output) {
+              var found = false
+              for (col2 <- aggOutput) {
+                  if (contains(col2.name, col1.name)) {

Review comment:
       Thanks @huaxingao  for incorporating our suggestions into the patch 
regarding aggregates containing expressions!  
   With your changes, TPCH Q6 test shows a substantial reduction in data 
transfer.  
   With just filter and projection pushdown, Q6 transfer size is about 1.5 MB.  
But with aggregate pushdown this now gets reduced to 17 bytes.
   
   In our testing we found a case that throws an error.  Grouping by more than 
one column seems to cause an error.  Here is an example that fails for us:
   
   ```
   val df = sparkSession.sql("select BONUS, SUM(SALARY+BONUS), SALARY FROM 
h2.test.employee" + 
          " GROUP BY SALARY, BONUS")
   df.show()
   ```
   
   If you have any thoughts or suggestions on a solution for this, we would 
appreciate it.  Thanks !
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to