[ https://issues.apache.org/jira/browse/SPARK-32542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
karl wang updated SPARK-32542:
------------------------------
Description:
Split an Expand into several smaller Expands, each containing the specified number of projections.

For instance, take this SQL: select a, b, c, d, count(1) from table1 group by a, b, c, d with cube. The Expand can grow the data to 2^4 times its original size. If we set spark.sql.optimizer.projections.size=4, the Expand will be split into 2^4/4 = 4 smaller Expands. This reduces shuffle pressure and improves performance in multidimensional analysis when the data is huge.

was:
Split an Expand into several smaller Expands, each containing the specified number of projections.

For instance, take this SQL: select a, b, c, d, count(1) from table1 group by a, b, c, d with cube. The Expand can grow the data to 2^4 times its original size. If we set spark.sql.optimizer.projections.size=4, the Expand will be split into 2^4/4 = 4 smaller Expands. This reduces shuffle pressure and improves performance in multidimensional analysis when the data is huge.

runBenchmark("cube multianalysis agg") {
  val N = 20 << 20
  val benchmark = new Benchmark("cube multianalysis agg", N, output = output)

  def f(): Unit = {
    val df = spark.range(N).cache()
    df.selectExpr(
      "id",
      "(id & 1023) as k1",
      "cast(id & 1023 as string) as k2",
      "cast(id & 1023 as int) as k3",
      "cast(id & 1023 as double) as k4",
      "cast(id & 1023 as float) as k5")
      .cube("k1", "k2", "k3", "k4", "k5")
      .sum()
      .noop()
    df.unpersist()
  }

  benchmark.addCase("grouping = F") { _ =>
    withSQLConf(
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
      SQLConf.GROUPING_WITH_UNION.key -> "false") {
      f()
    }
  }

  benchmark.addCase("grouping = T projectionSize= 16") { _ =>
    withSQLConf(
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
      SQLConf.GROUPING_WITH_UNION.key -> "true",
      SQLConf.GROUPING_EXPAND_PROJECTIONS.key -> "16") {
      f()
    }
  }

  benchmark.addCase("grouping = T projectionSize= 8") { _ =>
    withSQLConf(
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
      SQLConf.GROUPING_WITH_UNION.key -> "true",
      SQLConf.GROUPING_EXPAND_PROJECTIONS.key -> "8") {
      f()
    }
  }

  benchmark.run()
}

Running benchmark: cube multianalysis agg : cube 5 fields k1, k2, k3, k4, k5
Running case: GROUPING_WITH_UNION off
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 16
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 8

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15
Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
cube multianalysis agg:            Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------
grouping = F                               54329          54931         852         0.4        2590.6       1.0X
grouping = T projectionSize= 16            44584          44781         278         0.5        2125.9       1.2X
grouping = T projectionSize= 8             42764          43272         718         0.5        2039.1       1.3X

Running benchmark: cube multianalysis agg : cube 6 fields k1, k2, k3, k4, k5, k6
Running case: GROUPING_WITH_UNION off
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 32
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 16

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15
Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
cube multianalysis agg:            Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------
grouping = F                              141607         143424        2569         0.1        6752.4       1.0X
grouping = T projectionSize= 32           109465         109603         196         0.2        5219.7       1.3X
grouping = T projectionSize= 16            99752         100411         933         0.2        4756.5       1.4X

Running benchmark: cube multianalysis agg : cube 7 fields k1, k2, k3, k4, k5, k6, k7
Running case: GROUPING_WITH_UNION off
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 64
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 32
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 16

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15
Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
cube multianalysis agg:            Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------
grouping = F                              516941         519658         NaN         0.0       24649.7       1.0X
grouping = T projectionSize= 64           267170         267547         533         0.1       12739.6       1.9X
grouping = T projectionSize= 32           252023         255307        1769         0.1       12017.4       2.1X
grouping = T projectionSize= 16           235929         236944        1436         0.1       11250.0       2.2X


> add a batch for optimizing logicalPlan
> --------------------------------------
>
>                 Key: SPARK-32542
>                 URL: https://issues.apache.org/jira/browse/SPARK-32542
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 3.0.0
>            Reporter: karl wang
>            Priority: Major
>             Fix For: 3.0.0
>
> Split an Expand into several smaller Expands, each containing the specified
> number of projections.
> For instance, take this SQL: select a, b, c, d, count(1) from table1 group by
> a, b, c, d with cube. The Expand can grow the data to 2^4 times its original
> size. If we set spark.sql.optimizer.projections.size=4, the Expand will be
> split into 2^4/4 = 4 smaller Expands. This reduces shuffle pressure and
> improves performance in multidimensional analysis when the data is huge.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
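
The 2^n arithmetic behind the proposal can be illustrated with a small standalone Scala sketch (no Spark required). A CUBE over n grouping columns produces 2^n grouping sets, i.e. 2^n projections in a single Expand; splitting them into batches of the configured size gives the number of smaller Expands that would be unioned. The object name and batch size below are illustrative, not part of the proposed implementation:

```scala
object ExpandSplitSketch {
  def main(args: Array[String]): Unit = {
    val cols = Seq("a", "b", "c", "d")

    // CUBE over n columns groups by every subset of the columns,
    // so a single Expand carries 2^n projections (here 2^4 = 16).
    val groupingSets: Seq[Seq[String]] =
      (0 to cols.length).flatMap(k => cols.combinations(k))
    println(s"projections in one Expand: ${groupingSets.length}")

    // With the proposed spark.sql.optimizer.projections.size = 4,
    // the 16 projections split into 16 / 4 = 4 smaller Expands.
    val batches = groupingSets.grouped(4).toSeq
    println(s"smaller Expands after split: ${batches.length}")
  }
}
```

Each smaller Expand then multiplies the input by only 4x instead of 16x before its partial aggregation, which is where the reduced shuffle pressure in the benchmark comes from.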