maropu commented on a change in pull request #27058: [SPARK-30276][SQL] Support 
Filter expression allows simultaneous use of DISTINCT
URL: https://github.com/apache/spark/pull/27058#discussion_r369536759
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
 ##########
 @@ -118,7 +118,64 @@ import org.apache.spark.sql.types.IntegerType
  *       LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in 
sql):
+ * {{{
+ *   SELECT
+ *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ *     COUNT(DISTINCT cat2) as cat2_cnt,
+ *     SUM(value) AS total
+ *  FROM
+ *    data
+ *  GROUP BY
+ *    key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ *                 COUNT(DISTINCT 'cat2),
+ *                 sum('value)]
+ *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *      key = ['key]
+ *      functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *                   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *                   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *      output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *     Aggregate(
+ *        key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *        functions = [sum('value)]
+ *        output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *       Expand(
+ *           projections = [('key, null, null, 0, 'value),
+ *                         ('key, '_gen_distinct_1, null, 1, null),
+ *                         ('key, null, '_gen_distinct_2, 2, null)]
+ *           output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ *         Expand(
+ *            projections = [('key, if ('id > 1) 'cat1 else null, 'cat2, 
cast('value as bigint))]
+ *            output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'value])
+ *           LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows.
+ *
+ * In the first phase, expands distinct aggregates which exists filter clause:
+ * 1. Guaranteed to compute filter clauses in the first aggregate locally.
+ * 2. The attributes referenced by different distinct aggregate expressions 
are likely to overlap,
+ *    and if no additional processing is performed, data loss will occur. To 
prevent this, we
+ *    generate new attributes and replace the original ones.
+ * 3. If we apply the first phase to distinct aggregate expressions which 
exists filter
+ *    clause, the aggregate after expand may have at least two distinct 
aggregates, so we need to
+ *    apply the second phase too. Please refer to the second phase for more 
details.
+ *
+ * In the second phase, rewrite when aggregate exists at least two distinct 
aggregates:
  * 1. Expand the data. There are three aggregation groups in this query:
 
 Review comment:
   Really? I think the statements below point to the second query: 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala#L78-L119
   ```
    *    i. the non-distinct group;
    *    ii. the distinct 'cat1 group;
    *    iii. the distinct 'cat2 group.
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to