[jira] [Commented] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006571#comment-17006571 ] Jason Guo commented on SPARK-24906: --- [~lio...@taboola.com] Yes, estimating with sampling would do better than the original proposal for a complex schema. The solution we are using is very similar to the one you proposed. And as you pointed out, it requires a format-specific implementation. We estimate based on row groups (the compressed size of each column) for Parquet and based on stripes (the uncompressed size of each column) for ORC. > Adaptively set split size for columnar file to ensure the task read data size > fit expectation > - > > Key: SPARK-24906 > URL: https://issues.apache.org/jira/browse/SPARK-24906 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Jason Guo >Priority: Major > Attachments: image-2018-07-24-20-26-32-441.png, > image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, > image-2018-07-24-20-30-24-552.png > > > For columnar files, when Spark SQL reads a table, each split will be > 128 MB by default since spark.sql.files.maxPartitionBytes defaults to > 128 MB. Even when the user sets it to a large value, such as 512 MB, a task may > read only a few MB or even hundreds of KB, because the table (Parquet) may > consist of dozens of columns while the SQL needs only a few of them, and Spark > will prune the unnecessary columns. > > In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes > adaptively. > For example, suppose there are 40 columns: 20 integer and 20 long. > When a query uses one integer column and one long column, > maxPartitionBytes should be 20 times larger: (20*4+20*8) / (4+8) = 20. > > With this optimization, the number of tasks will be smaller and the job will > run faster. More importantly, for a very large cluster (more than 10 thousand > nodes), it will relieve the RM's scheduling pressure. 
> > Here is the test. > > The table named test2 has more than 40 columns and there are more than 5 TB of > data each hour. > When we issue a very simple query: > > {code:java} > select count(device_id) from test2 where date=20180708 and hour='23'{code} > > There are 72176 tasks and the duration of the job is 4.8 minutes. > !image-2018-07-24-20-26-32-441.png! > > Most tasks last less than 1 second and read less than 1.5 MB of data. > !image-2018-07-24-20-28-06-269.png! > > After the optimization, there are only 1615 tasks and the job lasts only 30 > seconds. It is almost 10 times faster. > !image-2018-07-24-20-29-24-797.png! > > The median of read data is 44.2 MB. > !image-2018-07-24-20-30-24-552.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
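The arithmetic behind the proposed enlargement can be sketched in a few lines of Python (an illustration of the idea only; `adaptive_max_partition_bytes` and the per-column size map are made-up names, not Spark's API):

```python
# Sketch: scale spark.sql.files.maxPartitionBytes by the ratio of the
# total size of all columns to the size of the columns the query reads.
# Per-column sizes would come from format metadata (e.g. Parquet row groups).

def adaptive_max_partition_bytes(base_bytes, column_sizes, required_columns):
    total = sum(column_sizes.values())                         # all columns
    required = sum(column_sizes[c] for c in required_columns)  # after pruning
    if required == 0:
        return base_bytes
    return int(base_bytes * total / required)

# The example from the description: 20 INT (4 B) and 20 BIGINT (8 B) columns,
# a query touching one of each -> (20*4 + 20*8) / (4 + 8) = 20x enlargement.
sizes = {f"i{n}": 4 for n in range(20)}
sizes.update({f"l{n}": 8 for n in range(20)})
split = adaptive_max_partition_bytes(128 * 1024 * 1024, sizes, ["i0", "l0"])
assert split == 20 * 128 * 1024 * 1024
```

With row-group (Parquet) or stripe (ORC) statistics, the same ratio can be computed from actual on-disk sizes instead of type widths.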
[jira] [Commented] (SPARK-29031) Materialized column to accelerate queries
[ https://issues.apache.org/jira/browse/SPARK-29031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930230#comment-16930230 ] Jason Guo commented on SPARK-29031: --- [~lishuming] `Materialized column` is supported in ClickHouse [https://clickhouse.yandex/docs/en/query_language/create/] > Materialized column to accelerate queries > - > > Key: SPARK-29031 > URL: https://issues.apache.org/jira/browse/SPARK-29031 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Guo >Priority: Major > Labels: SPIP > > Goals > * Add a new SQL grammar for materialized columns > * Implicitly rewrite SQL queries on complex-type columns if there is > a materialized column for the expression > * If the data type of the materialized column is atomic, even though > the original column is of a complex type, enable vectorized read and filter > pushdown to improve performance > Example > Create a normal table > {quote}CREATE TABLE x ( > name STRING, > age INT, > params STRING, > event MAP > ) USING parquet; > {quote} > > Add materialized columns to an existing table > {quote}ALTER TABLE x ADD COLUMNS ( > new_age INT MATERIALIZED age + 1, > city STRING MATERIALIZED get_json_object(params, '$.city'), > label STRING MATERIALIZED event['label'] > ); > {quote} > > When a query like the following is issued > {quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label'] > FROM x > WHERE event['label'] = 'newuser'; > {quote} > it is equivalent to > {quote}SELECT name, new_age, city, label > FROM x > WHERE label = 'newuser'; > {quote} > > Query performance improves dramatically because > # The rewritten query reads the new column city (a string) > instead of the whole params map; much less data needs to be read > # Vectorized read can be utilized in the new query and cannot be used in > the old one. 
Because vectorized read can only be enabled when all required > columns are of atomic types > # Filters can be pushed down. Only filters on atomic columns can be pushed down. The > original filter event['label'] = 'newuser' is on a complex column, so it > cannot be pushed down. > # The new query does not need to parse JSON anymore. JSON parsing is a CPU > intensive operation which impacts performance dramatically
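The implicit rewrite described above can be illustrated with a toy sketch (plain string substitution for illustration only; the real rule would match expressions in the analyzed logical plan, and the expression-to-column map below is an assumption mirroring the ALTER TABLE example):

```python
# Map each materialized expression to its generated column, as declared by
# the ALTER TABLE ... MATERIALIZED statements in the example above.
MATERIALIZED = {
    "age+1": "new_age",
    "get_json_object(params, '$.city')": "city",
    "event['label']": "label",
}

def rewrite(sql):
    # Replace every expression that has a materialized column with the column.
    for expr, col in MATERIALIZED.items():
        sql = sql.replace(expr, col)
    return sql

query = ("SELECT name, age+1, get_json_object(params, '$.city'), "
         "event['label'] FROM x WHERE event['label'] = 'newuser'")
assert rewrite(query) == \
    "SELECT name, new_age, city, label FROM x WHERE label = 'newuser'"
```

After the rewrite, every referenced column is atomic, which is what unlocks vectorized read and filter pushdown.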
[jira] [Updated] (SPARK-29031) Materialized column to accelerate queries
[ https://issues.apache.org/jira/browse/SPARK-29031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-29031: -- Description: Goals * Add a new SQL grammar for materialized columns * Implicitly rewrite SQL queries on complex-type columns if there is a materialized column for the expression * If the data type of the materialized column is atomic, even though the original column is of a complex type, enable vectorized read and filter pushdown to improve performance Example Create a normal table {quote}CREATE TABLE x ( name STRING, age INT, params STRING, event MAP ) USING parquet; {quote} Add materialized columns to an existing table {quote}ALTER TABLE x ADD COLUMNS ( new_age INT MATERIALIZED age + 1, city STRING MATERIALIZED get_json_object(params, '$.city'), label STRING MATERIALIZED event['label'] ); {quote} When a query like the following is issued {quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label'] FROM x WHERE event['label'] = 'newuser'; {quote} it is equivalent to {quote}SELECT name, new_age, city, label FROM x WHERE label = 'newuser'; {quote} Query performance improves dramatically because # The rewritten query reads the new column city (a string) instead of the whole params map; much less data needs to be read # Vectorized read can be utilized in the new query but not in the old one, because vectorized read can only be enabled when all required columns are of atomic types # Filters can be pushed down. Only filters on atomic columns can be pushed down. The original filter event['label'] = 'newuser' is on a complex column, so it cannot be pushed down # The new query does not need to parse JSON anymore. JSON parsing is a CPU-intensive operation which impacts performance dramatically was: Goals * Add a new SQL grammar for materialized columns * Implicitly rewrite SQL queries on complex-type columns if there is a materialized column for the expression * If the data type of the materialized column is atomic, even though the original column is of a complex type, enable vectorized read and filter pushdown to improve performance Example Create a normal table {quote}CREATE TABLE x ( name STRING, age INT, params STRING, event MAP ) USING parquet; {quote} Add materialized columns to an existing table {quote}ALTER TABLE x ADD COLUMNS ( new_age INT MATERIALIZED age + 1, city STRING MATERIALIZED get_json_object(params, '$.city'), label STRING MATERIALIZED event['label'] ); {quote} When a query like the following is issued {quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label'] FROM x WHERE event['label'] = 'newuser'; {quote} It equals to {quote}SELECT name, new_age, city, label FROM x WHERE label = 'newuser'; {quote} Query performance improves dramatically because # The rewritten query reads the new column city (a string) instead of the whole params map; much less data needs to be read # Vectorized read can be utilized in the new query but not in the old one, because vectorized read can only be enabled when all required columns are of atomic types # Filters can be pushed down. Only filters on atomic columns can be pushed down. The original filter event['label'] = 'newuser' is on a complex column, so it cannot be pushed down # The new query does not need to parse JSON anymore. JSON parsing is a CPU-intensive operation which impacts performance dramatically
[jira] [Created] (SPARK-29031) Materialized column to accelerate queries
Jason Guo created SPARK-29031: - Summary: Materialized column to accelerate queries Key: SPARK-29031 URL: https://issues.apache.org/jira/browse/SPARK-29031 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 3.0.0 Reporter: Jason Guo Goals * Add a new SQL grammar for materialized columns * Implicitly rewrite SQL queries on complex-type columns if there is a materialized column for the expression * If the data type of the materialized column is atomic, even though the original column is of a complex type, enable vectorized read and filter pushdown to improve performance Example Create a normal table {quote}CREATE TABLE x ( name STRING, age INT, params STRING, event MAP ) USING parquet; {quote} Add materialized columns to an existing table {quote}ALTER TABLE x ADD COLUMNS ( new_age INT MATERIALIZED age + 1, city STRING MATERIALIZED get_json_object(params, '$.city'), label STRING MATERIALIZED event['label'] ); {quote} When a query like the following is issued {quote}SELECT name, age+1, get_json_object(params, '$.city'), event['label'] FROM x WHERE event['label'] = 'newuser'; {quote} it is equivalent to {quote}SELECT name, new_age, city, label FROM x WHERE label = 'newuser'; {quote} Query performance improves dramatically because # The rewritten query reads the new column city (a string) instead of the whole params map; much less data needs to be read # Vectorized read can be utilized in the new query but not in the old one, because vectorized read can only be enabled when all required columns are of atomic types # Filters can be pushed down. Only filters on atomic columns can be pushed down. The original filter event['label'] = 'newuser' is on a complex column, so it cannot be pushed down # The new query does not need to parse JSON anymore. JSON parsing is a CPU-intensive operation which impacts performance dramatically
[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Shepherd: (was: Dongjoon Hyun) > SkewJoin--handle only skewed keys with broadcastjoin and other keys with > normal join > > > Key: SPARK-27792 > URL: https://issues.apache.org/jira/browse/SPARK-27792 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.3 >Reporter: Jason Guo >Priority: Major > Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, > time.png > > > This feature is designed to handle data skew in joins > > *Scenario* > * A big table (big_skewed) which contains a few skewed keys > * A small table (small_even) which has no skewed keys and is larger than the > broadcast threshold > * When big_skewed.join(small_even), a few tasks will be much slower than > other tasks because they need to handle the skewed keys > *Solution* > * Provide a hint to indicate which keys are skewed > * Handle the skewed keys with broadcast join and join the non-skewed keys > with the normal join method > * For the small table, the whole table is larger than the broadcast > threshold, but the total size of the records whose keys are skewed > in the big table is smaller than the broadcast threshold, so these > records can be joined with the big table using broadcast join > * Other records, with non-skewed keys, can be joined with the normal join > method > * We get the final result by unioning the above two parts > *Effect* > This feature reduces the join time from 5.7 minutes to 2.1 minutes > !time.png! > !sql.png! > > *Experiment* > *Without this feature, the whole job took 5.7 minutes* > tableA has 2 skewed keys, 9500048 and 9500096 > {code:java} > INSERT OVERWRITE TABLE big_skewed > SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS > INT) + 1) * 48 ) > ELSE CAST(id/100 AS INT) END AS STRING), 'A' > name > FROM ids > WHERE id BETWEEN 9 AND 105000;{code} > tableB has no skewed keys > {code:java} > INSERT OVERWRITE TABLE small_even > SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' > name > FROM ids > WHERE id BETWEEN 95000 AND 95050;{code} > > Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 > {code:java} > insert overwrite table result_with_skew > select big_skewed.id, big_skewed.value, small_even.value > from big_skewed > join small_even > on small_even.id=big_skewed.id; > {code} > > The sort merge join is slow, with 2 straggler tasks > !SMJ DAG.png! > !SMJ tasks.png! > > *With this feature, the job took only 2.1 minutes* > The skewed keys are joined with broadcast join and the non-skewed keys are > joined with sort merge join > !skew join DAG.png! > > 
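The split-and-union plan can be sketched in plain Python (a stand-in for the real operators, under the assumption that a hash join models the broadcast side; none of these names exist in Spark):

```python
def skew_join(big, small, skewed_keys):
    """big, small: lists of (key, value) rows; skewed_keys: the hinted hot keys."""
    # Broadcast-style hash join for the hot keys: only the matching slice of
    # the small table is built into a hash table, so it fits under the
    # broadcast threshold even though the whole small table does not.
    broadcast = {}
    for k, v in small:
        if k in skewed_keys:
            broadcast.setdefault(k, []).append(v)
    out = [(k, bv, sv) for k, bv in big if k in skewed_keys
           for sv in broadcast.get(k, [])]
    # Normal join (stand-in for sort merge join) for the remaining keys.
    rest = {}
    for k, v in small:
        if k not in skewed_keys:
            rest.setdefault(k, []).append(v)
    # The union of the two partial results is the full join.
    out += [(k, bv, sv) for k, bv in big if k not in skewed_keys
            for sv in rest.get(k, [])]
    return out

rows = skew_join([("a", 1), ("a", 2), ("b", 3)], [("a", "x"), ("b", "y")], {"a"})
assert rows == [("a", 1, "x"), ("a", 2, "x"), ("b", 3, "y")]
```

Because a join key is either skewed or not, the two partial joins are disjoint and their union equals the plain inner join.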
[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Shepherd: Dongjoon Hyun (was: Liang-Chi Hsieh)
[jira] [Updated] (SPARK-27865) Spark SQL support 1:N sort merge bucket join without shuffle
[ https://issues.apache.org/jira/browse/SPARK-27865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27865: -- Shepherd: Dongjoon Hyun > Spark SQL support 1:N sort merge bucket join without shuffle > > > Key: SPARK-27865 > URL: https://issues.apache.org/jira/browse/SPARK-27865 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.3 >Reporter: Jason Guo >Priority: Major > > This feature is to support 1:N bucket join > > Without this feature, only bucketed tables which have the same number of buckets can be > joined with sort merge bucket join. If two tables have different numbers of > buckets, a shuffle is needed > > This feature makes it possible to join two such bucketed tables with sort merge > bucket join without a shuffle
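Why the 1:N case needs no shuffle can be shown with a small sketch, assuming Spark-style hash bucketing (bucket = hash(key) % numBuckets; Python's hash stands in for the real bucketing hash here):

```python
def bucket(key, num_buckets):
    # Spark-style bucket assignment (simplified: Python's hash, not Murmur3).
    return hash(key) % num_buckets

# If the big table has k*N buckets and the small table N buckets, every key
# in big-table bucket b also satisfies hash(key) % N == b % N. So bucket b
# only ever needs to be merged with small-table bucket b % N, and the 1:N
# sort merge join can proceed bucket-by-bucket without repartitioning.
n_small, n_big = 4, 12  # a 1:3 bucketing
for key in range(10_000):
    assert bucket(key, n_small) == bucket(key, n_big) % n_small
```

The identity `(h % kN) % N == h % N` is what guarantees the alignment; it holds only when one bucket count divides the other, which is why the feature targets the 1:N case.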
[jira] [Updated] (SPARK-27865) Spark SQL support 1:N sort merge bucket join without shuffle
[ https://issues.apache.org/jira/browse/SPARK-27865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27865: -- Summary: Spark SQL support 1:N sort merge bucket join without shuffle (was: Spark SQL support 1:N sort merge bucket join)
[jira] [Created] (SPARK-27865) Spark SQL support 1:N sort merge bucket join
Jason Guo created SPARK-27865: - Summary: Spark SQL support 1:N sort merge bucket join Key: SPARK-27865 URL: https://issues.apache.org/jira/browse/SPARK-27865 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.3 Reporter: Jason Guo This feature is to support 1:N bucket join Without this feature, only bucketed tables which have the same number of buckets can be joined with sort merge bucket join. If two tables have different numbers of buckets, a shuffle is needed This feature makes it possible to join two such bucketed tables with sort merge bucket join without a shuffle
[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in joins *Scenario* * A big table (big_skewed) which contains a few skewed keys * A small table (small_even) which has no skewed keys and is larger than the broadcast threshold * When big_skewed.join(small_even), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Solution* * Provide a hint to indicate which keys are skewed * Handle the skewed keys with broadcast join and join the non-skewed keys with the normal join method * For the small table, the whole table is larger than the broadcast threshold, but the total size of the records whose keys are skewed in the big table is smaller than the broadcast threshold, so these records can be joined with the big table using broadcast join * Other records, with non-skewed keys, can be joined with the normal join method * We get the final result by unioning the above two parts *Effect* This feature reduces the join time from 5.7 minutes to 2.1 minutes !time.png! !sql.png! *Experiment* *Without this feature, the whole job took 5.7 minutes* tableA has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE big_skewed SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} tableB has no skewed keys {code:java} INSERT OVERWRITE TABLE small_even SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select big_skewed.id, big_skewed.value, small_even.value from big_skewed join small_even on small_even.id=big_skewed.id; {code} The sort merge join is slow, with 2 straggler tasks !SMJ DAG.png! !SMJ tasks.png! *With this feature, the job took only 2.1 minutes* The skewed keys are joined with broadcast join and the non-skewed keys are joined with sort merge join !skew join DAG.png! was: This feature is designed to handle data skew in joins *Scenario* * A big table (big_skewed) which contains a few skewed keys * A small table (small_even) which has no skewed keys and is larger than the broadcast threshold * When big_skewed.join(small_even), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Effect* This feature reduces the join time from 5.7 minutes to 2.1 minutes !time.png! !sql.png! *Experiment* *Without this feature, the whole job took 5.7 minutes* tableA has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE big_skewed SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} tableB has no skewed keys {code:java} INSERT OVERWRITE TABLE small_even SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select big_skewed.id, big_skewed.value, small_even.value from big_skewed join small_even on small_even.id=big_skewed.id; {code} The sort merge join is slow, with 2 straggler tasks !SMJ DAG.png! !SMJ tasks.png! *With this feature, the job took only 2.1 minutes* The skewed keys are joined with broadcast join and the non-skewed keys are joined with sort merge join !skew join DAG.png!
[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Shepherd: Liang-Chi Hsieh
[jira] [Updated] (SPARK-27792) SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Summary: SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join (was: SkewJoin hint)
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Attachment: sql.png > SkewJoin hint > - > > Key: SPARK-27792 > URL: https://issues.apache.org/jira/browse/SPARK-27792 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.3 >Reporter: Jason Guo >Priority: Major > Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, > time.png > > > This feature is designed to handle data skew in Join > > *Senario* > * A big table (big_skewed) which contains a a few skewed key > * A small table (small_even) which has no skewed key and is larger than the > broadcast threshold > * When big_skewed.join(small_even), a few tasks will be much slower than > other tasks because they need to handle the skewed key > *Effect* > This feature reduce the join time from 5.7 minutes to 2.1 minutes > !time.png! > > > *Experiment* > *Without this feature, the whole job took 5.7 minutes* > tableA has 2 skewed keys 9500048 and 9500096 > {code:java} > INSERT OVERWRITE TABLE big_skewed > SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS > INT) + 1) * 48 ) > ELSE CAST(id/100 AS INT) END AS STRING), 'A' > name > FROM ids > WHERE id BETWEEN 9 AND 105000;{code} > tableB has no skewed keys > {code:java} > INSERT OVERWRITE TABLE small_even > SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' > name > FROM ids > WHERE id BETWEEN 95000 AND 95050;{code} > > Join them with setting spark.sql.autoBroadcastJoinThreshold to 3000 > {code:java} > insert overwrite table result_with_skew > select big_skewed.id, tabig_skewed.value, small_even.value > from big_skewed > join small_even > on small_even.id=big_skewed.id; > {code} > > The sort merge join is slow with 2 straggle tasks > !SMJ DAG.png! > !SMJ tasks.png! 
> > *With this feature, the job took only 2.1 minutes* > The skewed keys are joined with a broadcast join and the non-skewed keys are > joined with a sort merge join > !skew join DAG.png! > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
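The plan rewrite described above — probing the skewed keys against a broadcast copy of the small table while the remaining keys go through an ordinary sort-merge join, then unioning the two results — can be sketched in plain Python. This is only an illustrative simulation of the idea, not Spark's implementation; the function name, the table contents, and the skewed-key set below are hypothetical.

```python
# Sketch of the skew-join split: skewed keys take the "broadcast" path,
# everything else takes a sort-merge path, and the results are unioned.
# All names and data here are illustrative, not from Spark's source.

def split_skew_join(big, small, skewed_keys):
    # "Broadcast" side: index the small table once, probe skewed rows against it.
    small_index = {}
    for k, v in small:
        small_index.setdefault(k, []).append(v)

    broadcast_part = [
        (k, bv, sv)
        for k, bv in big if k in skewed_keys
        for sv in small_index.get(k, [])
    ]

    # "Sort-merge" side: sort both non-skewed inputs by key and merge them.
    left = sorted((k, v) for k, v in big if k not in skewed_keys)
    right = sorted(small)
    merged, i = [], 0
    for k, bv in left:
        while i < len(right) and right[i][0] < k:
            i += 1
        j = i
        while j < len(right) and right[j][0] == k:
            merged.append((k, bv, right[j][1]))
            j += 1
    return broadcast_part + merged

big = [("9500048", "A1"), ("9500048", "A2"), ("9500096", "A3"), ("9501", "A4")]
small = [("9500048", "B1"), ("9501", "B2")]
result = split_skew_join(big, small, {"9500048", "9500096"})
```

Because both paths produce the same (key, big value, small value) tuples, the union is equivalent to a single inner join over all keys, which is what makes this rewrite safe.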
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in Join *Scenario* * A big table (big_skewed) which contains a few skewed keys * A small table (small_even) which has no skewed keys and is larger than the broadcast threshold * When big_skewed.join(small_even), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Effect* This feature reduces the join time from 5.7 minutes to 2.1 minutes !time.png! !sql.png! *Experiment* *Without this feature, the whole job took 5.7 minutes* big_skewed has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE big_skewed SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} small_even has no skewed keys {code:java} INSERT OVERWRITE TABLE small_even SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select big_skewed.id, big_skewed.value, small_even.value from big_skewed join small_even on small_even.id=big_skewed.id; {code} The sort merge join is slow with 2 straggler tasks !SMJ DAG.png! !SMJ tasks.png! *With this feature, the job took only 2.1 minutes* The skewed keys are joined with a broadcast join and the non-skewed keys are joined with a sort merge join !skew join DAG.png! 
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in Join *Scenario* * A big table (big_skewed) which contains a few skewed keys * A small table (small_even) which has no skewed keys and is larger than the broadcast threshold * When big_skewed.join(small_even), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Effect* This feature reduces the join time from 5.7 minutes to 2.1 minutes !time.png! *Experiment* *Without this feature, the whole job took 5.7 minutes* big_skewed has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE big_skewed SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} small_even has no skewed keys {code:java} INSERT OVERWRITE TABLE small_even SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select big_skewed.id, big_skewed.value, small_even.value from big_skewed join small_even on small_even.id=big_skewed.id; {code} The sort merge join is slow with 2 straggler tasks !SMJ DAG.png! !SMJ tasks.png! *With this feature, the job took only 2.1 minutes* The skewed keys are joined with a broadcast join and the non-skewed keys are joined with a sort merge join !skew join DAG.png! 
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in Join *Scenario* * A big table (tableA) which contains a few skewed keys * A small table (tableB) which has no skewed keys and is larger than the broadcast threshold * When tableA.join(tableB), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Effect* This feature reduces the join time from 5.7 minutes to 2.1 minutes !time.png! *Experiment* *Without this feature, the whole job took 5.7 minutes* tableA has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE tableA SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} tableB has no skewed keys {code:java} INSERT OVERWRITE TABLE tableB SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select tableA.id, tableA.value, tableB.value from tableA join tableB on tableA.id=tableB.id; {code} The sort merge join is slow with 2 straggler tasks !SMJ DAG.png! !SMJ tasks.png! *With this feature, the job took only 2.1 minutes* The skewed keys are joined with a broadcast join and the non-skewed keys are joined with a sort merge join !skew join DAG.png! 
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Attachment: time.png skew join DAG.png
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
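A prerequisite for the split described in this issue is knowing *which* keys are skewed (here, 9500048 and 9500096). One common way to find them is a frequency count over a sample of the join column; the sketch below is an illustrative heavy-hitter detector, and its threshold, function name, and sample data are assumptions for demonstration, not part of this proposal.

```python
from collections import Counter

def detect_skewed_keys(keys, ratio=10.0):
    # Flag keys whose frequency exceeds `ratio` times the median frequency.
    # A real implementation would run this over a sample of the join column
    # rather than the full dataset.
    counts = Counter(keys)
    freqs = sorted(counts.values())
    median = freqs[len(freqs) // 2]
    return {k for k, c in counts.items() if c > ratio * median}

# 9500048 and 9500096 dominate the sample, as in the experiment above,
# while the remaining keys appear once each.
sample = ["9500048"] * 500 + ["9500096"] * 480 + [str(9501 + i) for i in range(100)]
skewed = detect_skewed_keys(sample)
```

Using the median rather than the mean as the baseline keeps the detector itself robust to the skew it is trying to find.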
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in Join *Scenario* * A big table (tableA) which contains a few skewed keys * A small table (tableB) which has no skewed keys and is larger than the broadcast threshold * When tableA.join(tableB), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Experiment* *Without this feature, the whole job took 5.7 minutes* tableA has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE tableA SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} tableB has no skewed keys {code:java} INSERT OVERWRITE TABLE tableB SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select tableA.id, tableA.value, tableB.value from tableA join tableB on tableA.id=tableB.id; {code} The sort merge join is slow with 2 straggler tasks !SMJ DAG.png! !SMJ tasks.png! *With this feature, the job took only 2.1 minutes* The skewed keys are joined with a broadcast join and the non-skewed keys are joined with a sort merge join
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Attachment: SMJ tasks.png -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Attachment: SMJ DAG.png -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in Join *Scenario* * A big table (tableA) which contains a few skewed keys * A small table (tableB) which has no skewed keys and is larger than the broadcast threshold * When tableA.join(tableB), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Experiment* tableA has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE tableA SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} tableB has no skewed keys {code:java} INSERT OVERWRITE TABLE tableB SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select tableA.id, tableA.value, tableB.value from tableA join tableB on tableA.id=tableB.id; {code} The sort merge join is slow with 2 straggler tasks !SMJ DAG.png! 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27792) SkewJoin hint
[ https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-27792: -- Description: This feature is designed to handle data skew in Join *Scenario* * A big table (tableA) which contains a few skewed keys * A small table (tableB) which has no skewed keys and is larger than the broadcast threshold * When tableA.join(tableB), a few tasks will be much slower than other tasks because they need to handle the skewed keys *Experiment* tableA has 2 skewed keys, 9500048 and 9500096 {code:java} INSERT OVERWRITE TABLE tableA SELECT CAST(CASE WHEN id < 90800 THEN (950 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), 'A' name FROM ids WHERE id BETWEEN 9 AND 105000;{code} tableB has no skewed keys {code:java} INSERT OVERWRITE TABLE tableB SELECT CAST(CAST(id/100 AS INT) AS STRING), 'B' name FROM ids WHERE id BETWEEN 95000 AND 95050;{code} Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 {code:java} insert overwrite table result_with_skew select tableA.id, tableA.value, tableB.value from tableA join tableB on tableA.id=tableB.id; {code} !image-2019-05-21-20-02-57-056.png! 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27792) SkewJoin hint
Jason Guo created SPARK-27792: - Summary: SkewJoin hint Key: SPARK-27792 URL: https://issues.apache.org/jira/browse/SPARK-27792 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.3 Reporter: Jason Guo -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571027#comment-16571027 ] Jason Guo commented on SPARK-25038: --- [~hyukjin.kwon] Gotcha I will create a PR for this today > Accelerate Spark Plan generation when Spark SQL read large amount of data > - > > Key: SPARK-25038 > URL: https://issues.apache.org/jira/browse/SPARK-25038 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Jason Guo >Priority: Major > Attachments: issue sql optimized.png, issue sql original.png, job > start optimized.png, job start original.png > > > When Spark SQL reads a large amount of data, it takes a long time (more than 10 > minutes) to generate the physical plan and then the ActiveJob > > Example: > There is a table which is partitioned by date and hour. There is more than > 13 TB of data each hour and 185 TB per day. Even when we issue a very simple > SQL query, it takes a long time to generate the ActiveJob > > The SQL statement is > {code:java} > select count(device_id) from test_tbl where date=20180731 and hour='21'; > {code} > > Before the optimization, it takes 2 minutes and 9 seconds to generate the Job > > The SQL is issued at 2018-08-07 09:07:41 > !issue sql original.png! > However, the job is submitted at 2018-08-07 09:09:53, which is 2 minutes and 9 > seconds later than the SQL issue time > !job start original.png! > > After the optimization, it takes only 4 seconds to generate the Job > The SQL is issued at 2018-08-07 09:20:15 > !issue sql optimized.png! > > And the job is submitted at 2018-08-07 09:20:19, which is 4 seconds later > than the SQL issue time > !job start optimized.png! > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Description: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} Before the optimization, it takes 2 minutes and 9 seconds to generate the job The SQL is issued at 2018-08-07 09:07:41 !issue sql original.png! However, the job is submitted at 2018-08-07 09:09:53, 2 minutes and 9 seconds after the SQL issue time !job start original.png! After the optimization, it takes only 4 seconds to generate the job The SQL is issued at 2018-08-07 09:20:15 !issue sql optimized.png! And the job is submitted at 2018-08-07 09:20:19, 4 seconds after the SQL issue time !job start optimized.png! was: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-07 08:43:48 !issue sql original.png! However, the job is submitted at 2018-08-07 08:46:05, 2 minutes and 17 seconds after the SQL issue time !job start original.png!
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: (was: job start original.png)
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: (was: issue sql original.png)
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: job start original.png job start optimized.png issue sql original.png issue sql optimized.png
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Description: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-07 08:43:48 !issue sql original.png! However, the job is submitted at 2018-08-07 08:46:05, 2 minutes and 17 seconds after the SQL issue time !job start original.png! was: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-07 08:43:48 !image-2018-08-07-08-52-00-558.png! However, the job is submitted at 2018-08-07 08:46:05, 2 minutes and 17 seconds after the SQL issue time !image-2018-08-07-08-52-09-648.png!
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: issue sql original.png
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: job start original.png
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: start.png issue.png
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: (was: start.png)
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Attachment: (was: issue.png)
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Description: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-07 08:43:48 !image-2018-08-07-08-52-00-558.png! However, the job is submitted at 2018-08-07 08:46:05, 2 minutes and 17 seconds after the SQL issue time !image-2018-08-07-08-52-09-648.png! was: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-07 08:43:48 !image-2018-08-07-08-48-28-753.png! However, the job is submitted at 2018-08-07 08:46:05, 2 minutes and 17 seconds after the SQL issue time !image-2018-08-07-08-47-06-321.png!
[jira] [Updated] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
[ https://issues.apache.org/jira/browse/SPARK-25038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-25038: -- Description: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-07 08:43:48 !image-2018-08-07-08-48-28-753.png! However, the job is submitted at 2018-08-07 08:46:05, 2 minutes and 17 seconds after the SQL issue time !image-2018-08-07-08-47-06-321.png! was: When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-05 18:33:21 !image-2018-08-07-08-38-01-984.png! However, the job is submitted at 2018-08-05 18:34:45, 1 minute and 24 seconds after the SQL issue time
[jira] [Created] (SPARK-25038) Accelerate Spark Plan generation when Spark SQL read large amount of data
Jason Guo created SPARK-25038: - Summary: Accelerate Spark Plan generation when Spark SQL read large amount of data Key: SPARK-25038 URL: https://issues.apache.org/jira/browse/SPARK-25038 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.1 Reporter: Jason Guo When Spark SQL reads a large amount of data, it takes a long time (more than 10 minutes) to generate the physical plan and then the ActiveJob Example: There is a table partitioned by date and hour, with more than 13 TB of data per hour and 185 TB per day. Even a very simple SQL statement takes a long time to generate the ActiveJob The SQL statement is {code:java} select count(device_id) from test_tbl where date=20180731 and hour='21'; {code} The SQL is issued at 2018-08-05 18:33:21 !image-2018-08-07-08-38-01-984.png! However, the job is submitted at 2018-08-05 18:34:45, 1 minute and 24 seconds after the SQL issue time
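The reported planning delay can be checked directly from the two timestamps in the description; this is plain date arithmetic in Python, independent of Spark:

```python
from datetime import datetime

# Timestamps reported in the issue: the SQL was issued at 18:33:21 and
# the job was submitted at 18:34:45 on 2018-08-05.
issued = datetime(2018, 8, 5, 18, 33, 21)
submitted = datetime(2018, 8, 5, 18, 34, 45)

delay = submitted - issued
print(delay)  # 0:01:24 -> 1 minute and 24 seconds spent before the job starts
```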
[jira] [Updated] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-24906: -- Summary: Adaptively set split size for columnar file to ensure the task read data size fit expectation (was: Enlarge split size for columnar file to ensure the task read enough data) > Adaptively set split size for columnar file to ensure the task read data size > fit expectation > - > > Key: SPARK-24906 > URL: https://issues.apache.org/jira/browse/SPARK-24906 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.3.1 > Reporter: Jason Guo > Priority: Critical > Attachments: image-2018-07-24-20-26-32-441.png, image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, image-2018-07-24-20-30-24-552.png > > > For a columnar file format, when Spark SQL reads a table, each split is 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when the user sets it to a larger value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL needs only a few of them, and Spark prunes the unnecessary columns. > > In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. > For example, suppose there are 40 columns, 20 integer and 20 long. When a query touches one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4+20*8) / (4+8) = 20. > > With this optimization, the number of tasks is smaller and the job runs faster. More importantly, for a very large cluster (more than 10 thousand nodes), it relieves the RM's scheduling pressure. > > Here is the test > > The table named test2 has more than 40 columns and more than 5 TB of data each hour. > When we issue a very simple query > > {code:java} > select count(device_id) from test2 where date=20180708 and hour='23'{code} > > There are 72176 tasks and the job takes 4.8 minutes > !image-2018-07-24-20-26-32-441.png! > > Most tasks last less than 1 second and read less than 1.5 MB of data > !image-2018-07-24-20-28-06-269.png! > > After the optimization, there are only 1615 tasks and the job takes only 30 seconds, almost 10 times faster. > !image-2018-07-24-20-29-24-797.png! > > The median read size is 44.2 MB. > !image-2018-07-24-20-30-24-552.png!
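The arithmetic behind the proposed enlargement factor, (20*4+20*8) / (4+8) = 20, can be sketched as below. This is an illustrative Python sketch, not Spark code; the byte widths follow Spark's DataType.defaultSize (4 for IntegerType, 8 for LongType), and the function name is a hypothetical stand-in:

```python
# Hypothetical sketch: the split-size multiplier is the ratio of the full
# row width to the width of the columns the query actually reads.
def split_multiplier(table_column_sizes, required_column_sizes):
    """Full-schema byte width divided by required-schema byte width."""
    return sum(table_column_sizes) / sum(required_column_sizes)

# 40-column table from the issue: 20 IntegerType (4 bytes each) and
# 20 LongType (8 bytes each); the query touches one of each.
table = [4] * 20 + [8] * 20
required = [4, 8]

m = split_multiplier(table, required)
print(m)  # 20.0 -> maxPartitionBytes would be enlarged 20x
```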
[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556413#comment-16556413 ] Jason Guo commented on SPARK-24906: --- [~maropu] [~viirya] What do you think about this idea ?
[jira] [Comment Edited] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972 ] Jason Guo edited comment on SPARK-24906 at 7/25/18 6:09 AM: Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our cluster 1. When this will be enabled? There is a configuration item named {code:java} spark.sql.parquet.adaptiveFileSplit=false{code} Only when this is enabled, DataSourceScanExec will enlarge the mapPartitionBytes. By default, this parameter is set to false. (Our ad-hoc query will set it to true) With this configuration, user will know that spark will adjust the partition / split size adaptively. If user do not want to use this, he or she can disable it 2. How to calculate maxPartitionBytes and openCostInBytes Different data type has different length (calculated with DataType.defaultSize). First we get the total size of the whole table (henceforth referred to as the “T”). Then we get the total size of all the requiredSchema (henceforth referred to as the “R”). The multiplier should be T / R .Then the maxPartitionBytes and openCostInBytes will be enlarge with T / R times. was (Author: habren): Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our cluster (more than 40 thousands nodes in total and more than 10 thousands nodes for a single cluster). 1. When this will be enabled? There is a configuration item named {code:java} spark.sql.parquet.adaptiveFileSplit=false{code} Only when this is enabled, DataSourceScanExec will enlarge the mapPartitionBytes. By default, this parameter is set to false. (Our ad-hoc query will set it to true) With this configuration, user will know that spark will adjust the partition / split size adaptively. If user do not want to use this, he or she can disable it 2. How to calculate maxPartitionBytes and openCostInBytes Different data type has different length (calculated with DataType.defaultSize). 
First we get the total size of the whole table schema (henceforth referred to as "T"), then the total size of the requiredSchema (henceforth referred to as "R"). The multiplier is T / R, and maxPartitionBytes and openCostInBytes are both enlarged by that factor.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
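The T / R calculation sketched in the comment can be written down as follows. This is only an illustrative model of the proposal, not Spark's actual implementation; the `DEFAULT_SIZE` table mirrors `DataType.defaultSize` for a few common types (4 bytes for IntegerType, 8 for LongType), and the schema/column names are made up for the example.

```python
# Illustrative sketch of the proposed T / R split-size multiplier -- not
# Spark's actual code. DEFAULT_SIZE mirrors DataType.defaultSize for a few
# common types (IntegerType = 4 bytes, LongType = 8 bytes).
DEFAULT_SIZE = {"int": 4, "long": 8, "float": 4, "double": 8}

def split_size_multiplier(table_schema, required_columns):
    """Return T / R: total default row size over the size of the required columns."""
    t = sum(DEFAULT_SIZE[dtype] for dtype in table_schema.values())
    r = sum(DEFAULT_SIZE[table_schema[col]] for col in required_columns)
    return t / r

def enlarged_split_sizes(max_partition_bytes, open_cost_in_bytes, factor):
    # Per the proposal, maxPartitionBytes and openCostInBytes are both
    # scaled by the same T / R factor.
    return int(max_partition_bytes * factor), int(open_cost_in_bytes * factor)
```

For a hypothetical three-column table `{"a": "int", "b": "long", "c": "long"}`, a query touching only `a` gives T = 20, R = 4, so the split size would be enlarged five-fold.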
[jira] [Comment Edited] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972 ] Jason Guo edited comment on SPARK-24906 at 7/25/18 1:03 AM: Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our cluster (more than 40 thousand nodes in total and more than 10 thousand nodes in a single cluster). 1. When is this enabled? There is a configuration item named {code:java} spark.sql.parquet.adaptiveFileSplit=false{code} Only when it is enabled will DataSourceScanExec enlarge maxPartitionBytes. By default the parameter is set to false (our ad-hoc queries set it to true). With this configuration, users know that Spark will adjust the partition / split size adaptively; anyone who does not want this behaviour can simply disable it. 2. How to calculate maxPartitionBytes and openCostInBytes. Different data types have different lengths (calculated with DataType.defaultSize). First we get the total size of the whole table schema (henceforth referred to as "T"), then the total size of the requiredSchema (henceforth referred to as "R"). The multiplier is T / R, and maxPartitionBytes and openCostInBytes are both enlarged by that factor. was (Author: habren): Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our cluster (more than 40 thousand nodes in total and more than 10 thousand nodes in a single cluster). 1. When is this enabled? There is a configuration item named {code:java} spark.sql.parquet.adaptiveFileSplit=false{code} Only when it is enabled will DataSourceScanExec enlarge maxPartitionBytes. By default the parameter is set to false (our ad-hoc queries set it to true). 2. How to calculate maxPartitionBytes and openCostInBytes. Different data types have different lengths (calculated with DataType.defaultSize). First we get the total size of the whole table schema (henceforth referred to as "T").
Then we get the total size of the requiredSchema (henceforth referred to as "R"). The multiplier is T / R, and maxPartitionBytes and openCostInBytes are both enlarged by that factor.
[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972 ] Jason Guo commented on SPARK-24906: --- Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our cluster (more than 40 thousand nodes in total and more than 10 thousand nodes in a single cluster). 1. When is this enabled? There is a configuration item named {code:java} spark.sql.parquet.adaptiveFileSplit=false{code} Only when it is enabled will DataSourceScanExec enlarge maxPartitionBytes. By default the parameter is set to false (our ad-hoc queries set it to true). 2. How to calculate maxPartitionBytes and openCostInBytes. Different data types have different lengths (calculated with DataType.defaultSize). First we get the total size of the whole table schema (henceforth referred to as "T"), then the total size of the requiredSchema (henceforth referred to as "R"). The multiplier is T / R, and maxPartitionBytes and openCostInBytes are both enlarged by that factor.
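The gating described in point 1 of the comment amounts to the following sketch. Note that `spark.sql.parquet.adaptiveFileSplit` is the configuration name proposed in this ticket, not an existing Spark setting, and the plain-dict `conf` stands in for Spark's configuration object.

```python
# Sketch of the proposed opt-in gating: the split size is only scaled when
# the adaptiveFileSplit flag (proposed in this ticket, not an existing Spark
# config) is explicitly set to true; it defaults to false.
def effective_max_partition_bytes(conf, base_bytes, multiplier):
    enabled = conf.get("spark.sql.parquet.adaptiveFileSplit", "false") == "true"
    return int(base_bytes * multiplier) if enabled else base_bytes

# Ad-hoc queries opt in; other workloads keep the default behaviour.
adhoc_conf = {"spark.sql.parquet.adaptiveFileSplit": "true"}
batch_conf = {}
```

With a multiplier of 20, an opted-in query would get a 20x larger split budget while every other job is untouched.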
[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-24906: -- Description: For columnar files, when Spark SQL reads a table, each split is 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when users set it to a larger value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL needs only a few of them, and Spark prunes the unnecessary columns. In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. For example, suppose there are 40 columns: 20 integer and 20 long. When a query uses one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it will relieve the RM's scheduling pressure. Here is the test. The table named test2 has more than 40 columns and more than 5 TB of data each hour. When we issue a very simple query {code:java} select count(device_id) from test2 where date=20180708 and hour='23'{code} there are 72176 tasks and the duration of the job is 4.8 minutes. !image-2018-07-24-20-26-32-441.png! Most tasks last less than 1 second and read less than 1.5 MB of data. !image-2018-07-24-20-28-06-269.png! After the optimization, there are only 1615 tasks and the job lasts only 30 seconds, almost 10 times faster. !image-2018-07-24-20-29-24-797.png! The median read size is 44.2 MB. !image-2018-07-24-20-30-24-552.png! was: For columnar files, when Spark SQL reads a table, each split is 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when users set it to a larger value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL needs only a few of them. In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. For example, suppose there are 40 columns: 20 integer and 20 long. When a query uses one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it will relieve the RM's scheduling pressure.
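The arithmetic in the 40-column example above works out as follows. This is a plain calculation, assuming the `DataType.defaultSize` values of 4 bytes for an integer column and 8 bytes for a long column.

```python
# 40 columns: 20 IntegerType (4 bytes each, per DataType.defaultSize) and
# 20 LongType (8 bytes each); the query touches one int and one long column.
int_cols, long_cols = 20, 20
int_size, long_size = 4, 8

t = int_cols * int_size + long_cols * long_size  # T = 240 bytes per full row
r = int_size + long_size                         # R = 12 bytes after pruning
multiplier = t / r                               # 240 / 12 = 20.0

default_max_partition_bytes = 128 * 1024 * 1024            # 128 MB default
enlarged = int(default_max_partition_bytes * multiplier)   # 2560 MB per split
```

So with the optimization enabled, each split covers roughly 2560 MB of on-disk data, of which the pruned scan actually reads about 1/20th.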
[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-24906: -- Attachment: image-2018-07-24-20-30-24-552.png
[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-24906: -- Attachment: image-2018-07-24-20-29-24-797.png
[jira] [Updated] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Guo updated SPARK-24906: -- Attachment: image-2018-07-24-20-28-06-269.png