[ https://issues.apache.org/jira/browse/SPARK-32542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

karl wang updated SPARK-32542:
------------------------------
    Description: 
Split an Expand into several smaller Expands, each containing the specified number 
of projections.
For instance, consider this SQL: select a, b, c, d, count(1) from table1 group by a, 
b, c, d with cube. It can expand the original data to 2^4 times its size.
If we set spark.sql.optimizer.projections.size=4, the Expand will be split 
into 2^4/4 = 4 smaller Expands. This reduces shuffle pressure and improves 
performance in multidimensional analysis when the data is huge.
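The arithmetic above can be sketched as follows. This is a minimal illustration of the proposed split, not Spark's actual optimizer code; the object name ExpandSplitSketch and the chunking logic are assumptions for illustration, and spark.sql.optimizer.projections.size is the config name proposed in this issue.

```scala
// Sketch of the proposed rewrite: GROUP BY a, b, c, d WITH CUBE emits one
// projection per subset of the grouping columns, i.e. 2^4 = 16 projections
// in a single Expand. With spark.sql.optimizer.projections.size = 4, that
// Expand would be split into 16/4 = 4 smaller Expands combined by Union.
object ExpandSplitSketch {
  def main(args: Array[String]): Unit = {
    val groupByCols = Seq("a", "b", "c", "d")

    // One projection per subset of the grouping columns.
    val projections: Seq[Seq[String]] =
      groupByCols.toSet.subsets().map(_.toSeq).toSeq
    println(s"cube projections: ${projections.size}")

    // Chunk the projections; each chunk becomes one small Expand.
    val projectionsSize = 4
    val smallExpands = projections.grouped(projectionsSize).toSeq
    println(s"small Expands after split: ${smallExpands.size}")
  }
}
```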


  was:
Split an Expand into several smaller Expands, each containing the specified number 
of projections.
For instance, consider this SQL: select a, b, c, d, count(1) from table1 group by a, 
b, c, d with cube. It can expand the original data to 2^4 times its size.
If we set spark.sql.optimizer.projections.size=4, the Expand will be split 
into 2^4/4 = 4 smaller Expands. This reduces shuffle pressure and improves 
performance in multidimensional analysis when the data is huge.

runBenchmark("cube multianalysis agg") {
      val N = 20 << 20

      val benchmark = new Benchmark("cube multianalysis agg", N, output = output)

      def f(): Unit = {
        val df = spark.range(N).cache()
        df.selectExpr(
          "id",
          "(id & 1023) as k1",
          "cast(id & 1023 as string) as k2",
          "cast(id & 1023 as int) as k3",
          "cast(id & 1023 as double) as k4",
          "cast(id & 1023 as float) as k5")
          .cube("k1", "k2", "k3", "k4", "k5")
          .sum()
          .noop()
        df.unpersist()

      }

      benchmark.addCase("grouping = F") { _ =>
        withSQLConf(
          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
          SQLConf.GROUPING_WITH_UNION.key -> "false") {
          f()
        }
      }

      benchmark.addCase("grouping = T projectionSize= 16") { _ =>
        withSQLConf(
          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
          SQLConf.GROUPING_WITH_UNION.key -> "true",
          SQLConf.GROUPING_EXPAND_PROJECTIONS.key -> "16") {
          f()
        }
      }

      benchmark.addCase("grouping = T projectionSize= 8") { _ =>
        withSQLConf(
          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
          SQLConf.GROUPING_WITH_UNION.key -> "true",
          SQLConf.GROUPING_EXPAND_PROJECTIONS.key -> "8") {
          f()
        }
      }
      benchmark.run()
    }
Running benchmark: cube multianalysis agg :
cube 5 fields k1, k2, k3, k4, k5
Running case: GROUPING_WITH_UNION off 
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 16 
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 8

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15
Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
cube multianalysis agg:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
grouping = F                                      54329          54931         852          0.4        2590.6       1.0X
grouping = T projectionSize= 16                   44584          44781         278          0.5        2125.9       1.2X
grouping = T projectionSize= 8                    42764          43272         718          0.5        2039.1       1.3X
Running benchmark: cube multianalysis agg :
cube 6 fields k1, k2, k3, k4, k5, k6
Running case: GROUPING_WITH_UNION off 
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 32
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 16

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15
Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
cube multianalysis agg:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
grouping = F                                     141607         143424        2569          0.1        6752.4       1.0X
grouping = T projectionSize= 32                  109465         109603         196          0.2        5219.7       1.3X
grouping = T projectionSize= 16                   99752         100411         933          0.2        4756.5       1.4X
Running benchmark: cube multianalysis agg :
cube 7 fields k1, k2, k3, k4, k5, k6, k7
Running case: GROUPING_WITH_UNION off 
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 64
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 32
Running case: GROUPING_WITH_UNION on and GROUPING_EXPAND_PROJECTIONS = 16

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15
Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
cube multianalysis agg:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
grouping = F                                     516941         519658         NaN          0.0       24649.7       1.0X
grouping = T projectionSize= 64                  267170         267547         533          0.1       12739.6       1.9X
grouping = T projectionSize= 32                  252023         255307        1769          0.1       12017.4       2.1X
grouping = T projectionSize= 16                  235929         236944        1436          0.1       11250.0       2.2X


> add a batch for optimizing logicalPlan
> --------------------------------------
>
>                 Key: SPARK-32542
>                 URL: https://issues.apache.org/jira/browse/SPARK-32542
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 3.0.0
>            Reporter: karl wang
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Split an Expand into several smaller Expands, each containing the specified 
> number of projections.
> For instance, consider this SQL: select a, b, c, d, count(1) from table1 group by 
> a, b, c, d with cube. It can expand the original data to 2^4 times its size.
> If we set spark.sql.optimizer.projections.size=4, the Expand will be split 
> into 2^4/4 = 4 smaller Expands. This reduces shuffle pressure and improves 
> performance in multidimensional analysis when the data is huge.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
