[ https://issues.apache.org/jira/browse/SPARK-36443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408958#comment-17408958 ]

Dongjoon Hyun commented on SPARK-36443:
---------------------------------------

Thank you for the details.

> Demote BroadcastJoin causes performance regression and increases OOM risks
> --------------------------------------------------------------------------
>
>                 Key: SPARK-36443
>                 URL: https://issues.apache.org/jira/browse/SPARK-36443
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kent Yao
>            Priority: Major
>         Attachments: image-2021-08-06-11-24-34-122.png, 
> image-2021-08-06-17-57-15-765.png, screenshot-1.png
>
>
>  
> h2. A test case
> Use bin/spark-sql in local mode, with all other settings left at their 
> defaults, on 3.1.2 to run the case below:
> {code:sql}
> set spark.sql.shuffle.partitions=20;
> set spark.sql.adaptive.enabled=true;
> -- set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0;
> -- (default 0.2) uncomment the line above to keep the BHJ from being demoted
> set spark.sql.autoBroadcastJoinThreshold=200;
> SELECT
>   l.id % 12345 k,
>   sum(l.id) sum,
>   count(l.id) cnt,
>   avg(l.id) avg,
>   min(l.id) min,
>   max(l.id) max
> from (select id % 3 id from range(0, 1e8, 1, 100)) l
>   left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) 
> group by gid) r ON l.id = r.id
> GROUP BY 1;
> {code}
>  
>  1. Demote the BHJ, with the nonEmptyPartitionRatioForBroadcastJoin override 
> left commented out (default behavior)
>  
> ||Job Id||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
> |4|(the test query above)|2021/08/06 17:31:37|71 ms|1/1 (4 skipped)|3/3 (205 skipped)|
> |3|(the test query above)|2021/08/06 17:31:18|19 s|1/1 (3 skipped)|4/4 (201 skipped)|
> |2|(the test query above)|2021/08/06 17:31:18|87 ms|1/1 (1 skipped)|1/1 (100 skipped)|
> |1|(the test query above)|2021/08/06 17:31:16|2 s|1/1|100/100|
> |0|(the test query above)|2021/08/06 17:31:15|2 s|1/1|100/100|
> 2. Set nonEmptyPartitionRatioForBroadcastJoin to 0 to tell Spark not to 
> demote the BHJ
>  
> ||Job Id (Job Group)||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
> |5|(the test query above)|2021/08/06 18:25:15|29 ms|1/1 (2 skipped)|3/3 (200 skipped)|
> |4|(the test query above)|2021/08/06 18:25:13|2 s|1/1 (1 skipped)|100/100 (100 skipped)|
> |3 (700fefe1-8446-4761-9be2-b68ed6e84c11)|broadcast exchange (runId 700fefe1-8446-4761-9be2-b68ed6e84c11)|2021/08/06 18:25:13|54 ms|1/1 (2 skipped)|1/1 (101 skipped)|
> |2|(the test query above)|2021/08/06 18:25:13|88 ms|1/1 (1 skipped)|1/1 (100 skipped)|
> |1|(the test query above)|2021/08/06 18:25:10|2 s|1/1|100/100|
> |0|(the test query above)|2021/08/06 18:25:10|3 s|1/1|100/100|
> The clause `select id % 3 id from range(0, 1e8, 1, 100)` here produces 
> highly compressed shuffle map output and 17/20 empty partitions on the 
> reduce side, which is also the point where AQE re-optimizes the plan through 
> DynamicJoinSelection (see the sketch after the metrics below).
> {code:java}
> Exchange
> shuffle records written: 100,000,000
> shuffle write time total (min, med, max )
> 891 ms (2 ms, 5 ms, 33 ms )
> records read: 100,000,000
> local bytes read total (min, med, max )
> 10.0 MiB (3.3 MiB, 3.4 MiB, 3.4 MiB )
> fetch wait time total (min, med, max )
> 0 ms (0 ms, 0 ms, 0 ms )
> remote bytes read: 0.0 B
> local blocks read: 300
> remote blocks read: 0
> data size total (min, med, max )
> 1525.9 MiB (15.3 MiB, 15.3 MiB, 15.3 MiB )
> remote bytes read to disk: 0.0 B
> shuffle bytes written total (min, med, max )
> 10.0 MiB (102.3 KiB, 102.3 KiB, 102.3 KiB )
> {code}
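>  
> For reference, this is the condition that drives the demotion. A simplified 
> standalone sketch, modeled on Spark 3.1's DynamicJoinSelection rule (not the 
> verbatim source):
> {code:scala}
> // Sketch of the BHJ demotion check in AQE's DynamicJoinSelection (Spark 3.1),
> // rewritten standalone. bytesByPartitionId is the map-output size per reduce
> // partition, as reported by MapOutputStatistics.
> object DemoteBhjSketch {
>   def shouldDemoteBroadcastHashJoin(
>       bytesByPartitionId: Array[Long],
>       nonEmptyPartitionRatio: Double): Boolean = {
>     val partitionCnt = bytesByPartitionId.length
>     val nonZeroCnt = bytesByPartitionId.count(_ > 0)
>     partitionCnt > 0 && nonZeroCnt > 0 &&
>       (nonZeroCnt * 1.0 / partitionCnt) < nonEmptyPartitionRatio
>   }
> 
>   def main(args: Array[String]): Unit = {
>     // As in the test case: only 3 of the 20 reduce partitions are non-empty.
>     val sizes = Array.tabulate(20)(i => if (i < 3) 3400L * 1024 else 0L)
>     println(shouldDemoteBroadcastHashJoin(sizes, 0.2)) // true: 3/20 < 0.2, demote
>     println(shouldDemoteBroadcastHashJoin(sizes, 0.0)) // false: ratio 0 disables demotion
>   }
> }
> {code}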
>  
> In case 1), the BHJ is demoted, and the coalesce-partitions rule 
> successfully coalesces these 'small' partitions even with 
> *spark.sql.adaptive.advisoryPartitionSizeInBytes=1m* set. See:
>  
> !screenshot-1.png!
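> For reference, a simplified sketch of the greedy coalescing performed here, 
> modeled on Spark's ShufflePartitionsUtil (not the verbatim source). It groups 
> adjacent reduce partitions by their serialized map-output size, so the 17 
> empty partitions collapse into a single task while the 3 non-empty (~3.4 MiB) 
> ones stay separate, consistent with the 4 tasks of job 3 above:
> {code:scala}
> import scala.collection.mutable.ArrayBuffer
> 
> // Greedy coalescing over serialized partition sizes (sketch, not Spark source).
> // Adjacent partitions are accumulated until adding the next one would exceed
> // the advisory target size; then the current group is closed.
> def coalesce(bytesByPartition: Array[Long], targetSize: Long): Seq[Range] = {
>   val groups = ArrayBuffer.empty[Range]
>   var start = 0
>   var acc = 0L
>   for (i <- bytesByPartition.indices) {
>     if (i > start && acc + bytesByPartition(i) > targetSize) {
>       groups += (start until i)
>       start = i
>       acc = 0L
>     }
>     acc += bytesByPartition(i)
>   }
>   groups += (start until bytesByPartition.length)
>   groups.toSeq
> }
> 
> // 3 non-empty partitions (~3.4 MiB compressed each) + 17 empty, target 1 MiB:
> val sizes = Array.tabulate(20)(i => if (i < 3) 3400L * 1024 else 0L)
> println(coalesce(sizes, 1024L * 1024).map(r => s"[${r.start}, ${r.end})").mkString(", "))
> // [0, 1), [1, 2), [2, 3), [3, 20)  -> 4 coalesced partitions
> {code}
> The key point is that the decision is made on compressed bytes; each ~3.4 MiB 
> coalesced partition still expands to tens of millions of rows at execution time.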
> Then, as you can see in the SMJ phase, the earlier coalescing followed by 
> the expansion of the compressed data causes a performance regression:
>  
> {code:java}
> Sort
> sort time total (min, med, max (stageId: taskId))
> 166 ms (0 ms, 55 ms, 57 ms (stage 7.0: task 203))
> peak memory total (min, med, max (stageId: taskId))
> 315.1 MiB (64.0 KiB, 105.0 MiB, 105.0 MiB (stage 7.0: task 201))
> spill size total (min, med, max (stageId: taskId))
> 1845.0 MiB (0.0 B, 615.0 MiB, 615.0 MiB (stage 7.0: task 201))
> {code}
>  
>  
> Per-task metrics for the sort stage (stage 7.0):
> ||Index||Task ID||Attempt||Status||Executor||Launch Time||Duration||GC Time||Scheduler Delay||Task Deserialization Time||Peak Execution Memory||Write Time||Shuffle Write Size / Records||Shuffle Read Size / Records||Spill (Memory)||Spill (Disk)||
> |1|202|0|SUCCESS|driver|2021-08-06 17:31:18|18 s|4 s|3.0 ms|10.0 ms|105.3 MiB|1.0 ms|91 B / 1|3.4 MiB / 33333333|615 MiB|4.5 MiB|
> |2|203|0|SUCCESS|driver|2021-08-06 17:31:18|19 s|4 s|4.0 ms|10.0 ms|105.3 MiB|1.0 ms|89 B / 1|3.4 MiB / 33333333|615 MiB|4.5 MiB|
> |0|201|0|SUCCESS|driver|2021-08-06 17:31:18|17 s|4 s|6.0 ms|10.0 ms|105.3 MiB|1.0 ms|70 B / 1|3.3 MiB / 33333334|615 MiB|4.4 MiB|
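>  
> A back-of-envelope view of the expansion, using only the numbers above (a 
> sketch; the per-task figures are approximate):
> {code:scala}
> // Numbers taken from the Exchange metrics and the task table above.
> val records = 100000000L        // shuffle records written
> val compressedMiB = 10.0        // shuffle bytes written (total)
> val dataSizeMiB = 1525.9        // unserialized "data size" of the exchange
> 
> // Only 3 distinct join keys (id % 3), so the map output compresses ~150x:
> println(f"compression ~ ${dataSizeMiB / compressedMiB}%.0fx")  // ~153x
> 
> // After coalescing, each of the 3 non-empty tasks materializes ~1/3 of all rows:
> println(s"rows per task ~ ${records / 3}")                     // 33333333
> println(f"in-memory MiB per task ~ ${dataSizeMiB / 3}%.0f")    // ~509 MiB
> // ...against ~105 MiB of peak execution memory, hence ~615 MiB spilled per task.
> {code}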
>  
> In case 2), keeping the BHJ increases the task count, which causes extra 
> scheduling overhead and runs unnecessary empty tasks, but it avoids the OOM 
> risk and the performance regression described above.
> h2. A real-world case, in which the expansion of the data raises the OOM 
> risk to a very high level
>  
>  
> !image-2021-08-06-11-24-34-122.png!


