[ https://issues.apache.org/jira/browse/SPARK-36443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-36443:
----------------------------------
    Parent: (was: SPARK-33828)
    Issue Type: Bug  (was: Sub-task)

Demote BroadcastJoin causes performance regression and increases OOM risks
--------------------------------------------------------------------------

                 Key: SPARK-36443
                 URL: https://issues.apache.org/jira/browse/SPARK-36443
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.2
            Reporter: Kent Yao
            Priority: Major
         Attachments: image-2021-08-06-11-24-34-122.png, image-2021-08-06-17-57-15-765.png, screenshot-1.png

h2. A test case

Run the case below with bin/spark-sql in local mode, with all other settings at their 3.1.2 defaults:

{code:sql}
set spark.sql.shuffle.partitions=20;
set spark.sql.adaptive.enabled=true;
-- set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0;
-- (default 0.2) uncomment the line above to keep Spark from demoting the BHJ
set spark.sql.autoBroadcastJoinThreshold=200;

SELECT
  l.id % 12345 k,
  sum(l.id) sum,
  count(l.id) cnt,
  avg(l.id) avg,
  min(l.id) min,
  max(l.id) max
from (select id % 3 id from range(0, 1e8, 1, 100)) l
left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) group by gid) r
  ON l.id = r.id
GROUP BY 1;
{code}

1.
demote the BHJ, with nonEmptyPartitionRatioForBroadcastJoin left commented out

||Job Id||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
|4|(the query above)|2021/08/06 17:31:37|71 ms|1/1 (4 skipped)|3/3 (205 skipped)|
|3|(the query above)|2021/08/06 17:31:18|19 s|1/1 (3 skipped)|4/4 (201 skipped)|
|2|(the query above)|2021/08/06 17:31:18|87 ms|1/1 (1 skipped)|1/1 (100 skipped)|
|1|(the query above)|2021/08/06 17:31:16|2 s|1/1|100/100|
|0|(the query above)|2021/08/06 17:31:15|2 s|1/1|100/100|

2. set nonEmptyPartitionRatioForBroadcastJoin to 0 to tell Spark not to demote the BHJ

||Job Id (Job Group)||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
|5|(the query above)|2021/08/06 18:25:15|29 ms|1/1 (2 skipped)|3/3 (200 skipped)|
|4|(the query above)|2021/08/06 18:25:13|2 s|1/1 (1 skipped)|100/100 (100 skipped)|
|3 (700fefe1-8446-4761-9be2-b68ed6e84c11)|broadcast exchange (runId 700fefe1-8446-4761-9be2-b68ed6e84c11)|2021/08/06 18:25:13|54 ms|1/1 (2 skipped)|1/1 (101 skipped)|
|2|(the query above)|2021/08/06 18:25:13|88 ms|1/1 (1 skipped)|1/1 (100 skipped)|
|1|(the query above)|2021/08/06 18:25:10|2 s|1/1|100/100|
|0|(the query above)|2021/08/06 18:25:10|3 s|1/1|100/100|

The clause {{select id % 3 id from range(0, 1e8, 1, 100)}} produces highly compressed shuffle map output and 17/20 empty partitions on the reduce side, which is also the point where AQE re-optimizes the plan with DynamicJoinSelection.
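The demotion can be reasoned about numerically: `l.id % 3` leaves only three distinct join keys, so hash partitioning can fill at most 3 of the 20 shuffle partitions, and 3/20 = 0.15 falls below the default `spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin` of 0.2. A minimal Python sketch of that threshold arithmetic (an illustration only, not Spark's actual DynamicJoinSelection code):

```python
def should_demote_bhj(non_empty_partitions: int, total_partitions: int,
                      ratio_threshold: float = 0.2) -> bool:
    """Illustrative mirror of the nonEmptyPartitionRatioForBroadcastJoin check:
    demote the broadcast hash join when too few partitions are non-empty."""
    return non_empty_partitions / total_partitions < ratio_threshold

# `id % 3` yields keys {0, 1, 2}, so at most 3 of the 20 shuffle
# partitions (spark.sql.shuffle.partitions=20) can be non-empty.
distinct_keys = len({i % 3 for i in range(100)})
total_partitions = 20

print(distinct_keys)                                       # 3
print(should_demote_bhj(distinct_keys, total_partitions))  # True (0.15 < 0.2)
```

Setting the ratio to 0, as in case 2, makes the check never fire, so the BHJ is kept.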
{code:java}
Exchange
shuffle records written: 100,000,000
shuffle write time total (min, med, max): 891 ms (2 ms, 5 ms, 33 ms)
records read: 100,000,000
local bytes read total (min, med, max): 10.0 MiB (3.3 MiB, 3.4 MiB, 3.4 MiB)
fetch wait time total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)
remote bytes read: 0.0 B
local blocks read: 300
remote blocks read: 0
data size total (min, med, max): 1525.9 MiB (15.3 MiB, 15.3 MiB, 15.3 MiB)
remote bytes read to disk: 0.0 B
shuffle bytes written total (min, med, max): 10.0 MiB (102.3 KiB, 102.3 KiB, 102.3 KiB)
{code}

In case 1), the BHJ is demoted and the coalesce-partitions rule successfully coalesces these "small" partitions even with *spark.sql.adaptive.advisoryPartitionSizeInBytes=1m* set. See:

!screenshot-1.png!

Then, as you can see in the SMJ phase, the earlier coalescing and the later data expansion cause a performance regression:

{code:java}
Sort
sort time total (min, med, max (stageId: taskId)): 166 ms (0 ms, 55 ms, 57 ms (stage 7.0: task 203))
peak memory total (min, med, max (stageId: taskId)): 315.1 MiB (64.0 KiB, 105.0 MiB, 105.0 MiB (stage 7.0: task 201))
spill size total (min, med, max (stageId: taskId)): 1845.0 MiB (0.0 B, 615.0 MiB, 615.0 MiB (stage 7.0: task 201))
{code}

||Index||Task ID||Attempt||Status||Launch Time||Duration||Peak Memory||Shuffle Read Size / Records||Spill (Memory)||Spill (Disk)||
|1|202|0|SUCCESS|2021-08-06 17:31:18|18 s|105.3 MiB|3.4 MiB / 33333333|615 MiB|4.5 MiB|
|2|203|0|SUCCESS|2021-08-06 17:31:18|19 s|105.3 MiB|3.4 MiB / 33333333|615 MiB|4.5 MiB|
|0|201|0|SUCCESS|2021-08-06 17:31:18|17 s|105.3 MiB|3.3 MiB / 33333334|615 MiB|4.4 MiB|

In case 2), the BHJ mode increases the task count, which causes extra scheduling overhead and runs unnecessary empty tasks, but it avoids the OOM risk and the performance regression described above.
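The case 1) regression follows from the gap between compressed shuffle bytes, which the coalesce-partitions rule measures, and the uncompressed in-memory data size. Some back-of-the-envelope arithmetic on the Exchange metrics above (a sketch; the constants are copied from those metrics):

```python
MIB = 1024 * 1024

# Constants copied from the Exchange metrics above.
shuffle_bytes_total = 10.0 * MIB    # compressed shuffle bytes written
data_size_total = 1525.9 * MIB      # uncompressed in-memory data size
non_empty_partitions = 3            # only keys 0, 1, 2 survive `id % 3`

# The coalesce rule sees only ~3.3 MiB per non-empty partition on the wire...
per_partition_on_wire = shuffle_bytes_total / non_empty_partitions
# ...but each reduce task materializes ~500 MiB once decompressed,
# consistent with the ~615 MiB per-task spills in the Sort metrics.
per_partition_in_memory = data_size_total / non_empty_partitions

expansion = data_size_total / shuffle_bytes_total

print(round(per_partition_on_wire / MIB, 1))    # 3.3
print(round(per_partition_in_memory / MIB, 1))  # 508.6
print(round(expansion))                         # 153 (x expansion)
```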
h2. A real-world case, in which the expansion of the data increases the OOM risk to a very high level

!image-2021-08-06-11-24-34-122.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)