[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Guo updated SPARK-27792:
------------------------------
    Description:
This feature is designed to handle data skew in joins.

*Scenario*
 * A big table (tableA) that contains a few skewed keys
 * A small table (tableB) that has no skewed keys and is larger than the broadcast threshold
 * When tableA.join(tableB) runs, a few tasks are much slower than the others because they have to handle the skewed keys

*Effect*

This feature reduces the join time from 5.7 minutes to 2.1 minutes.

!time.png!

*Experiment*

*Without this feature, the whole job took 5.7 minutes*

tableA has 2 skewed keys, 9500048 and 9500096:
{code:sql}
INSERT OVERWRITE TABLE tableA
SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST(RAND() * 2 AS INT) + 1) * 48)
                 ELSE CAST(id/100 AS INT) END AS STRING),
       'A' name
FROM ids
WHERE id BETWEEN 900000000 AND 1050000000;{code}
tableB has no skewed keys:
{code:sql}
INSERT OVERWRITE TABLE tableB
SELECT CAST(CAST(id/100 AS INT) AS STRING),
       'B' name
FROM ids
WHERE id BETWEEN 950000000 AND 950500000;{code}
Join them with spark.sql.autoBroadcastJoinThreshold set to 3000:
{code:sql}
INSERT OVERWRITE TABLE result_with_skew
SELECT tableA.id, tableA.value, tableB.value
FROM tableA
JOIN tableB
ON tableA.id = tableB.id;
{code}
The sort merge join is slow, with 2 straggler tasks.

!SMJ DAG.png!

!SMJ tasks.png!

*With this feature, the job took only 2.1 minutes*

The skewed keys are joined with a broadcast join and the non-skewed keys are joined with a sort merge join.

!skew join DAG.png!
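The split described above (broadcast join for the skewed keys, sort merge join for everything else) can be sketched in plain Python. This is only a toy model of the idea, not the implementation proposed in this issue and not Spark code: the function names (broadcast_hash_join, sort_merge_join, skew_join), the in-memory lists of (key, value) tuples, and the tiny sample tables are all assumptions made for illustration.

```python
from collections import defaultdict
from operator import itemgetter


def broadcast_hash_join(left, right):
    """Toy broadcast join: hold the small right side in a dict, stream the left."""
    lookup = defaultdict(list)
    for key, value in right:
        lookup[key].append(value)
    return [(k, lv, rv) for k, lv in left for rv in lookup.get(k, [])]


def sort_merge_join(left, right):
    """Toy sort merge join: sort both sides by key, then merge equal-key runs."""
    left = sorted(left, key=itemgetter(0))
    right = sorted(right, key=itemgetter(0))
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with that key against the whole right run.
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[r][1]) for r in range(j, j_end))
                i += 1
            j = j_end
    return out


def skew_join(left, right, skewed_keys):
    """Hypothetical helper: route skewed keys to the broadcast path, the rest to SMJ."""
    skewed = set(skewed_keys)
    left_skewed = [row for row in left if row[0] in skewed]
    left_rest = [row for row in left if row[0] not in skewed]
    right_skewed = [row for row in right if row[0] in skewed]
    right_rest = [row for row in right if row[0] not in skewed]
    # Only the slice of the right table matching the skewed keys is "broadcast",
    # so it can be small even when the whole table exceeds the threshold.
    return (broadcast_hash_join(left_skewed, right_skewed)
            + sort_merge_join(left_rest, right_rest))


# Made-up sample data: two heavily repeated keys in table_a, none in table_b.
table_a = ([("9500048", "A")] * 5 + [("9500096", "A")] * 4
           + [("9500100", "A"), ("9500200", "A")])
table_b = [("9500048", "B"), ("9500096", "B"), ("9500100", "B"), ("9500300", "B")]

result = skew_join(table_a, table_b, skewed_keys=["9500048", "9500096"])
```

The point of the split is that the rows of tableB matching the skewed keys form a small slice that can be copied to every task, so the skewed rows of tableA no longer have to be shuffled into a couple of straggler partitions. The two partial results are disjoint by key, so concatenating them yields the same rows as a single join over the full tables.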
> SkewJoin hint
> -------------
>
>                 Key: SPARK-27792
>                 URL: https://issues.apache.org/jira/browse/SPARK-27792
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Jason Guo
>            Priority: Major
>         Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, time.png