maropu commented on issue #26420: [SPARK-27986][SQL] Support ANSI SQL filter predicate for aggregate expression.
URL: https://github.com/apache/spark/pull/26420#issuecomment-552186656
 
 
   I meant that we might not need to modify the physical plans for aggregates (e.g., HashAggregateExec). Instead, in the analyzer phase, we might be able to transform the filter expressions into projections, as shown below:
   ```
   // For the query "select k, sum(v1) filter (where v1 > 2), avg(v2) filter (where v2 < 6) from t group by k"
   scala> sql("select k, sum(v1), avg(v2) from (select k, if(v1 > 2, v1, null) v1, if(v2 < 6, v2, null) v2 from t) group by k").show()
   +---+-------+-------+
   |  k|sum(v1)|avg(v2)|
   +---+-------+-------+
   |  1|      4|    4.0|
   |  2|      8|   null|
   +---+-------+-------+

   scala> sql("select k, sum(v1), avg(v2) from (select k, if(v1 > 2, v1, null) v1, if(v2 < 6, v2, null) v2 from t) group by k").explain(true)
   == Parsed Logical Plan ==
   'Aggregate ['k], ['k, unresolvedalias('sum('v1), None), unresolvedalias('avg('v2), None)]
   +- 'SubqueryAlias `__auto_generated_subquery_name`
      +- 'Project ['k, 'if(('v1 > 2), 'v1, null) AS v1#190, 'if(('v2 < 6), 'v2, null) AS v2#191]
         +- 'UnresolvedRelation [t]

   == Analyzed Logical Plan ==
   k: int, sum(v1): bigint, avg(v2): double
   Aggregate [k#127], [k#127, sum(cast(v1#190 as bigint)) AS sum(v1)#194L, avg(cast(v2#191 as bigint)) AS avg(v2)#195]
   +- SubqueryAlias `__auto_generated_subquery_name`
      +- Project [k#127, if ((v1#128 > 2)) v1#128 else cast(null as int) AS v1#190, if ((v2#129 < 6)) v2#129 else cast(null as int) AS v2#191]
         +- SubqueryAlias `default`.`t`
            +- Relation[k#127,v1#128,v2#129] parquet

   == Optimized Logical Plan ==
   Aggregate [k#127], [k#127, sum(cast(v1#190 as bigint)) AS sum(v1)#194L, avg(cast(v2#191 as bigint)) AS avg(v2)#195]
   +- Project [k#127, if ((v1#128 > 2)) v1#128 else null AS v1#190, if ((v2#129 < 6)) v2#129 else null AS v2#191]
      +- Relation[k#127,v1#128,v2#129] parquet

   == Physical Plan ==
   *(2) HashAggregate(keys=[k#127], functions=[sum(cast(v1#190 as bigint)), avg(cast(v2#191 as bigint))], output=[k#127, sum(v1)#194L, avg(v2)#195])
   +- Exchange hashpartitioning(k#127, 200), true, [id=#320]
      +- *(1) HashAggregate(keys=[k#127], functions=[partial_sum(cast(v1#190 as bigint)), partial_avg(cast(v2#191 as bigint))], output=[k#127, sum#202L, sum#203, count#204L])
         +- *(1) Project [k#127, if ((v1#128 > 2)) v1#128 else null AS v1#190, if ((v2#129 < 6)) v2#129 else null AS v2#191]
            +- *(1) ColumnarToRow
               +- FileScan parquet default.t[k#127,v1#128,v2#129] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v1:int,v2:int>
   ```
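   To make the intended transformation concrete, here is a minimal sketch of the rewrite idea, assuming the spark-shell `spark` session and the table `t` above; the helper name is hypothetical, not the actual rule in this PR:
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Expression, If, Literal}
   import org.apache.spark.sql.functions._

   // Hypothetical helper: rewrite `agg(e) FILTER (WHERE p)` into `agg(if(p, e, null))`.
   // Rows failing the predicate become null, which sum/avg/etc. ignore.
   def pushFilterIntoAggChild(aggChild: Expression, pred: Expression): Expression =
     If(pred, aggChild, Literal.create(null, aggChild.dataType))

   // The same rewrite expressed with the public DataFrame API:
   // `when` without `otherwise` evaluates to null for non-matching rows.
   spark.table("t")
     .groupBy(col("k"))
     .agg(sum(when(col("v1") > 2, col("v1"))), avg(when(col("v2") < 6, col("v2"))))
     .show()
   ```
   Either way, the aggregation itself is unchanged, so no physical-plan changes (e.g., to HashAggregateExec) would be needed.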
