[ https://issues.apache.org/jira/browse/SPARK-37682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-37682: ------------------------------------ Assignee: (was: Apache Spark) > Reduce memory pressure of RewriteDistinctAggregates > --------------------------------------------------- > > Key: SPARK-37682 > URL: https://issues.apache.org/jira/browse/SPARK-37682 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 3.2.0 > Reporter: Kevin Liu > Priority: Minor > Labels: performance > > In some cases, current RewriteDistinctAggregates duplicates unnecessary input > data in distinct groups. > This will cause a lot of waste of memory and affects performance. > We could apply 'merged column' and 'bit vector' tricks to alleviate the > problem. For example: > {code:sql} > SELECT > COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_filter_cnt_dist, > COUNT(DISTINCT cat2) FILTER (WHERE id > 2) as cat2_filter_cnt_dist, > COUNT(DISTINCT IF(value > 5, cat1, null)) as cat1_if_cnt_dist, > COUNT(DISTINCT id) as id_cnt_dist, > SUM(DISTINCT value) as id_sum_dist > FROM data > GROUP BY key > {code} > Current rule will rewrite the above sql plan to the following (pseudo) > logical plan: > {noformat} > Aggregate( > key = ['key] > functions = [ > count('cat1) FILTER (WHERE (('gid = 1) AND 'max(id > 1))), > count('(IF((value > 5), cat1, null))) FILTER (WHERE ('gid = 5)), > count('cat2) FILTER (WHERE (('gid = 3) AND 'max(id > 2))), > count('id) FILTER (WHERE ('gid = 2)), > sum('value) FILTER (WHERE ('gid = 4)) > ] > output = ['key, 'cat1_filter_cnt_dist, 'cat2_filter_cnt_dist, > 'cat1_if_cnt_dist, > 'id_cnt_dist, 'id_sum_dist]) > Aggregate( > key = ['key, 'cat1, 'value, 'cat2, '(IF((value > 5), cat1, null)), 'id, > 'gid] > functions = [max('id > 1), max('id > 2)] > output = ['key, 'cat1, 'value, 'cat2, '(IF((value > 5), cat1, null)), > 'id, 'gid, > 'max(id > 1), 'max(id > 2)]) > Expand( > projections = [ > ('key, 'cat1, null, null, null, null, 1, ('id > 1), null), > ('key, null, null, null, null, 'id, 2, null, null), > ('key, null, null, 'cat2, null, null, 3, null, ('id > 2)), > ('key, null, 'value, null, null, null, 4, null, null), > ('key, null, null, null, if (('value > 5)) 'cat1 else null, null, 5, > null, null) > ] > output = ['key, 'cat1, 'value, 'cat2, '(IF((value > 5), cat1, null)), > 'id, > 'gid, '(id > 1), '(id > 2)]) > LocalTableScan [...] > {noformat} > After applying 'merged column' and 'bit vector' tricks, the logical plan will > become: > {noformat} > Aggregate( > key = ['key] > functions = [ > count('merged_string_1) FILTER (WHERE (('gid = 1) AND NOT > (('filter_vector_1 & 1) = 0))) > count('merged_string_1) FILTER (WHERE ('gid = 1)), > count(if (NOT (('if_vector_1 & 1) = 0)) 'merged_string_1 else null) > FILTER (WHERE ('gid = 1)), > count('merged_string_1) FILTER (WHERE (('gid = 2) AND NOT > (('filter_vector_1 & 1) = 0))), > count('merged_integer_1) FILTER (WHERE ('gid = 3)), > sum('merged_integer_1) FILTER (WHERE ('gid = 4)) > ] > output = ['key, 'cat1_filter_cnt_dist, 'cat2_filter_cnt_dist, > 'cat1_if_cnt_dist, > 'id_cnt_dist, 'id_sum_dist]) > Aggregate( > key = ['key, 'merged_string_1, 'merged_integer_1, 'gid] > functions = [bit_or('if_vector_1),bit_or('filter_vector_1)] > output = ['key, 'merged_string_1, 'merged_integer_1, 'gid, > 'bit_or(if_vector_1), 'bit_or(filter_vector_1)]) > Expand( > projections = [ > ('key, 'cat1, null, 1, if ('value > 5) 1 else 0, if ('id > 1) 1 else > 0), > ('key, 'cat2, null, 2, null, if ('id > 2) 1 else 0), > ('key, null, 'id, 3, null, null), > ('key, null, 'value, 4, null, null) > ] > output = ['key, 'merged_string_1, 'merged_integer_1, 'gid, > 'if_vector_1, 'filter_vector_1]) > LocalTableScan [...] > {noformat} > 1. merged column: Children with same datatype from different aggregate > functions can share same project column (e.g. cat1, cat2). > 2. bit vector: If multiple aggregate function children have conditional > expressions, these conditions will output one column when it is true, and > output null when it is false. The detail logic will be in > RewriteDistinctAggregates.groupDistinctAggExpr of the following github link. > Then these aggregate functions can share one row group, and store the results > of their respective conditional expressions in the bit vector column, > reducing the number of rows of data expansion (e.g. cat1_filter_cnt_dist, > cat1_if_cnt_dist). > If there are many similar aggregate functions with or without filter in > distinct, these tricks can save mass memory and improve performance. -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org